Coverage for Lib/asyncio/locks.py: 99%
272 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-15 02:02 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-15 02:02 +0000
1"""Synchronization primitives."""
3__all__ = ('Lock', 'Event', 'Condition', 'Semaphore',
4 'BoundedSemaphore', 'Barrier')
6import collections
7import enum
9from . import exceptions
10from . import mixins
12class _ContextManagerMixin:
13 async def __aenter__(self):
14 await self.acquire()
15 # We have no use for the "as ..." clause in the with
16 # statement for locks.
17 return None
19 async def __aexit__(self, exc_type, exc, tb):
20 self.release()
23class Lock(_ContextManagerMixin, mixins._LoopBoundMixin):
24 """Primitive lock objects.
26 A primitive lock is a synchronization primitive that is not owned
27 by a particular task when locked. A primitive lock is in one
28 of two states, 'locked' or 'unlocked'.
30 It is created in the unlocked state. It has two basic methods,
31 acquire() and release(). When the state is unlocked, acquire()
32 changes the state to locked and returns immediately. When the
33 state is locked, acquire() blocks until a call to release() in
34 another task changes it to unlocked, then the acquire() call
35 resets it to locked and returns. The release() method should only
36 be called in the locked state; it changes the state to unlocked
37 and returns immediately. If an attempt is made to release an
38 unlocked lock, a RuntimeError will be raised.
40 When more than one task is blocked in acquire() waiting for
41 the state to turn to unlocked, only one task proceeds when a
42 release() call resets the state to unlocked; successive release()
43 calls will unblock tasks in FIFO order.
45 Locks also support the asynchronous context management protocol.
46 'async with lock' statement should be used.
48 Usage:
50 lock = Lock()
51 ...
52 await lock.acquire()
53 try:
54 ...
55 finally:
56 lock.release()
58 Context manager usage:
60 lock = Lock()
61 ...
62 async with lock:
63 ...
65 Lock objects can be tested for locking state:
67 if not lock.locked():
68 await lock.acquire()
69 else:
70 # lock is acquired
71 ...
73 """
75 def __init__(self):
76 self._waiters = None
77 self._locked = False
79 def __repr__(self):
80 res = super().__repr__()
81 extra = 'locked' if self._locked else 'unlocked'
82 if self._waiters: 82 ↛ 83line 82 didn't jump to line 83 because the condition on line 82 was never true
83 extra = f'{extra}, waiters:{len(self._waiters)}'
84 return f'<{res[1:-1]} [{extra}]>'
86 def locked(self):
87 """Return True if lock is acquired."""
88 return self._locked
90 async def acquire(self):
91 """Acquire a lock.
93 This method blocks until the lock is unlocked, then sets it to
94 locked and returns True.
95 """
96 # Implement fair scheduling, where thread always waits
97 # its turn. Jumping the queue if all are cancelled is an optimization.
98 if (not self._locked and (self._waiters is None or
99 all(w.cancelled() for w in self._waiters))):
100 self._locked = True
101 return True
103 if self._waiters is None:
104 self._waiters = collections.deque()
105 fut = self._get_loop().create_future()
106 self._waiters.append(fut)
108 try:
109 try:
110 await fut
111 finally:
112 self._waiters.remove(fut)
113 except exceptions.CancelledError:
114 # Currently the only exception designed be able to occur here.
116 # Ensure the lock invariant: If lock is not claimed (or about
117 # to be claimed by us) and there is a Task in waiters,
118 # ensure that the Task at the head will run.
119 if not self._locked:
120 self._wake_up_first()
121 raise
123 # assert self._locked is False
124 self._locked = True
125 return True
127 def release(self):
128 """Release a lock.
130 When the lock is locked, reset it to unlocked, and return.
131 If any other tasks are blocked waiting for the lock to become
132 unlocked, allow exactly one of them to proceed.
134 When invoked on an unlocked lock, a RuntimeError is raised.
136 There is no return value.
137 """
138 if self._locked:
139 self._locked = False
140 self._wake_up_first()
141 else:
142 raise RuntimeError('Lock is not acquired.')
144 def _wake_up_first(self):
145 """Ensure that the first waiter will wake up."""
146 if not self._waiters:
147 return
148 try:
149 fut = next(iter(self._waiters))
150 except StopIteration:
151 return
153 # .done() means that the waiter is already set to wake up.
154 if not fut.done():
155 fut.set_result(True)
158class Event(mixins._LoopBoundMixin):
159 """Asynchronous equivalent to threading.Event.
161 Class implementing event objects. An event manages a flag that can be set
162 to true with the set() method and reset to false with the clear() method.
163 The wait() method blocks until the flag is true. The flag is initially
164 false.
165 """
167 def __init__(self):
168 self._waiters = collections.deque()
169 self._value = False
171 def __repr__(self):
172 res = super().__repr__()
173 extra = 'set' if self._value else 'unset'
174 if self._waiters:
175 extra = f'{extra}, waiters:{len(self._waiters)}'
176 return f'<{res[1:-1]} [{extra}]>'
178 def is_set(self):
179 """Return True if and only if the internal flag is true."""
180 return self._value
182 def set(self):
183 """Set the internal flag to true. All tasks waiting for it to
184 become true are awakened. Tasks that call wait() once the flag is
185 true will not block at all.
186 """
187 if not self._value:
188 self._value = True
190 for fut in self._waiters:
191 if not fut.done():
192 fut.set_result(True)
194 def clear(self):
195 """Reset the internal flag to false. Subsequently, tasks calling
196 wait() will block until set() is called to set the internal flag
197 to true again."""
198 self._value = False
200 async def wait(self):
201 """Block until the internal flag is true.
203 If the internal flag is true on entry, return True
204 immediately. Otherwise, block until another task calls
205 set() to set the flag to true, then return True.
206 """
207 if self._value:
208 return True
210 fut = self._get_loop().create_future()
211 self._waiters.append(fut)
212 try:
213 await fut
214 return True
215 finally:
216 self._waiters.remove(fut)
219class Condition(_ContextManagerMixin, mixins._LoopBoundMixin):
220 """Asynchronous equivalent to threading.Condition.
222 This class implements condition variable objects. A condition variable
223 allows one or more tasks to wait until they are notified by another
224 task.
226 A new Lock object is created and used as the underlying lock.
227 """
229 def __init__(self, lock=None):
230 if lock is None:
231 lock = Lock()
233 self._lock = lock
234 # Export the lock's locked(), acquire() and release() methods.
235 self.locked = lock.locked
236 self.acquire = lock.acquire
237 self.release = lock.release
239 self._waiters = collections.deque()
241 def __repr__(self):
242 res = super().__repr__()
243 extra = 'locked' if self.locked() else 'unlocked'
244 if self._waiters:
245 extra = f'{extra}, waiters:{len(self._waiters)}'
246 return f'<{res[1:-1]} [{extra}]>'
248 async def wait(self):
249 """Wait until notified.
251 If the calling task has not acquired the lock when this
252 method is called, a RuntimeError is raised.
254 This method releases the underlying lock, and then blocks
255 until it is awakened by a notify() or notify_all() call for
256 the same condition variable in another task. Once
257 awakened, it re-acquires the lock and returns True.
259 This method may return spuriously,
260 which is why the caller should always
261 re-check the state and be prepared to wait() again.
262 """
263 if not self.locked():
264 raise RuntimeError('cannot wait on un-acquired lock')
266 fut = self._get_loop().create_future()
267 self.release()
268 try:
269 try:
270 self._waiters.append(fut)
271 try:
272 await fut
273 return True
274 finally:
275 self._waiters.remove(fut)
277 finally:
278 # Must re-acquire lock even if wait is cancelled.
279 # We only catch CancelledError here, since we don't want any
280 # other (fatal) errors with the future to cause us to spin.
281 err = None
282 while True:
283 try:
284 await self.acquire()
285 break
286 except exceptions.CancelledError as e:
287 err = e
289 if err is not None:
290 try:
291 raise err # Re-raise most recent exception instance.
292 finally:
293 err = None # Break reference cycles.
294 except BaseException:
295 # Any error raised out of here _may_ have occurred after this Task
296 # believed to have been successfully notified.
297 # Make sure to notify another Task instead. This may result
298 # in a "spurious wakeup", which is allowed as part of the
299 # Condition Variable protocol.
300 self._notify(1)
301 raise
303 async def wait_for(self, predicate):
304 """Wait until a predicate becomes true.
306 The predicate should be a callable whose result will be
307 interpreted as a boolean value. The method will repeatedly
308 wait() until it evaluates to true. The final predicate value is
309 the return value.
310 """
311 result = predicate()
312 while not result:
313 await self.wait()
314 result = predicate()
315 return result
317 def notify(self, n=1):
318 """By default, wake up one task waiting on this condition, if any.
319 If the calling task has not acquired the lock when this method
320 is called, a RuntimeError is raised.
322 This method wakes up n of the tasks waiting for the condition
323 variable; if fewer than n are waiting, they are all awoken.
325 Note: an awakened task does not actually return from its
326 wait() call until it can reacquire the lock. Since notify() does
327 not release the lock, its caller should.
328 """
329 if not self.locked():
330 raise RuntimeError('cannot notify on un-acquired lock')
331 self._notify(n)
333 def _notify(self, n):
334 idx = 0
335 for fut in self._waiters:
336 if idx >= n:
337 break
339 if not fut.done():
340 idx += 1
341 fut.set_result(False)
343 def notify_all(self):
344 """Wake up all tasks waiting on this condition. This method acts
345 like notify(), but wakes up all waiting tasks instead of one. If the
346 calling task has not acquired the lock when this method is called,
347 a RuntimeError is raised.
348 """
349 self.notify(len(self._waiters))
352class Semaphore(_ContextManagerMixin, mixins._LoopBoundMixin):
353 """A Semaphore implementation.
355 A semaphore manages an internal counter which is decremented by each
356 acquire() call and incremented by each release() call. The counter
357 can never go below zero; when acquire() finds that it is zero, it blocks,
358 waiting until some other thread calls release().
360 Semaphores also support the context management protocol.
362 The optional argument gives the initial value for the internal
363 counter; it defaults to 1. If the value given is less than 0,
364 ValueError is raised.
365 """
367 def __init__(self, value=1):
368 if value < 0:
369 raise ValueError("Semaphore initial value must be >= 0")
370 self._waiters = None
371 self._value = value
373 def __repr__(self):
374 res = super().__repr__()
375 extra = 'locked' if self.locked() else f'unlocked, value:{self._value}'
376 if self._waiters:
377 extra = f'{extra}, waiters:{len(self._waiters)}'
378 return f'<{res[1:-1]} [{extra}]>'
380 def locked(self):
381 """Returns True if semaphore cannot be acquired immediately."""
382 # Due to state, or FIFO rules (must allow others to run first).
383 return self._value == 0 or (
384 any(not w.cancelled() for w in (self._waiters or ())))
386 async def acquire(self):
387 """Acquire a semaphore.
389 If the internal counter is larger than zero on entry,
390 decrement it by one and return True immediately. If it is
391 zero on entry, block, waiting until some other task has
392 called release() to make it larger than 0, and then return
393 True.
394 """
395 if not self.locked():
396 # Maintain FIFO, wait for others to start even if _value > 0.
397 self._value -= 1
398 return True
400 if self._waiters is None:
401 self._waiters = collections.deque()
402 fut = self._get_loop().create_future()
403 self._waiters.append(fut)
405 try:
406 try:
407 await fut
408 finally:
409 self._waiters.remove(fut)
410 except exceptions.CancelledError:
411 # Currently the only exception designed be able to occur here.
412 if fut.done() and not fut.cancelled():
413 # Our Future was successfully set to True via _wake_up_next(),
414 # but we are not about to successfully acquire(). Therefore we
415 # must undo the bookkeeping already done and attempt to wake
416 # up someone else.
417 self._value += 1
418 raise
420 finally:
421 # New waiters may have arrived but had to wait due to FIFO.
422 # Wake up as many as are allowed.
423 while self._value > 0:
424 if not self._wake_up_next():
425 break # There was no-one to wake up.
426 return True
428 def release(self):
429 """Release a semaphore, incrementing the internal counter by one.
431 When it was zero on entry and another task is waiting for it to
432 become larger than zero again, wake up that task.
433 """
434 self._value += 1
435 self._wake_up_next()
437 def _wake_up_next(self):
438 """Wake up the first waiter that isn't done."""
439 if not self._waiters:
440 return False
442 for fut in self._waiters:
443 if not fut.done():
444 self._value -= 1
445 fut.set_result(True)
446 # `fut` is now `done()` and not `cancelled()`.
447 return True
448 return False
451class BoundedSemaphore(Semaphore):
452 """A bounded semaphore implementation.
454 This raises ValueError in release() if it would increase the value
455 above the initial value.
456 """
458 def __init__(self, value=1):
459 self._bound_value = value
460 super().__init__(value)
462 def release(self):
463 if self._value >= self._bound_value:
464 raise ValueError('BoundedSemaphore released too many times')
465 super().release()
469class _BarrierState(enum.Enum):
470 FILLING = 'filling'
471 DRAINING = 'draining'
472 RESETTING = 'resetting'
473 BROKEN = 'broken'
476class Barrier(mixins._LoopBoundMixin):
477 """Asyncio equivalent to threading.Barrier
479 Implements a Barrier primitive.
480 Useful for synchronizing a fixed number of tasks at known synchronization
481 points. Tasks block on 'wait()' and are simultaneously awoken once they
482 have all made their call.
483 """
485 def __init__(self, parties):
486 """Create a barrier, initialised to 'parties' tasks."""
487 if parties < 1:
488 raise ValueError('parties must be >= 1')
490 self._cond = Condition() # notify all tasks when state changes
492 self._parties = parties
493 self._state = _BarrierState.FILLING
494 self._count = 0 # count tasks in Barrier
496 def __repr__(self):
497 res = super().__repr__()
498 extra = f'{self._state.value}'
499 if not self.broken:
500 extra += f', waiters:{self.n_waiting}/{self.parties}'
501 return f'<{res[1:-1]} [{extra}]>'
503 async def __aenter__(self):
504 # wait for the barrier reaches the parties number
505 # when start draining release and return index of waited task
506 return await self.wait()
508 async def __aexit__(self, *args):
509 pass
511 async def wait(self):
512 """Wait for the barrier.
514 When the specified number of tasks have started waiting, they are all
515 simultaneously awoken.
516 Returns an unique and individual index number from 0 to 'parties-1'.
517 """
518 async with self._cond:
519 await self._block() # Block while the barrier drains or resets.
520 try:
521 index = self._count
522 self._count += 1
523 if index + 1 == self._parties:
524 # We release the barrier
525 await self._release()
526 else:
527 await self._wait()
528 return index
529 finally:
530 self._count -= 1
531 # Wake up any tasks waiting for barrier to drain.
532 self._exit()
534 async def _block(self):
535 # Block until the barrier is ready for us,
536 # or raise an exception if it is broken.
537 #
538 # It is draining or resetting, wait until done
539 # unless a CancelledError occurs
540 await self._cond.wait_for(
541 lambda: self._state not in (
542 _BarrierState.DRAINING, _BarrierState.RESETTING
543 )
544 )
546 # see if the barrier is in a broken state
547 if self._state is _BarrierState.BROKEN:
548 raise exceptions.BrokenBarrierError("Barrier aborted")
550 async def _release(self):
551 # Release the tasks waiting in the barrier.
553 # Enter draining state.
554 # Next waiting tasks will be blocked until the end of draining.
555 self._state = _BarrierState.DRAINING
556 self._cond.notify_all()
558 async def _wait(self):
559 # Wait in the barrier until we are released. Raise an exception
560 # if the barrier is reset or broken.
562 # wait for end of filling
563 # unless a CancelledError occurs
564 await self._cond.wait_for(lambda: self._state is not _BarrierState.FILLING)
566 if self._state in (_BarrierState.BROKEN, _BarrierState.RESETTING):
567 raise exceptions.BrokenBarrierError("Abort or reset of barrier")
569 def _exit(self):
570 # If we are the last tasks to exit the barrier, signal any tasks
571 # waiting for the barrier to drain.
572 if self._count == 0:
573 if self._state in (_BarrierState.RESETTING, _BarrierState.DRAINING):
574 self._state = _BarrierState.FILLING
575 self._cond.notify_all()
577 async def reset(self):
578 """Reset the barrier to the initial state.
580 Any tasks currently waiting will get the BrokenBarrier exception
581 raised.
582 """
583 async with self._cond:
584 if self._count > 0:
585 if self._state is not _BarrierState.RESETTING: 585 ↛ 590line 585 didn't jump to line 590 because the condition on line 585 was always true
586 #reset the barrier, waking up tasks
587 self._state = _BarrierState.RESETTING
588 else:
589 self._state = _BarrierState.FILLING
590 self._cond.notify_all()
592 async def abort(self):
593 """Place the barrier into a 'broken' state.
595 Useful in case of error. Any currently waiting tasks and tasks
596 attempting to 'wait()' will have BrokenBarrierError raised.
597 """
598 async with self._cond:
599 self._state = _BarrierState.BROKEN
600 self._cond.notify_all()
602 @property
603 def parties(self):
604 """Return the number of tasks required to trip the barrier."""
605 return self._parties
607 @property
608 def n_waiting(self):
609 """Return the number of tasks currently waiting at the barrier."""
610 if self._state is _BarrierState.FILLING:
611 return self._count
612 return 0
614 @property
615 def broken(self):
616 """Return True if the barrier is in a broken state."""
617 return self._state is _BarrierState.BROKEN