Coverage for Lib/asyncio/locks.py: 99%

269 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-06-24 03:28 +0000

1"""Synchronization primitives.""" 

2 

3__all__ = ('Lock', 'Event', 'Condition', 'Semaphore', 

4 'BoundedSemaphore', 'Barrier') 

5 

6import collections 

7import enum 

8 

9from . import exceptions 

10from . import mixins 

11 

class _ContextManagerMixin:
    """Give ``async with`` support to classes exposing acquire()/release()."""

    async def __aenter__(self):
        """Acquire the primitive when the ``async with`` block is entered."""
        await self.acquire()
        # Nothing meaningful can be bound by an "as ..." clause for these
        # primitives, so the result of acquire() is dropped on purpose.
        return None

    async def __aexit__(self, exc_type, exc, tb):
        """Release the primitive on exit; exceptions are not suppressed."""
        self.release()

21 

22 

23class Lock(_ContextManagerMixin, mixins._LoopBoundMixin): 

24 """Primitive lock objects. 

25 

26 A primitive lock is a synchronization primitive that is not owned 

27 by a particular task when locked. A primitive lock is in one 

28 of two states, 'locked' or 'unlocked'. 

29 

30 It is created in the unlocked state. It has two basic methods, 

31 acquire() and release(). When the state is unlocked, acquire() 

32 changes the state to locked and returns immediately. When the 

33 state is locked, acquire() blocks until a call to release() in 

34 another task changes it to unlocked, then the acquire() call 

35 resets it to locked and returns. The release() method should only 

36 be called in the locked state; it changes the state to unlocked 

37 and returns immediately. If an attempt is made to release an 

38 unlocked lock, a RuntimeError will be raised. 

39 

40 When more than one task is blocked in acquire() waiting for 

41 the state to turn to unlocked, only one task proceeds when a 

42 release() call resets the state to unlocked; successive release() 

43 calls will unblock tasks in FIFO order. 

44 

45 Locks also support the asynchronous context management protocol. 

46 'async with lock' statement should be used. 

47 

48 Usage: 

49 

50 lock = Lock() 

51 ... 

52 await lock.acquire() 

53 try: 

54 ... 

55 finally: 

56 lock.release() 

57 

58 Context manager usage: 

59 

60 lock = Lock() 

61 ... 

62 async with lock: 

63 ... 

64 

65 Lock objects can be tested for locking state: 

66 

67 if not lock.locked(): 

68 await lock.acquire() 

69 else: 

70 # lock is acquired 

71 ... 

72 

73 """ 

74 

75 def __init__(self): 

76 self._waiters = None 

77 self._locked = False 

78 

79 def __repr__(self): 

80 res = super().__repr__() 

81 extra = 'locked' if self._locked else 'unlocked' 

82 if self._waiters: 82 ↛ 83line 82 didn't jump to line 83 because the condition on line 82 was never true

83 extra = f'{extra}, waiters:{len(self._waiters)}' 

84 return f'<{res[1:-1]} [{extra}]>' 

85 

86 def locked(self): 

87 """Return True if lock is acquired.""" 

88 return self._locked 

89 

90 async def acquire(self): 

91 """Acquire a lock. 

92 

93 This method blocks until the lock is unlocked, then sets it to 

94 locked and returns True. 

95 """ 

96 # Implement fair scheduling, where thread always waits 

97 # its turn. Jumping the queue if all are cancelled is an optimization. 

98 if (not self._locked and (self._waiters is None or 

99 all(w.cancelled() for w in self._waiters))): 

100 self._locked = True 

101 return True 

102 

103 if self._waiters is None: 

104 self._waiters = collections.deque() 

105 fut = self._get_loop().create_future() 

106 self._waiters.append(fut) 

107 

108 try: 

109 try: 

110 await fut 

111 finally: 

112 self._waiters.remove(fut) 

113 except exceptions.CancelledError: 

114 # Currently the only exception designed be able to occur here. 

115 

116 # Ensure the lock invariant: If lock is not claimed (or about 

117 # to be claimed by us) and there is a Task in waiters, 

118 # ensure that the Task at the head will run. 

119 if not self._locked: 

120 self._wake_up_first() 

121 raise 

122 

123 # assert self._locked is False 

124 self._locked = True 

125 return True 

126 

127 def release(self): 

128 """Release a lock. 

129 

130 When the lock is locked, reset it to unlocked, and return. 

131 If any other tasks are blocked waiting for the lock to become 

132 unlocked, allow exactly one of them to proceed. 

133 

134 When invoked on an unlocked lock, a RuntimeError is raised. 

135 

136 There is no return value. 

137 """ 

138 if self._locked: 

139 self._locked = False 

140 self._wake_up_first() 

141 else: 

142 raise RuntimeError('Lock is not acquired.') 

143 

144 def _wake_up_first(self): 

145 """Ensure that the first waiter will wake up.""" 

146 if not self._waiters: 

147 return 

148 fut = next(iter(self._waiters)) 

149 

150 # .done() means that the waiter is already set to wake up. 

151 if not fut.done(): 

152 fut.set_result(True) 

153 

154 

155class Event(mixins._LoopBoundMixin): 

156 """Asynchronous equivalent to threading.Event. 

157 

158 Class implementing event objects. An event manages a flag that can be 

159 set to true with the set() method and reset to false with the clear() 

160 method. The wait() method blocks until the flag is true. The flag is 

161 initially false. 

162 """ 

163 

164 def __init__(self): 

165 self._waiters = collections.deque() 

166 self._value = False 

167 

168 def __repr__(self): 

169 res = super().__repr__() 

170 extra = 'set' if self._value else 'unset' 

171 if self._waiters: 

172 extra = f'{extra}, waiters:{len(self._waiters)}' 

173 return f'<{res[1:-1]} [{extra}]>' 

174 

175 def is_set(self): 

176 """Return True if and only if the internal flag is true.""" 

177 return self._value 

178 

179 def set(self): 

180 """Set the internal flag to true. All tasks waiting for it to 

181 become true are awakened. Tasks that call wait() once the flag is 

182 true will not block at all. 

183 """ 

184 if not self._value: 

185 self._value = True 

186 

187 for fut in self._waiters: 

188 if not fut.done(): 

189 fut.set_result(True) 

190 

191 def clear(self): 

192 """Reset the internal flag to false. Subsequently, tasks calling 

193 wait() will block until set() is called to set the internal flag 

194 to true again.""" 

195 self._value = False 

196 

197 async def wait(self): 

198 """Block until the internal flag is true. 

199 

200 If the internal flag is true on entry, return True 

201 immediately. Otherwise, block until another task calls 

202 set() to set the flag to true, then return True. 

203 """ 

204 if self._value: 

205 return True 

206 

207 fut = self._get_loop().create_future() 

208 self._waiters.append(fut) 

209 try: 

210 await fut 

211 return True 

212 finally: 

213 self._waiters.remove(fut) 

214 

215 

216class Condition(_ContextManagerMixin, mixins._LoopBoundMixin): 

217 """Asynchronous equivalent to threading.Condition. 

218 

219 This class implements condition variable objects. A condition variable 

220 allows one or more tasks to wait until they are notified by another 

221 task. 

222 

223 A new Lock object is created and used as the underlying lock. 

224 """ 

225 

226 def __init__(self, lock=None): 

227 if lock is None: 

228 lock = Lock() 

229 

230 self._lock = lock 

231 # Export the lock's locked(), acquire() and release() methods. 

232 self.locked = lock.locked 

233 self.acquire = lock.acquire 

234 self.release = lock.release 

235 

236 self._waiters = collections.deque() 

237 

238 def __repr__(self): 

239 res = super().__repr__() 

240 extra = 'locked' if self.locked() else 'unlocked' 

241 if self._waiters: 

242 extra = f'{extra}, waiters:{len(self._waiters)}' 

243 return f'<{res[1:-1]} [{extra}]>' 

244 

245 async def wait(self): 

246 """Wait until notified. 

247 

248 If the calling task has not acquired the lock when this 

249 method is called, a RuntimeError is raised. 

250 

251 This method releases the underlying lock, and then blocks 

252 until it is awakened by a notify() or notify_all() call for 

253 the same condition variable in another task. Once 

254 awakened, it re-acquires the lock and returns True. 

255 

256 This method may return spuriously, 

257 which is why the caller should always 

258 re-check the state and be prepared to wait() again. 

259 """ 

260 if not self.locked(): 

261 raise RuntimeError('cannot wait on un-acquired lock') 

262 

263 fut = self._get_loop().create_future() 

264 self.release() 

265 try: 

266 try: 

267 self._waiters.append(fut) 

268 try: 

269 await fut 

270 return True 

271 finally: 

272 self._waiters.remove(fut) 

273 

274 finally: 

275 # Must re-acquire lock even if wait is cancelled. 

276 # We only catch CancelledError here, since we don't want any 

277 # other (fatal) errors with the future to cause us to spin. 

278 err = None 

279 while True: 

280 try: 

281 await self.acquire() 

282 break 

283 except exceptions.CancelledError as e: 

284 err = e 

285 

286 if err is not None: 

287 try: 

288 raise err # Re-raise most recent exception instance. 

289 finally: 

290 err = None # Break reference cycles. 

291 except BaseException: 

292 # Any error raised out of here _may_ have occurred after this Task 

293 # believed to have been successfully notified. 

294 # Make sure to notify another Task instead. This may result 

295 # in a "spurious wakeup", which is allowed as part of the 

296 # Condition Variable protocol. 

297 self._notify(1) 

298 raise 

299 

300 async def wait_for(self, predicate): 

301 """Wait until a predicate becomes true. 

302 

303 The predicate should be a callable whose result will be 

304 interpreted as a boolean value. The method will repeatedly 

305 wait() until it evaluates to true. The final predicate value is 

306 the return value. 

307 """ 

308 result = predicate() 

309 while not result: 

310 await self.wait() 

311 result = predicate() 

312 return result 

313 

314 def notify(self, n=1): 

315 """By default, wake up one task waiting on this condition, if any. 

316 If the calling task has not acquired the lock when this method 

317 is called, a RuntimeError is raised. 

318 

319 This method wakes up n of the tasks waiting for the condition 

320 variable; if fewer than n are waiting, they are all awoken. 

321 

322 Note: an awakened task does not actually return from its 

323 wait() call until it can reacquire the lock. Since notify() does 

324 not release the lock, its caller should. 

325 """ 

326 if not self.locked(): 

327 raise RuntimeError('cannot notify on un-acquired lock') 

328 self._notify(n) 

329 

330 def _notify(self, n): 

331 idx = 0 

332 for fut in self._waiters: 

333 if idx >= n: 

334 break 

335 

336 if not fut.done(): 

337 idx += 1 

338 fut.set_result(False) 

339 

340 def notify_all(self): 

341 """Wake up all tasks waiting on this condition. This method acts 

342 like notify(), but wakes up all waiting tasks instead of one. If the 

343 calling task has not acquired the lock when this method is called, 

344 a RuntimeError is raised. 

345 """ 

346 self.notify(len(self._waiters)) 

347 

348 

349class Semaphore(_ContextManagerMixin, mixins._LoopBoundMixin): 

350 """A Semaphore implementation. 

351 

352 A semaphore manages an internal counter which is decremented by each 

353 acquire() call and incremented by each release() call. The counter 

354 can never go below zero; when acquire() finds that it is zero, it 

355 blocks, waiting until some other thread calls release(). 

356 

357 Semaphores also support the context management protocol. 

358 

359 The optional argument gives the initial value for the internal 

360 counter; it defaults to 1. If the value given is less than 0, 

361 ValueError is raised. 

362 """ 

363 

364 def __init__(self, value=1): 

365 if value < 0: 

366 raise ValueError("Semaphore initial value must be >= 0") 

367 self._waiters = None 

368 self._value = value 

369 

370 def __repr__(self): 

371 res = super().__repr__() 

372 extra = 'locked' if self.locked() else f'unlocked, value:{self._value}' 

373 if self._waiters: 

374 extra = f'{extra}, waiters:{len(self._waiters)}' 

375 return f'<{res[1:-1]} [{extra}]>' 

376 

377 def locked(self): 

378 """Returns True if semaphore cannot be acquired immediately.""" 

379 # Due to state, or FIFO rules (must allow others to run first). 

380 return self._value == 0 or ( 

381 any(not w.cancelled() for w in (self._waiters or ()))) 

382 

383 async def acquire(self): 

384 """Acquire a semaphore. 

385 

386 If the internal counter is larger than zero on entry, 

387 decrement it by one and return True immediately. If it is 

388 zero on entry, block, waiting until some other task has 

389 called release() to make it larger than 0, and then return 

390 True. 

391 """ 

392 if not self.locked(): 

393 # Maintain FIFO, wait for others to start even if _value > 0. 

394 self._value -= 1 

395 return True 

396 

397 if self._waiters is None: 

398 self._waiters = collections.deque() 

399 fut = self._get_loop().create_future() 

400 self._waiters.append(fut) 

401 

402 try: 

403 try: 

404 await fut 

405 finally: 

406 self._waiters.remove(fut) 

407 except exceptions.CancelledError: 

408 # Currently the only exception designed be able to occur here. 

409 if fut.done() and not fut.cancelled(): 

410 # Our Future was successfully set to True via _wake_up_next(), 

411 # but we are not about to successfully acquire(). Therefore we 

412 # must undo the bookkeeping already done and attempt to wake 

413 # up someone else. 

414 self._value += 1 

415 raise 

416 

417 finally: 

418 # New waiters may have arrived but had to wait due to FIFO. 

419 # Wake up as many as are allowed. 

420 while self._value > 0: 

421 if not self._wake_up_next(): 

422 break # There was no-one to wake up. 

423 return True 

424 

425 def release(self): 

426 """Release a semaphore, incrementing the internal counter by one. 

427 

428 When it was zero on entry and another task is waiting for it to 

429 become larger than zero again, wake up that task. 

430 """ 

431 self._value += 1 

432 self._wake_up_next() 

433 

434 def _wake_up_next(self): 

435 """Wake up the first waiter that isn't done.""" 

436 if not self._waiters: 

437 return False 

438 

439 for fut in self._waiters: 

440 if not fut.done(): 

441 self._value -= 1 

442 fut.set_result(True) 

443 # `fut` is now `done()` and not `cancelled()`. 

444 return True 

445 return False 

446 

447 

class BoundedSemaphore(Semaphore):
    """A bounded semaphore implementation.

    release() raises ValueError when the call would push the counter
    above the value the semaphore was created with.
    """

    def __init__(self, value=1):
        # Remember the ceiling before the base class validates and
        # stores the starting counter.
        self._bound_value = value
        super().__init__(value)

    def release(self):
        """Release the semaphore, refusing to exceed the initial value."""
        at_ceiling = self._value >= self._bound_value
        if at_ceiling:
            raise ValueError('BoundedSemaphore released too many times')
        super().release()

463 

464 

465 

class _BarrierState(enum.Enum):
    """Phases a Barrier moves through; the values are shown in its repr."""

    # Tasks are still arriving; Barrier.n_waiting reports the count only
    # in this state.
    FILLING = 'filling'
    # Entered by Barrier._release() once the last party has arrived; new
    # arrivals block in Barrier._block() until the state changes again.
    DRAINING = 'draining'
    # Entered by Barrier.reset() while tasks are still inside the
    # barrier; tasks waiting in Barrier._wait() get BrokenBarrierError.
    RESETTING = 'resetting'
    # Entered by Barrier.abort(); wait() raises BrokenBarrierError.
    BROKEN = 'broken'

471 

472 

473class Barrier(mixins._LoopBoundMixin): 

474 """Asyncio equivalent to threading.Barrier 

475 

476 Implements a Barrier primitive. 

477 Useful for synchronizing a fixed number of tasks at known synchronization 

478 points. Tasks block on 'wait()' and are simultaneously awoken once they 

479 have all made their call. 

480 """ 

481 

482 def __init__(self, parties): 

483 """Create a barrier, initialised to 'parties' tasks.""" 

484 if parties < 1: 

485 raise ValueError('parties must be >= 1') 

486 

487 self._cond = Condition() # notify all tasks when state changes 

488 

489 self._parties = parties 

490 self._state = _BarrierState.FILLING 

491 self._count = 0 # count tasks in Barrier 

492 

493 def __repr__(self): 

494 res = super().__repr__() 

495 extra = f'{self._state.value}' 

496 if not self.broken: 

497 extra += f', waiters:{self.n_waiting}/{self.parties}' 

498 return f'<{res[1:-1]} [{extra}]>' 

499 

500 async def __aenter__(self): 

501 # wait for the barrier reaches the parties number 

502 # when start draining release and return index of waited task 

503 return await self.wait() 

504 

505 async def __aexit__(self, *args): 

506 pass 

507 

508 async def wait(self): 

509 """Wait for the barrier. 

510 

511 When the specified number of tasks have started waiting, they are 

512 all simultaneously awoken. 

513 Returns an unique and individual index number from 0 to 'parties-1'. 

514 """ 

515 async with self._cond: 

516 await self._block() # Block while the barrier drains or resets. 

517 try: 

518 index = self._count 

519 self._count += 1 

520 if index + 1 == self._parties: 

521 # We release the barrier 

522 await self._release() 

523 else: 

524 await self._wait() 

525 return index 

526 finally: 

527 self._count -= 1 

528 # Wake up any tasks waiting for barrier to drain. 

529 self._exit() 

530 

531 async def _block(self): 

532 # Block until the barrier is ready for us, 

533 # or raise an exception if it is broken. 

534 # 

535 # It is draining or resetting, wait until done 

536 # unless a CancelledError occurs 

537 await self._cond.wait_for( 

538 lambda: self._state not in ( 

539 _BarrierState.DRAINING, _BarrierState.RESETTING 

540 ) 

541 ) 

542 

543 # see if the barrier is in a broken state 

544 if self._state is _BarrierState.BROKEN: 

545 raise exceptions.BrokenBarrierError("Barrier aborted") 

546 

547 async def _release(self): 

548 # Release the tasks waiting in the barrier. 

549 

550 # Enter draining state. 

551 # Next waiting tasks will be blocked until the end of draining. 

552 self._state = _BarrierState.DRAINING 

553 self._cond.notify_all() 

554 

555 async def _wait(self): 

556 # Wait in the barrier until we are released. Raise an exception 

557 # if the barrier is reset or broken. 

558 

559 # wait for end of filling 

560 # unless a CancelledError occurs 

561 await self._cond.wait_for(lambda: self._state is not _BarrierState.FILLING) 

562 

563 if self._state in (_BarrierState.BROKEN, _BarrierState.RESETTING): 

564 raise exceptions.BrokenBarrierError("Abort or reset of barrier") 

565 

566 def _exit(self): 

567 # If we are the last tasks to exit the barrier, signal any tasks 

568 # waiting for the barrier to drain. 

569 if self._count == 0: 

570 if self._state in (_BarrierState.RESETTING, _BarrierState.DRAINING): 

571 self._state = _BarrierState.FILLING 

572 self._cond.notify_all() 

573 

574 async def reset(self): 

575 """Reset the barrier to the initial state. 

576 

577 Any tasks currently waiting will get the BrokenBarrier exception 

578 raised. 

579 """ 

580 async with self._cond: 

581 if self._count > 0: 

582 if self._state is not _BarrierState.RESETTING: 582 ↛ 587line 582 didn't jump to line 587 because the condition on line 582 was always true

583 #reset the barrier, waking up tasks 

584 self._state = _BarrierState.RESETTING 

585 else: 

586 self._state = _BarrierState.FILLING 

587 self._cond.notify_all() 

588 

589 async def abort(self): 

590 """Place the barrier into a 'broken' state. 

591 

592 Useful in case of error. Any currently waiting tasks and tasks 

593 attempting to 'wait()' will have BrokenBarrierError raised. 

594 """ 

595 async with self._cond: 

596 self._state = _BarrierState.BROKEN 

597 self._cond.notify_all() 

598 

599 @property 

600 def parties(self): 

601 """Return the number of tasks required to trip the barrier.""" 

602 return self._parties 

603 

604 @property 

605 def n_waiting(self): 

606 """Return the number of tasks currently waiting at the barrier.""" 

607 if self._state is _BarrierState.FILLING: 

608 return self._count 

609 return 0 

610 

611 @property 

612 def broken(self): 

613 """Return True if the barrier is in a broken state.""" 

614 return self._state is _BarrierState.BROKEN