Coverage for Lib/asyncio/locks.py: 99%
269 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-24 03:28 +0000
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-24 03:28 +0000
1"""Synchronization primitives."""
3__all__ = ('Lock', 'Event', 'Condition', 'Semaphore',
4 'BoundedSemaphore', 'Barrier')
6import collections
7import enum
9from . import exceptions
10from . import mixins
12class _ContextManagerMixin:
13 async def __aenter__(self):
14 await self.acquire()
15 # We have no use for the "as ..." clause in the with
16 # statement for locks.
17 return None
19 async def __aexit__(self, exc_type, exc, tb):
20 self.release()
23class Lock(_ContextManagerMixin, mixins._LoopBoundMixin):
24 """Primitive lock objects.
26 A primitive lock is a synchronization primitive that is not owned
27 by a particular task when locked. A primitive lock is in one
28 of two states, 'locked' or 'unlocked'.
30 It is created in the unlocked state. It has two basic methods,
31 acquire() and release(). When the state is unlocked, acquire()
32 changes the state to locked and returns immediately. When the
33 state is locked, acquire() blocks until a call to release() in
34 another task changes it to unlocked, then the acquire() call
35 resets it to locked and returns. The release() method should only
36 be called in the locked state; it changes the state to unlocked
37 and returns immediately. If an attempt is made to release an
38 unlocked lock, a RuntimeError will be raised.
40 When more than one task is blocked in acquire() waiting for
41 the state to turn to unlocked, only one task proceeds when a
42 release() call resets the state to unlocked; successive release()
43 calls will unblock tasks in FIFO order.
45 Locks also support the asynchronous context management protocol.
46 'async with lock' statement should be used.
48 Usage:
50 lock = Lock()
51 ...
52 await lock.acquire()
53 try:
54 ...
55 finally:
56 lock.release()
58 Context manager usage:
60 lock = Lock()
61 ...
62 async with lock:
63 ...
65 Lock objects can be tested for locking state:
67 if not lock.locked():
68 await lock.acquire()
69 else:
70 # lock is acquired
71 ...
73 """
75 def __init__(self):
76 self._waiters = None
77 self._locked = False
79 def __repr__(self):
80 res = super().__repr__()
81 extra = 'locked' if self._locked else 'unlocked'
82 if self._waiters: 82 ↛ 83line 82 didn't jump to line 83 because the condition on line 82 was never true
83 extra = f'{extra}, waiters:{len(self._waiters)}'
84 return f'<{res[1:-1]} [{extra}]>'
86 def locked(self):
87 """Return True if lock is acquired."""
88 return self._locked
90 async def acquire(self):
91 """Acquire a lock.
93 This method blocks until the lock is unlocked, then sets it to
94 locked and returns True.
95 """
96 # Implement fair scheduling, where thread always waits
97 # its turn. Jumping the queue if all are cancelled is an optimization.
98 if (not self._locked and (self._waiters is None or
99 all(w.cancelled() for w in self._waiters))):
100 self._locked = True
101 return True
103 if self._waiters is None:
104 self._waiters = collections.deque()
105 fut = self._get_loop().create_future()
106 self._waiters.append(fut)
108 try:
109 try:
110 await fut
111 finally:
112 self._waiters.remove(fut)
113 except exceptions.CancelledError:
114 # Currently the only exception designed be able to occur here.
116 # Ensure the lock invariant: If lock is not claimed (or about
117 # to be claimed by us) and there is a Task in waiters,
118 # ensure that the Task at the head will run.
119 if not self._locked:
120 self._wake_up_first()
121 raise
123 # assert self._locked is False
124 self._locked = True
125 return True
127 def release(self):
128 """Release a lock.
130 When the lock is locked, reset it to unlocked, and return.
131 If any other tasks are blocked waiting for the lock to become
132 unlocked, allow exactly one of them to proceed.
134 When invoked on an unlocked lock, a RuntimeError is raised.
136 There is no return value.
137 """
138 if self._locked:
139 self._locked = False
140 self._wake_up_first()
141 else:
142 raise RuntimeError('Lock is not acquired.')
144 def _wake_up_first(self):
145 """Ensure that the first waiter will wake up."""
146 if not self._waiters:
147 return
148 fut = next(iter(self._waiters))
150 # .done() means that the waiter is already set to wake up.
151 if not fut.done():
152 fut.set_result(True)
155class Event(mixins._LoopBoundMixin):
156 """Asynchronous equivalent to threading.Event.
158 Class implementing event objects. An event manages a flag that can be
159 set to true with the set() method and reset to false with the clear()
160 method. The wait() method blocks until the flag is true. The flag is
161 initially false.
162 """
164 def __init__(self):
165 self._waiters = collections.deque()
166 self._value = False
168 def __repr__(self):
169 res = super().__repr__()
170 extra = 'set' if self._value else 'unset'
171 if self._waiters:
172 extra = f'{extra}, waiters:{len(self._waiters)}'
173 return f'<{res[1:-1]} [{extra}]>'
175 def is_set(self):
176 """Return True if and only if the internal flag is true."""
177 return self._value
179 def set(self):
180 """Set the internal flag to true. All tasks waiting for it to
181 become true are awakened. Tasks that call wait() once the flag is
182 true will not block at all.
183 """
184 if not self._value:
185 self._value = True
187 for fut in self._waiters:
188 if not fut.done():
189 fut.set_result(True)
191 def clear(self):
192 """Reset the internal flag to false. Subsequently, tasks calling
193 wait() will block until set() is called to set the internal flag
194 to true again."""
195 self._value = False
197 async def wait(self):
198 """Block until the internal flag is true.
200 If the internal flag is true on entry, return True
201 immediately. Otherwise, block until another task calls
202 set() to set the flag to true, then return True.
203 """
204 if self._value:
205 return True
207 fut = self._get_loop().create_future()
208 self._waiters.append(fut)
209 try:
210 await fut
211 return True
212 finally:
213 self._waiters.remove(fut)
216class Condition(_ContextManagerMixin, mixins._LoopBoundMixin):
217 """Asynchronous equivalent to threading.Condition.
219 This class implements condition variable objects. A condition variable
220 allows one or more tasks to wait until they are notified by another
221 task.
223 A new Lock object is created and used as the underlying lock.
224 """
226 def __init__(self, lock=None):
227 if lock is None:
228 lock = Lock()
230 self._lock = lock
231 # Export the lock's locked(), acquire() and release() methods.
232 self.locked = lock.locked
233 self.acquire = lock.acquire
234 self.release = lock.release
236 self._waiters = collections.deque()
238 def __repr__(self):
239 res = super().__repr__()
240 extra = 'locked' if self.locked() else 'unlocked'
241 if self._waiters:
242 extra = f'{extra}, waiters:{len(self._waiters)}'
243 return f'<{res[1:-1]} [{extra}]>'
245 async def wait(self):
246 """Wait until notified.
248 If the calling task has not acquired the lock when this
249 method is called, a RuntimeError is raised.
251 This method releases the underlying lock, and then blocks
252 until it is awakened by a notify() or notify_all() call for
253 the same condition variable in another task. Once
254 awakened, it re-acquires the lock and returns True.
256 This method may return spuriously,
257 which is why the caller should always
258 re-check the state and be prepared to wait() again.
259 """
260 if not self.locked():
261 raise RuntimeError('cannot wait on un-acquired lock')
263 fut = self._get_loop().create_future()
264 self.release()
265 try:
266 try:
267 self._waiters.append(fut)
268 try:
269 await fut
270 return True
271 finally:
272 self._waiters.remove(fut)
274 finally:
275 # Must re-acquire lock even if wait is cancelled.
276 # We only catch CancelledError here, since we don't want any
277 # other (fatal) errors with the future to cause us to spin.
278 err = None
279 while True:
280 try:
281 await self.acquire()
282 break
283 except exceptions.CancelledError as e:
284 err = e
286 if err is not None:
287 try:
288 raise err # Re-raise most recent exception instance.
289 finally:
290 err = None # Break reference cycles.
291 except BaseException:
292 # Any error raised out of here _may_ have occurred after this Task
293 # believed to have been successfully notified.
294 # Make sure to notify another Task instead. This may result
295 # in a "spurious wakeup", which is allowed as part of the
296 # Condition Variable protocol.
297 self._notify(1)
298 raise
300 async def wait_for(self, predicate):
301 """Wait until a predicate becomes true.
303 The predicate should be a callable whose result will be
304 interpreted as a boolean value. The method will repeatedly
305 wait() until it evaluates to true. The final predicate value is
306 the return value.
307 """
308 result = predicate()
309 while not result:
310 await self.wait()
311 result = predicate()
312 return result
314 def notify(self, n=1):
315 """By default, wake up one task waiting on this condition, if any.
316 If the calling task has not acquired the lock when this method
317 is called, a RuntimeError is raised.
319 This method wakes up n of the tasks waiting for the condition
320 variable; if fewer than n are waiting, they are all awoken.
322 Note: an awakened task does not actually return from its
323 wait() call until it can reacquire the lock. Since notify() does
324 not release the lock, its caller should.
325 """
326 if not self.locked():
327 raise RuntimeError('cannot notify on un-acquired lock')
328 self._notify(n)
330 def _notify(self, n):
331 idx = 0
332 for fut in self._waiters:
333 if idx >= n:
334 break
336 if not fut.done():
337 idx += 1
338 fut.set_result(False)
340 def notify_all(self):
341 """Wake up all tasks waiting on this condition. This method acts
342 like notify(), but wakes up all waiting tasks instead of one. If the
343 calling task has not acquired the lock when this method is called,
344 a RuntimeError is raised.
345 """
346 self.notify(len(self._waiters))
349class Semaphore(_ContextManagerMixin, mixins._LoopBoundMixin):
350 """A Semaphore implementation.
352 A semaphore manages an internal counter which is decremented by each
353 acquire() call and incremented by each release() call. The counter
354 can never go below zero; when acquire() finds that it is zero, it
355 blocks, waiting until some other thread calls release().
357 Semaphores also support the context management protocol.
359 The optional argument gives the initial value for the internal
360 counter; it defaults to 1. If the value given is less than 0,
361 ValueError is raised.
362 """
364 def __init__(self, value=1):
365 if value < 0:
366 raise ValueError("Semaphore initial value must be >= 0")
367 self._waiters = None
368 self._value = value
370 def __repr__(self):
371 res = super().__repr__()
372 extra = 'locked' if self.locked() else f'unlocked, value:{self._value}'
373 if self._waiters:
374 extra = f'{extra}, waiters:{len(self._waiters)}'
375 return f'<{res[1:-1]} [{extra}]>'
377 def locked(self):
378 """Returns True if semaphore cannot be acquired immediately."""
379 # Due to state, or FIFO rules (must allow others to run first).
380 return self._value == 0 or (
381 any(not w.cancelled() for w in (self._waiters or ())))
383 async def acquire(self):
384 """Acquire a semaphore.
386 If the internal counter is larger than zero on entry,
387 decrement it by one and return True immediately. If it is
388 zero on entry, block, waiting until some other task has
389 called release() to make it larger than 0, and then return
390 True.
391 """
392 if not self.locked():
393 # Maintain FIFO, wait for others to start even if _value > 0.
394 self._value -= 1
395 return True
397 if self._waiters is None:
398 self._waiters = collections.deque()
399 fut = self._get_loop().create_future()
400 self._waiters.append(fut)
402 try:
403 try:
404 await fut
405 finally:
406 self._waiters.remove(fut)
407 except exceptions.CancelledError:
408 # Currently the only exception designed be able to occur here.
409 if fut.done() and not fut.cancelled():
410 # Our Future was successfully set to True via _wake_up_next(),
411 # but we are not about to successfully acquire(). Therefore we
412 # must undo the bookkeeping already done and attempt to wake
413 # up someone else.
414 self._value += 1
415 raise
417 finally:
418 # New waiters may have arrived but had to wait due to FIFO.
419 # Wake up as many as are allowed.
420 while self._value > 0:
421 if not self._wake_up_next():
422 break # There was no-one to wake up.
423 return True
425 def release(self):
426 """Release a semaphore, incrementing the internal counter by one.
428 When it was zero on entry and another task is waiting for it to
429 become larger than zero again, wake up that task.
430 """
431 self._value += 1
432 self._wake_up_next()
434 def _wake_up_next(self):
435 """Wake up the first waiter that isn't done."""
436 if not self._waiters:
437 return False
439 for fut in self._waiters:
440 if not fut.done():
441 self._value -= 1
442 fut.set_result(True)
443 # `fut` is now `done()` and not `cancelled()`.
444 return True
445 return False
448class BoundedSemaphore(Semaphore):
449 """A bounded semaphore implementation.
451 This raises ValueError in release() if it would increase the value
452 above the initial value.
453 """
455 def __init__(self, value=1):
456 self._bound_value = value
457 super().__init__(value)
459 def release(self):
460 if self._value >= self._bound_value:
461 raise ValueError('BoundedSemaphore released too many times')
462 super().release()
466class _BarrierState(enum.Enum):
467 FILLING = 'filling'
468 DRAINING = 'draining'
469 RESETTING = 'resetting'
470 BROKEN = 'broken'
473class Barrier(mixins._LoopBoundMixin):
474 """Asyncio equivalent to threading.Barrier
476 Implements a Barrier primitive.
477 Useful for synchronizing a fixed number of tasks at known synchronization
478 points. Tasks block on 'wait()' and are simultaneously awoken once they
479 have all made their call.
480 """
482 def __init__(self, parties):
483 """Create a barrier, initialised to 'parties' tasks."""
484 if parties < 1:
485 raise ValueError('parties must be >= 1')
487 self._cond = Condition() # notify all tasks when state changes
489 self._parties = parties
490 self._state = _BarrierState.FILLING
491 self._count = 0 # count tasks in Barrier
493 def __repr__(self):
494 res = super().__repr__()
495 extra = f'{self._state.value}'
496 if not self.broken:
497 extra += f', waiters:{self.n_waiting}/{self.parties}'
498 return f'<{res[1:-1]} [{extra}]>'
500 async def __aenter__(self):
501 # wait for the barrier reaches the parties number
502 # when start draining release and return index of waited task
503 return await self.wait()
505 async def __aexit__(self, *args):
506 pass
508 async def wait(self):
509 """Wait for the barrier.
511 When the specified number of tasks have started waiting, they are
512 all simultaneously awoken.
513 Returns an unique and individual index number from 0 to 'parties-1'.
514 """
515 async with self._cond:
516 await self._block() # Block while the barrier drains or resets.
517 try:
518 index = self._count
519 self._count += 1
520 if index + 1 == self._parties:
521 # We release the barrier
522 await self._release()
523 else:
524 await self._wait()
525 return index
526 finally:
527 self._count -= 1
528 # Wake up any tasks waiting for barrier to drain.
529 self._exit()
531 async def _block(self):
532 # Block until the barrier is ready for us,
533 # or raise an exception if it is broken.
534 #
535 # It is draining or resetting, wait until done
536 # unless a CancelledError occurs
537 await self._cond.wait_for(
538 lambda: self._state not in (
539 _BarrierState.DRAINING, _BarrierState.RESETTING
540 )
541 )
543 # see if the barrier is in a broken state
544 if self._state is _BarrierState.BROKEN:
545 raise exceptions.BrokenBarrierError("Barrier aborted")
547 async def _release(self):
548 # Release the tasks waiting in the barrier.
550 # Enter draining state.
551 # Next waiting tasks will be blocked until the end of draining.
552 self._state = _BarrierState.DRAINING
553 self._cond.notify_all()
555 async def _wait(self):
556 # Wait in the barrier until we are released. Raise an exception
557 # if the barrier is reset or broken.
559 # wait for end of filling
560 # unless a CancelledError occurs
561 await self._cond.wait_for(lambda: self._state is not _BarrierState.FILLING)
563 if self._state in (_BarrierState.BROKEN, _BarrierState.RESETTING):
564 raise exceptions.BrokenBarrierError("Abort or reset of barrier")
566 def _exit(self):
567 # If we are the last tasks to exit the barrier, signal any tasks
568 # waiting for the barrier to drain.
569 if self._count == 0:
570 if self._state in (_BarrierState.RESETTING, _BarrierState.DRAINING):
571 self._state = _BarrierState.FILLING
572 self._cond.notify_all()
574 async def reset(self):
575 """Reset the barrier to the initial state.
577 Any tasks currently waiting will get the BrokenBarrier exception
578 raised.
579 """
580 async with self._cond:
581 if self._count > 0:
582 if self._state is not _BarrierState.RESETTING: 582 ↛ 587line 582 didn't jump to line 587 because the condition on line 582 was always true
583 #reset the barrier, waking up tasks
584 self._state = _BarrierState.RESETTING
585 else:
586 self._state = _BarrierState.FILLING
587 self._cond.notify_all()
589 async def abort(self):
590 """Place the barrier into a 'broken' state.
592 Useful in case of error. Any currently waiting tasks and tasks
593 attempting to 'wait()' will have BrokenBarrierError raised.
594 """
595 async with self._cond:
596 self._state = _BarrierState.BROKEN
597 self._cond.notify_all()
599 @property
600 def parties(self):
601 """Return the number of tasks required to trip the barrier."""
602 return self._parties
604 @property
605 def n_waiting(self):
606 """Return the number of tasks currently waiting at the barrier."""
607 if self._state is _BarrierState.FILLING:
608 return self._count
609 return 0
611 @property
612 def broken(self):
613 """Return True if the barrier is in a broken state."""
614 return self._state is _BarrierState.BROKEN