Coverage for Lib/asyncio/tasks.py: 95%

530 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-06-24 03:28 +0000

1"""Support for tasks, coroutines and the scheduler.""" 

2 

3__all__ = ( 

4 'Task', 'create_task', 

5 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED', 

6 'wait', 'wait_for', 'as_completed', 'sleep', 

7 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe', 

8 'current_task', 'all_tasks', 

9 'create_eager_task_factory', 'eager_task_factory', 

10 '_register_task', '_unregister_task', '_enter_task', '_leave_task', 

11) 

12 

13import concurrent.futures 

14import contextvars 

15import functools 

16import inspect 

17import itertools 

18import math 

19import types 

20import weakref 

21from types import GenericAlias 

22 

23from . import base_tasks 

24from . import coroutines 

25from . import events 

26from . import exceptions 

27from . import futures 

28from . import queues 

29from . import timeouts 

30 

31# Helper to generate new task names 

32# This uses itertools.count() instead of a "+= 1" operation because the latter 

33# is not thread safe. See bpo-11866 for a longer explanation. 

34_task_name_counter = itertools.count(1).__next__ 

35 

36 

37def current_task(loop=None): 

38 """Return a currently executed task.""" 

39 if loop is None: 

40 loop = events.get_running_loop() 

41 return _current_tasks.get(loop) 

42 

43 

44def all_tasks(loop=None): 

45 """Return a set of all tasks for the loop.""" 

46 if loop is None: 

47 loop = events.get_running_loop() 

48 # capturing the set of eager tasks first, so if an eager task "graduates" 

49 # to a regular task in another thread, we don't risk missing it. 

50 eager_tasks = list(_eager_tasks) 

51 

52 return {t for t in itertools.chain(_scheduled_tasks, eager_tasks) 

53 if futures._get_loop(t) is loop and not t.done()} 

54 

55 

56class Task(futures._PyFuture): # Inherit Python Task implementation 

57 # from a Python Future implementation. 

58 

59 """A coroutine wrapped in a Future.""" 

60 

61 # An important invariant maintained while a Task not done: 

62 # _fut_waiter is either None or a Future. The Future 

63 # can be either done() or not done(). 

64 # The task can be in any of 3 states: 

65 # 

66 # - 1: _fut_waiter is not None and not _fut_waiter.done(): 

67 # __step() is *not* scheduled and the Task is waiting for _fut_waiter. 

68 # - 2: (_fut_waiter is None or _fut_waiter.done()) and __step() is scheduled: 

69 # the Task is waiting for __step() to be executed. 

70 # - 3: _fut_waiter is None and __step() is *not* scheduled: 

71 # the Task is currently executing (in __step()). 

72 # 

73 # * In state 1, one of the callbacks of __fut_waiter must be __wakeup(). 

74 # * The transition from 1 to 2 happens when _fut_waiter becomes done(), 

75 # as it schedules __wakeup() to be called (which calls __step() so 

76 # we way that __step() is scheduled). 

77 # * It transitions from 2 to 3 when __step() is executed, and it clears 

78 # _fut_waiter to None. 

79 

80 # If False, don't log a message if the task is destroyed while its 

81 # status is still pending 

82 _log_destroy_pending = True 

83 

84 def __init__(self, coro, *, loop=None, name=None, context=None, 

85 eager_start=False): 

86 super().__init__(loop=loop) 

87 if self._source_traceback: 

88 del self._source_traceback[-1] 

89 if not coroutines.iscoroutine(coro): 

90 # raise after Future.__init__(), attrs are required for __del__ 

91 # prevent logging for pending task in __del__ 

92 self._log_destroy_pending = False 

93 raise TypeError(f"a coroutine was expected, got {coro!r}") 

94 

95 if name is None: 

96 self._name = f'Task-{_task_name_counter()}' 

97 else: 

98 self._name = str(name) 

99 

100 self._num_cancels_requested = 0 

101 self._must_cancel = False 

102 self._fut_waiter = None 

103 self._coro = coro 

104 if context is None: 

105 self._context = contextvars.copy_context() 

106 else: 

107 self._context = context 

108 

109 if eager_start and self._loop.is_running(): 

110 self.__eager_start() 

111 else: 

112 self._loop.call_soon(self.__step, context=self._context) 

113 _py_register_task(self) 

114 

115 def __del__(self): 

116 if self._state == futures._PENDING and self._log_destroy_pending: 

117 context = { 

118 'task': self, 

119 'message': 'Task was destroyed but it is pending!', 

120 } 

121 if self._source_traceback: 

122 context['source_traceback'] = self._source_traceback 

123 self._loop.call_exception_handler(context) 

124 super().__del__() 

125 

126 __class_getitem__ = classmethod(GenericAlias) 

127 

128 def __repr__(self): 

129 return base_tasks._task_repr(self) 

130 

131 def get_coro(self): 

132 return self._coro 

133 

134 def get_context(self): 

135 return self._context 

136 

137 def get_name(self): 

138 return self._name 

139 

140 def set_name(self, value): 

141 self._name = str(value) 

142 

143 def set_result(self, result): 

144 raise RuntimeError('Task does not support set_result operation') 

145 

146 def set_exception(self, exception): 

147 raise RuntimeError('Task does not support set_exception operation') 

148 

149 def get_stack(self, *, limit=None): 

150 """Return the list of stack frames for this task's coroutine. 

151 

152 If the coroutine is not done, this returns the stack where it is 

153 suspended. If the coroutine has completed successfully or was 

154 cancelled, this returns an empty list. If the coroutine was 

155 terminated by an exception, this returns the list of traceback 

156 frames. 

157 

158 The frames are always ordered from oldest to newest. 

159 

160 The optional limit gives the maximum number of frames to 

161 return; by default all available frames are returned. Its 

162 meaning differs depending on whether a stack or a traceback is 

163 returned: the newest frames of a stack are returned, but the 

164 oldest frames of a traceback are returned. (This matches the 

165 behavior of the traceback module.) 

166 

167 For reasons beyond our control, only one stack frame is 

168 returned for a suspended coroutine. 

169 """ 

170 return base_tasks._task_get_stack(self, limit) 

171 

172 def print_stack(self, *, limit=None, file=None): 

173 """Print the stack or traceback for this task's coroutine. 

174 

175 This produces output similar to that of the traceback module, 

176 for the frames retrieved by get_stack(). The limit argument 

177 is passed to get_stack(). The file argument is an I/O stream 

178 to which the output is written; by default output is written 

179 to sys.stderr. 

180 """ 

181 return base_tasks._task_print_stack(self, limit, file) 

182 

183 def cancel(self, msg=None): 

184 """Request that this task cancel itself. 

185 

186 This arranges for a CancelledError to be thrown into the 

187 wrapped coroutine on the next cycle through the event loop. 

188 The coroutine then has a chance to clean up or even deny 

189 the request using try/except/finally. 

190 

191 Unlike Future.cancel, this does not guarantee that the 

192 task will be cancelled: the exception might be caught and 

193 acted upon, delaying cancellation of the task or preventing 

194 cancellation completely. The task may also return a value or 

195 raise a different exception. 

196 

197 Immediately after this method is called, Task.cancelled() will 

198 not return True (unless the task was already cancelled). A 

199 task will be marked as cancelled when the wrapped coroutine 

200 terminates with a CancelledError exception (even if cancel() 

201 was not called). 

202 

203 This also increases the task's count of cancellation requests. 

204 """ 

205 self._log_traceback = False 

206 if self.done(): 

207 return False 

208 self._num_cancels_requested += 1 

209 # These two lines are controversial. See discussion starting at 

210 # https://github.com/python/cpython/pull/31394#issuecomment-1053545331 

211 # Also remember that this is duplicated in _asynciomodule.c. 

212 # if self._num_cancels_requested > 1: 

213 # return False 

214 if self._fut_waiter is not None: 

215 if self._fut_waiter.cancel(msg=msg): 

216 # Leave self._fut_waiter; it may be a Task that 

217 # catches and ignores the cancellation so we may have 

218 # to cancel it again later. 

219 return True 

220 # It must be the case that self.__step is already scheduled. 

221 self._must_cancel = True 

222 self._cancel_message = msg 

223 return True 

224 

225 def cancelling(self): 

226 """Return the count of the task's cancellation requests. 

227 

228 This count is incremented when .cancel() is called 

229 and may be decremented using .uncancel(). 

230 """ 

231 return self._num_cancels_requested 

232 

233 def uncancel(self): 

234 """Decrement the task's count of cancellation requests. 

235 

236 This should be called by the party that called `cancel()` on the task 

237 beforehand. 

238 

239 Returns the remaining number of cancellation requests. 

240 """ 

241 if self._num_cancels_requested > 0: 241 ↛ 245line 241 didn't jump to line 245 because the condition on line 241 was always true

242 self._num_cancels_requested -= 1 

243 if self._num_cancels_requested == 0: 243 ↛ 245line 243 didn't jump to line 245 because the condition on line 243 was always true

244 self._must_cancel = False 

245 return self._num_cancels_requested 

246 

247 def __eager_start(self): 

248 prev_task = _py_swap_current_task(self._loop, self) 

249 try: 

250 _py_register_eager_task(self) 

251 try: 

252 self._context.run(self.__step_run_and_handle_result, None) 

253 finally: 

254 _py_unregister_eager_task(self) 

255 finally: 

256 try: 

257 curtask = _py_swap_current_task(self._loop, prev_task) 

258 assert curtask is self 

259 finally: 

260 if self.done(): 

261 self._coro = None 

262 self = None # Needed to break cycles when an exception occurs. 

263 else: 

264 _py_register_task(self) 

265 

266 def __step(self, exc=None): 

267 if self.done(): 

268 raise exceptions.InvalidStateError( 

269 f'__step(): already done: {self!r}, {exc!r}') 

270 if self._must_cancel: 

271 if not isinstance(exc, exceptions.CancelledError): 

272 exc = self._make_cancelled_error() 

273 self._must_cancel = False 

274 self._fut_waiter = None 

275 

276 _py_enter_task(self._loop, self) 

277 try: 

278 self.__step_run_and_handle_result(exc) 

279 finally: 

280 _py_leave_task(self._loop, self) 

281 self = None # Needed to break cycles when an exception occurs. 

282 

283 def __step_run_and_handle_result(self, exc): 

284 coro = self._coro 

285 try: 

286 if exc is None: 

287 # We use the `send` method directly, because coroutines 

288 # don't have `__iter__` and `__next__` methods. 

289 result = coro.send(None) 

290 else: 

291 result = coro.throw(exc) 

292 except StopIteration as exc: 

293 if self._must_cancel: 

294 # Task is cancelled right before coro stops. 

295 self._must_cancel = False 

296 super().cancel(msg=self._cancel_message) 

297 else: 

298 super().set_result(exc.value) 

299 except exceptions.CancelledError as exc: 

300 # Save the original exception so we can chain it later. 

301 self._cancelled_exc = exc 

302 super().cancel() # I.e., Future.cancel(self). 

303 except (KeyboardInterrupt, SystemExit) as exc: 

304 super().set_exception(exc) 

305 raise 

306 except BaseException as exc: 

307 super().set_exception(exc) 

308 else: 

309 blocking = getattr(result, '_asyncio_future_blocking', None) 

310 if blocking is not None: 

311 # Yielded Future must come from Future.__iter__(). 

312 if futures._get_loop(result) is not self._loop: 

313 new_exc = RuntimeError( 

314 f'Task {self!r} got Future ' 

315 f'{result!r} attached to a different loop') 

316 self._loop.call_soon( 

317 self.__step, new_exc, context=self._context) 

318 elif blocking: 318 ↛ 335line 318 didn't jump to line 335 because the condition on line 318 was always true

319 if result is self: 

320 new_exc = RuntimeError( 

321 f'Task cannot await on itself: {self!r}') 

322 self._loop.call_soon( 

323 self.__step, new_exc, context=self._context) 

324 else: 

325 futures.future_add_to_awaited_by(result, self) 

326 result._asyncio_future_blocking = False 

327 result.add_done_callback( 

328 self.__wakeup, context=self._context) 

329 self._fut_waiter = result 

330 if self._must_cancel: 

331 if self._fut_waiter.cancel( 331 ↛ 357line 331 didn't jump to line 357 because the condition on line 331 was always true

332 msg=self._cancel_message): 

333 self._must_cancel = False 

334 else: 

335 new_exc = RuntimeError( 

336 f'yield was used instead of yield from ' 

337 f'in task {self!r} with {result!r}') 

338 self._loop.call_soon( 

339 self.__step, new_exc, context=self._context) 

340 

341 elif result is None: 341 ↛ 344line 341 didn't jump to line 344 because the condition on line 341 was always true

342 # Bare yield relinquishes control for one event loop iteration. 

343 self._loop.call_soon(self.__step, context=self._context) 

344 elif inspect.isgenerator(result): 

345 # Yielding a generator is just wrong. 

346 new_exc = RuntimeError( 

347 f'yield was used instead of yield from for ' 

348 f'generator in task {self!r} with {result!r}') 

349 self._loop.call_soon( 

350 self.__step, new_exc, context=self._context) 

351 else: 

352 # Yielding something else is an error. 

353 new_exc = RuntimeError(f'Task got bad yield: {result!r}') 

354 self._loop.call_soon( 

355 self.__step, new_exc, context=self._context) 

356 finally: 

357 self = None # Needed to break cycles when an exception occurs. 

358 

359 def __wakeup(self, future): 

360 futures.future_discard_from_awaited_by(future, self) 

361 try: 

362 future.result() 

363 except BaseException as exc: 

364 # This may also be a cancellation. 

365 self.__step(exc) 

366 else: 

367 # Don't pass the value of `future.result()` explicitly, 

368 # as `Future.__iter__` and `Future.__await__` don't need it. 

369 # If we call `__step(value, None)` instead of `__step()`, 

370 # Python eval loop would use `.send(value)` method call, 

371 # instead of `__next__()`, which is slower for futures 

372 # that return non-generator iterators from their `__iter__`. 

373 self.__step() 

374 self = None # Needed to break cycles when an exception occurs. 

375 

376 

377_PyTask = Task 

378 

379 

380try: 

381 import _asyncio 

382except ImportError: 

383 pass 

384else: 

385 # _CTask is needed for tests. 

386 Task = _CTask = _asyncio.Task 

387 

388 

389def create_task(coro, **kwargs): 

390 """Schedule the execution of a coroutine object in a spawn task. 

391 

392 Return a Task object. 

393 """ 

394 loop = events.get_running_loop() 

395 return loop.create_task(coro, **kwargs) 

396 

397 

398# wait() and as_completed() similar to those in PEP 3148. 

399 

400FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED 

401FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION 

402ALL_COMPLETED = concurrent.futures.ALL_COMPLETED 

403 

404 

405async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED): 

406 """Wait for the Futures or Tasks given by fs to complete. 

407 

408 The fs iterable must not be empty. 

409 

410 Returns two sets of Future: (done, pending). 

411 

412 Usage: 

413 

414 done, pending = await asyncio.wait(fs) 

415 

416 Note: This does not raise TimeoutError! Futures that aren't done 

417 when the timeout occurs are returned in the second set. 

418 """ 

419 if futures.isfuture(fs) or coroutines.iscoroutine(fs): 

420 raise TypeError(f"expect a list of futures, not {type(fs).__name__}") 

421 if not fs: 

422 raise ValueError('Set of Tasks/Futures is empty.') 

423 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED): 

424 raise ValueError(f'Invalid return_when value: {return_when}') 

425 

426 fs = set(fs) 

427 

428 if any(coroutines.iscoroutine(f) for f in fs): 428 ↛ 429line 428 didn't jump to line 429 because the condition on line 428 was never true

429 raise TypeError("Passing coroutines is forbidden, use tasks explicitly.") 

430 

431 loop = events.get_running_loop() 

432 return await _wait(fs, timeout, return_when, loop) 

433 

434 

435def _release_waiter(waiter, *args): 

436 if not waiter.done(): 436 ↛ exitline 436 didn't return from function '_release_waiter' because the condition on line 436 was always true

437 waiter.set_result(None) 

438 

439 

440async def wait_for(fut, timeout): 

441 """Wait for the single Future or coroutine to complete, with timeout. 

442 

443 Coroutine will be wrapped in Task. 

444 

445 Returns result of the Future or coroutine. When a timeout occurs, 

446 it cancels the task and raises TimeoutError. To avoid the task 

447 cancellation, wrap it in shield(). 

448 

449 If the wait is cancelled, the task is also cancelled. 

450 

451 If the task suppresses the cancellation and returns a value instead, 

452 that value is returned. 

453 

454 This function is a coroutine. 

455 """ 

456 # The special case for timeout <= 0 is for the following case: 

457 # 

458 # async def test_waitfor(): 

459 # func_started = False 

460 # 

461 # async def func(): 

462 # nonlocal func_started 

463 # func_started = True 

464 # 

465 # try: 

466 # await asyncio.wait_for(func(), 0) 

467 # except asyncio.TimeoutError: 

468 # assert not func_started 

469 # else: 

470 # assert False 

471 # 

472 # asyncio.run(test_waitfor()) 

473 

474 

475 if timeout is not None and timeout <= 0: 

476 fut = ensure_future(fut) 

477 

478 if fut.done(): 

479 return fut.result() 

480 

481 await _cancel_and_wait(fut) 

482 try: 

483 return fut.result() 

484 except exceptions.CancelledError as exc: 

485 raise TimeoutError from exc 

486 

487 async with timeouts.timeout(timeout): 

488 return await fut 

489 

490async def _wait(fs, timeout, return_when, loop): 

491 """Internal helper for wait(). 

492 

493 The fs argument must be a collection of Futures. 

494 """ 

495 assert fs, 'Set of Futures is empty.' 

496 waiter = loop.create_future() 

497 timeout_handle = None 

498 if timeout is not None: 

499 timeout_handle = loop.call_later(timeout, _release_waiter, waiter) 

500 counter = len(fs) 

501 cur_task = current_task() 

502 

503 def _on_completion(f): 

504 nonlocal counter 

505 counter -= 1 

506 if (counter <= 0 or 

507 return_when == FIRST_COMPLETED or 

508 return_when == FIRST_EXCEPTION and (not f.cancelled() and 

509 f.exception() is not None)): 

510 if timeout_handle is not None: 510 ↛ 511line 510 didn't jump to line 511 because the condition on line 510 was never true

511 timeout_handle.cancel() 

512 if not waiter.done(): 

513 waiter.set_result(None) 

514 futures.future_discard_from_awaited_by(f, cur_task) 

515 

516 for f in fs: 

517 f.add_done_callback(_on_completion) 

518 futures.future_add_to_awaited_by(f, cur_task) 

519 

520 try: 

521 await waiter 

522 finally: 

523 if timeout_handle is not None: 

524 timeout_handle.cancel() 

525 for f in fs: 

526 f.remove_done_callback(_on_completion) 

527 

528 done, pending = set(), set() 

529 for f in fs: 

530 if f.done(): 

531 done.add(f) 

532 else: 

533 pending.add(f) 

534 return done, pending 

535 

536 

537async def _cancel_and_wait(fut): 

538 """Cancel the *fut* future or task and wait until it completes.""" 

539 

540 loop = events.get_running_loop() 

541 waiter = loop.create_future() 

542 cb = functools.partial(_release_waiter, waiter) 

543 fut.add_done_callback(cb) 

544 

545 try: 

546 fut.cancel() 

547 # We cannot wait on *fut* directly to make 

548 # sure _cancel_and_wait itself is reliably cancellable. 

549 await waiter 

550 finally: 

551 fut.remove_done_callback(cb) 

552 

553 

554class _AsCompletedIterator: 

555 """Iterator of awaitables representing tasks of asyncio.as_completed. 

556 

557 As an asynchronous iterator, iteration yields futures as they finish. As a 

558 plain iterator, new coroutines are yielded that will return or raise the 

559 result of the next underlying future to complete. 

560 """ 

561 def __init__(self, aws, timeout): 

562 self._done = queues.Queue() 

563 self._timeout_handle = None 

564 

565 loop = events.get_event_loop() 

566 todo = {ensure_future(aw, loop=loop) for aw in set(aws)} 

567 for f in todo: 

568 f.add_done_callback(self._handle_completion) 

569 if todo and timeout is not None: 

570 self._timeout_handle = ( 

571 loop.call_later(timeout, self._handle_timeout) 

572 ) 

573 self._todo = todo 

574 self._todo_left = len(todo) 

575 

576 def __aiter__(self): 

577 return self 

578 

579 def __iter__(self): 

580 return self 

581 

582 async def __anext__(self): 

583 if not self._todo_left: 

584 raise StopAsyncIteration 

585 assert self._todo_left > 0 

586 self._todo_left -= 1 

587 return await self._wait_for_one() 

588 

589 def __next__(self): 

590 if not self._todo_left: 

591 raise StopIteration 

592 assert self._todo_left > 0 

593 self._todo_left -= 1 

594 return self._wait_for_one(resolve=True) 

595 

596 def _handle_timeout(self): 

597 for f in self._todo: 

598 f.remove_done_callback(self._handle_completion) 

599 self._done.put_nowait(None) # Sentinel for _wait_for_one(). 

600 self._todo.clear() # Can't do todo.remove(f) in the loop. 

601 

602 def _handle_completion(self, f): 

603 if not self._todo: 603 ↛ 604line 603 didn't jump to line 604 because the condition on line 603 was never true

604 return # _handle_timeout() was here first. 

605 self._todo.remove(f) 

606 self._done.put_nowait(f) 

607 if not self._todo and self._timeout_handle is not None: 

608 self._timeout_handle.cancel() 

609 

610 async def _wait_for_one(self, resolve=False): 

611 # Wait for the next future to be done and return it unless resolve is 

612 # set, in which case return either the result of the future or raise 

613 # an exception. 

614 f = await self._done.get() 

615 if f is None: 

616 # Dummy value from _handle_timeout(). 

617 raise exceptions.TimeoutError 

618 return f.result() if resolve else f 

619 

620 

621def as_completed(fs, *, timeout=None): 

622 """Create an iterator of awaitables or their results in completion order. 

623 

624 Run the supplied awaitables concurrently. The returned object can be 

625 iterated to obtain the results of the awaitables as they finish. 

626 

627 The object returned can be iterated as an asynchronous iterator or 

628 a plain iterator. When asynchronous iteration is used, the 

629 originally-supplied awaitables are yielded if they are tasks or 

630 futures. This makes it easy to correlate previously-scheduled tasks 

631 with their results: 

632 

633 ipv4_connect = create_task(open_connection("127.0.0.1", 80)) 

634 ipv6_connect = create_task(open_connection("::1", 80)) 

635 tasks = [ipv4_connect, ipv6_connect] 

636 

637 async for earliest_connect in as_completed(tasks): 

638 # earliest_connect is done. The result can be obtained by 

639 # awaiting it or calling earliest_connect.result() 

640 reader, writer = await earliest_connect 

641 

642 if earliest_connect is ipv6_connect: 

643 print("IPv6 connection established.") 

644 else: 

645 print("IPv4 connection established.") 

646 

647 During asynchronous iteration, implicitly-created tasks will be 

648 yielded for supplied awaitables that aren't tasks or futures. 

649 

650 When used as a plain iterator, each iteration yields a new coroutine 

651 that returns the result or raises the exception of the next completed 

652 awaitable. This pattern is compatible with Python versions older than 

653 3.13: 

654 

655 ipv4_connect = create_task(open_connection("127.0.0.1", 80)) 

656 ipv6_connect = create_task(open_connection("::1", 80)) 

657 tasks = [ipv4_connect, ipv6_connect] 

658 

659 for next_connect in as_completed(tasks): 

660 # next_connect is not one of the original task objects. It must 

661 # be awaited to obtain the result value or raise the exception 

662 # of the awaitable that finishes next. 

663 reader, writer = await next_connect 

664 

665 A TimeoutError is raised if the timeout occurs before all awaitables 

666 are done. This is raised by the async for loop during asynchronous 

667 iteration or by the coroutines yielded during plain iteration. 

668 """ 

669 if inspect.isawaitable(fs): 

670 raise TypeError( 

671 f"expects an iterable of awaitables, not {type(fs).__name__}" 

672 ) 

673 

674 return _AsCompletedIterator(fs, timeout) 

675 

676 

677@types.coroutine 

678def __sleep0(): 

679 """Skip one event loop run cycle. 

680 

681 This is a private helper for 'asyncio.sleep()', used 

682 when the 'delay' is set to 0. It uses a bare 'yield' 

683 expression (which Task.__step knows how to handle) 

684 instead of creating a Future object. 

685 """ 

686 yield 

687 

688 

689async def sleep(delay, result=None): 

690 """Coroutine that completes after a given time (in seconds).""" 

691 if delay <= 0: 

692 await __sleep0() 

693 return result 

694 

695 if math.isnan(delay): 

696 raise ValueError("Invalid delay: NaN (not a number)") 

697 

698 loop = events.get_running_loop() 

699 future = loop.create_future() 

700 h = loop.call_later(delay, 

701 futures._set_result_unless_cancelled, 

702 future, result) 

703 try: 

704 return await future 

705 finally: 

706 h.cancel() 

707 

708 

709def ensure_future(coro_or_future, *, loop=None): 

710 """Wrap a coroutine or an awaitable in a future. 

711 

712 If the argument is a Future, it is returned directly. 

713 """ 

714 if futures.isfuture(coro_or_future): 

715 if loop is not None and loop is not futures._get_loop(coro_or_future): 

716 raise ValueError('The future belongs to a different loop than ' 

717 'the one specified as the loop argument') 

718 return coro_or_future 

719 should_close = True 

720 if not coroutines.iscoroutine(coro_or_future): 

721 if inspect.isawaitable(coro_or_future): 

722 async def _wrap_awaitable(awaitable): 

723 return await awaitable 

724 

725 coro_or_future = _wrap_awaitable(coro_or_future) 

726 should_close = False 

727 else: 

728 raise TypeError('An asyncio.Future, a coroutine or an awaitable ' 

729 'is required') 

730 

731 if loop is None: 

732 loop = events.get_event_loop() 

733 try: 

734 return loop.create_task(coro_or_future) 

735 except RuntimeError: 

736 if should_close: 736 ↛ 738line 736 didn't jump to line 738 because the condition on line 736 was always true

737 coro_or_future.close() 

738 raise 

739 

740 

741class _GatheringFuture(futures.Future): 

742 """Helper for gather(). 

743 

744 This overrides cancel() to cancel all the children and act more 

745 like Task.cancel(), which doesn't immediately mark itself as 

746 cancelled. 

747 """ 

748 

749 def __init__(self, children, *, loop): 

750 assert loop is not None 

751 super().__init__(loop=loop) 

752 self._children = children 

753 self._cancel_requested = False 

754 

755 def cancel(self, msg=None): 

756 if self.done(): 

757 return False 

758 ret = False 

759 for child in self._children: 

760 if child.cancel(msg=msg): 

761 ret = True 

762 if ret: 

763 # If any child tasks were actually cancelled, we should 

764 # propagate the cancellation request regardless of 

765 # *return_exceptions* argument. See issue 32684. 

766 self._cancel_requested = True 

767 return ret 

768 

769 

770def gather(*coros_or_futures, return_exceptions=False): 

771 """Return a future aggregating results from the given coroutines/futures. 

772 

773 Coroutines will be wrapped in a future and scheduled in the event 

774 loop. They will not necessarily be scheduled in the same order as 

775 passed in. 

776 

777 All futures must share the same event loop. If all the tasks are 

778 done successfully, the returned future's result is the list of 

779 results (in the order of the original sequence, not necessarily 

780 the order of results arrival). If *return_exceptions* is True, 

781 exceptions in the tasks are treated the same as successful 

782 results, and gathered in the result list; otherwise, the first 

783 raised exception will be immediately propagated to the returned 

784 future. 

785 

786 Cancellation: if the outer Future is cancelled, all children (that 

787 have not completed yet) are also cancelled. If any child is 

788 cancelled, this is treated as if it raised CancelledError -- 

789 the outer Future is *not* cancelled in this case. (This is to 

790 prevent the cancellation of one child to cause other children to 

791 be cancelled.) 

792 

793 If *return_exceptions* is False, cancelling gather() after it 

794 has been marked done won't cancel any submitted awaitables. 

795 For instance, gather can be marked done after propagating an 

796 exception to the caller, therefore, calling ``gather.cancel()`` 

797 after catching an exception (raised by one of the awaitables) from 

798 gather won't cancel any other awaitables. 

799 """ 

800 if not coros_or_futures: 

801 loop = events.get_event_loop() 

802 outer = loop.create_future() 

803 outer.set_result([]) 

804 return outer 

805 

806 loop = events._get_running_loop() 

807 if loop is not None: 

808 cur_task = current_task(loop) 

809 else: 

810 cur_task = None 

811 

812 def _done_callback(fut, cur_task=cur_task): 

813 nonlocal nfinished 

814 nfinished += 1 

815 

816 if cur_task is not None: 

817 futures.future_discard_from_awaited_by(fut, cur_task) 

818 

819 if outer is None or outer.done(): 

820 if not fut.cancelled(): 

821 # Mark exception retrieved. 

822 fut.exception() 

823 return 

824 

825 if not return_exceptions: 

826 if fut.cancelled(): 

827 # Check if 'fut' is cancelled first, as 

828 # 'fut.exception()' will *raise* a CancelledError 

829 # instead of returning it. 

830 exc = fut._make_cancelled_error() 

831 outer.set_exception(exc) 

832 return 

833 else: 

834 exc = fut.exception() 

835 if exc is not None: 

836 outer.set_exception(exc) 

837 return 

838 

839 if nfinished == nfuts: 

840 # All futures are done; create a list of results 

841 # and set it to the 'outer' future. 

842 results = [] 

843 

844 for fut in children: 

845 if fut.cancelled(): 

846 # Check if 'fut' is cancelled first, as 'fut.exception()' 

847 # will *raise* a CancelledError instead of returning it. 

848 # Also, since we're adding the exception return value 

849 # to 'results' instead of raising it, don't bother 

850 # setting __context__. This also lets us preserve 

851 # calling '_make_cancelled_error()' at most once. 

852 res = exceptions.CancelledError( 

853 '' if fut._cancel_message is None else 

854 fut._cancel_message) 

855 else: 

856 res = fut.exception() 

857 if res is None: 

858 res = fut.result() 

859 results.append(res) 

860 

861 if outer._cancel_requested: 

862 # If gather is being cancelled we must propagate the 

863 # cancellation regardless of *return_exceptions* argument. 

864 # See issue 32684. 

865 exc = fut._make_cancelled_error() 

866 outer.set_exception(exc) 

867 else: 

868 outer.set_result(results) 

869 

870 arg_to_fut = {} 

871 children = [] 

872 nfuts = 0 

873 nfinished = 0 

874 done_futs = [] 

875 outer = None # bpo-46672 

876 for arg in coros_or_futures: 

877 if arg not in arg_to_fut: 

878 fut = ensure_future(arg, loop=loop) 

879 if loop is None: 

880 loop = futures._get_loop(fut) 

881 if fut is not arg: 

882 # 'arg' was not a Future, therefore, 'fut' is a new 

883 # Future created specifically for 'arg'. Since the caller 

884 # can't control it, disable the "destroy pending task" 

885 # warning. 

886 fut._log_destroy_pending = False 

887 nfuts += 1 

888 arg_to_fut[arg] = fut 

889 if fut.done(): 

890 done_futs.append(fut) 

891 else: 

892 if cur_task is not None: 

893 futures.future_add_to_awaited_by(fut, cur_task) 

894 fut.add_done_callback(_done_callback) 

895 

896 else: 

897 # There's a duplicate Future object in coros_or_futures. 

898 fut = arg_to_fut[arg] 

899 

900 children.append(fut) 

901 

902 outer = _GatheringFuture(children, loop=loop) 

903 # Run done callbacks after GatheringFuture created so any post-processing 

904 # can be performed at this point 

905 # optimization: in the special case that *all* futures finished eagerly, 

906 # this will effectively complete the gather eagerly, with the last 

907 # callback setting the result (or exception) on outer before returning it 

908 for fut in done_futs: 

909 _done_callback(fut) 

910 return outer 

911 

912 

913def _log_on_exception(fut): 

914 if fut.cancelled(): 914 ↛ 915line 914 didn't jump to line 915 because the condition on line 914 was never true

915 return 

916 

917 exc = fut.exception() 

918 if exc is None: 

919 return 

920 

921 context = { 

922 'message': 

923 f'{exc.__class__.__name__} exception in shielded future', 

924 'exception': exc, 

925 'future': fut, 

926 } 

927 if fut._source_traceback: 927 ↛ 928line 927 didn't jump to line 928 because the condition on line 927 was never true

928 context['source_traceback'] = fut._source_traceback 

929 fut._loop.call_exception_handler(context) 

930 

931 

932def shield(arg): 

933 """Wait for a future, shielding it from cancellation. 

934 

935 The statement 

936 

937 task = asyncio.create_task(something()) 

938 res = await shield(task) 

939 

940 is exactly equivalent to the statement 

941 

942 res = await something() 

943 

944 *except* that if the coroutine containing it is cancelled, the 

945 task running in something() is not cancelled. From the POV of 

946 something(), the cancellation did not happen. But its caller is 

947 still cancelled, so the yield-from expression still raises 

948 CancelledError. Note: If something() is cancelled by other means 

949 this will still cancel shield(). 

950 

951 If you want to completely ignore cancellation (not recommended) 

952 you can combine shield() with a try/except clause, as follows: 

953 

954 task = asyncio.create_task(something()) 

955 try: 

956 res = await shield(task) 

957 except CancelledError: 

958 res = None 

959 

960 Save a reference to tasks passed to this function, to avoid 

961 a task disappearing mid-execution. The event loop only keeps 

962 weak references to tasks. A task that isn't referenced elsewhere 

963 may get garbage collected at any time, even before it's done. 

964 """ 

965 inner = ensure_future(arg) 

966 if inner.done(): 

967 # Shortcut. 

968 return inner 

969 loop = futures._get_loop(inner) 

970 outer = loop.create_future() 

971 

972 if loop is not None and (cur_task := current_task(loop)) is not None: 

973 futures.future_add_to_awaited_by(inner, cur_task) 

974 else: 

975 cur_task = None 

976 

977 def _clear_awaited_by_callback(inner): 

978 futures.future_discard_from_awaited_by(inner, cur_task) 

979 

980 def _inner_done_callback(inner): 

981 if outer.cancelled(): 981 ↛ 982line 981 didn't jump to line 982 because the condition on line 981 was never true

982 return 

983 

984 if inner.cancelled(): 

985 outer.cancel() 

986 else: 

987 exc = inner.exception() 

988 if exc is not None: 

989 outer.set_exception(exc) 

990 else: 

991 outer.set_result(inner.result()) 

992 

993 def _outer_done_callback(outer): 

994 if not inner.done(): 

995 inner.remove_done_callback(_inner_done_callback) 

996 # Keep only one callback to log on cancel 

997 inner.remove_done_callback(_log_on_exception) 

998 inner.add_done_callback(_log_on_exception) 

999 

1000 if cur_task is not None: 

1001 inner.add_done_callback(_clear_awaited_by_callback) 

1002 

1003 

1004 inner.add_done_callback(_inner_done_callback) 

1005 outer.add_done_callback(_outer_done_callback) 

1006 return outer 

1007 

1008 

1009def run_coroutine_threadsafe(coro, loop): 

1010 """Submit a coroutine object to a given event loop. 

1011 

1012 Return a concurrent.futures.Future to access the result. 

1013 """ 

1014 if not coroutines.iscoroutine(coro): 1014 ↛ 1015line 1014 didn't jump to line 1015 because the condition on line 1014 was never true

1015 raise TypeError('A coroutine object is required') 

1016 future = concurrent.futures.Future() 

1017 

1018 def callback(): 

1019 try: 

1020 futures._chain_future(ensure_future(coro, loop=loop), future) 

1021 except (SystemExit, KeyboardInterrupt): 

1022 raise 

1023 except BaseException as exc: 

1024 if future.set_running_or_notify_cancel(): 1024 ↛ 1026line 1024 didn't jump to line 1026 because the condition on line 1024 was always true

1025 future.set_exception(exc) 

1026 raise 

1027 

1028 loop.call_soon_threadsafe(callback) 

1029 return future 

1030 

1031 

1032def create_eager_task_factory(custom_task_constructor): 

1033 """Create a function suitable for use as a task factory on an event-loop. 

1034 

1035 Example usage: 

1036 

1037 loop.set_task_factory( 

1038 asyncio.create_eager_task_factory(my_task_constructor)) 

1039 

1040 Now, tasks created will be started immediately (rather than being first 

1041 scheduled to an event loop). The constructor argument can be any 

1042 callable that returns a Task-compatible object and has a signature 

1043 compatible with `Task.__init__`; it must have the `eager_start` 

1044 keyword argument. 

1045 

1046 Most applications will use `Task` for `custom_task_constructor` and in 

1047 this case there's no need to call `create_eager_task_factory()` 

1048 directly. Instead the global `eager_task_factory` instance can be 

1049 used. E.g. `loop.set_task_factory(asyncio.eager_task_factory)`. 

1050 """ 

1051 

1052 def factory(loop, coro, *, eager_start=True, **kwargs): 

1053 return custom_task_constructor( 

1054 coro, loop=loop, eager_start=eager_start, **kwargs) 

1055 

1056 return factory 

1057 

1058 

1059eager_task_factory = create_eager_task_factory(Task) 

1060 

1061 

1062# Collectively these two sets hold references to the complete set of active 

1063# tasks. Eagerly executed tasks use a faster regular set as an optimization 

1064# but may graduate to a WeakSet if the task blocks on IO. 

1065_scheduled_tasks = weakref.WeakSet() 

1066_eager_tasks = set() 

1067 

1068# Dictionary containing tasks that are currently active in 

1069# all running event loops. {EventLoop: Task} 

1070_current_tasks = {} 

1071 

1072 

1073def _register_task(task): 

1074 """Register an asyncio Task scheduled to run on an event loop.""" 

1075 _scheduled_tasks.add(task) 

1076 

1077 

1078def _register_eager_task(task): 

1079 """Register an asyncio Task about to be eagerly executed.""" 

1080 _eager_tasks.add(task) 

1081 

1082 

1083def _enter_task(loop, task): 

1084 current_task = _current_tasks.get(loop) 

1085 if current_task is not None: 

1086 raise RuntimeError(f"Cannot enter into task {task!r} while another " 

1087 f"task {current_task!r} is being executed.") 

1088 _current_tasks[loop] = task 

1089 

1090 

1091def _leave_task(loop, task): 

1092 current_task = _current_tasks.get(loop) 

1093 if current_task is not task: 

1094 raise RuntimeError(f"Leaving task {task!r} does not match " 

1095 f"the current task {current_task!r}.") 

1096 del _current_tasks[loop] 

1097 

1098 

1099def _swap_current_task(loop, task): 

1100 prev_task = _current_tasks.get(loop) 

1101 if task is None: 

1102 del _current_tasks[loop] 

1103 else: 

1104 _current_tasks[loop] = task 

1105 return prev_task 

1106 

1107 

1108def _unregister_task(task): 

1109 """Unregister a completed, scheduled Task.""" 

1110 _scheduled_tasks.discard(task) 

1111 

1112 

1113def _unregister_eager_task(task): 

1114 """Unregister a task which finished its first eager step.""" 

1115 _eager_tasks.discard(task) 

1116 

1117 

1118_py_current_task = current_task 

1119_py_register_task = _register_task 

1120_py_register_eager_task = _register_eager_task 

1121_py_unregister_task = _unregister_task 

1122_py_unregister_eager_task = _unregister_eager_task 

1123_py_enter_task = _enter_task 

1124_py_leave_task = _leave_task 

1125_py_swap_current_task = _swap_current_task 

1126_py_all_tasks = all_tasks 

1127 

1128try: 

1129 from _asyncio import (_register_task, _register_eager_task, 

1130 _unregister_task, _unregister_eager_task, 

1131 _enter_task, _leave_task, _swap_current_task, 

1132 current_task, all_tasks) 

1133except ImportError: 

1134 pass 

1135else: 

1136 _c_current_task = current_task 

1137 _c_register_task = _register_task 

1138 _c_register_eager_task = _register_eager_task 

1139 _c_unregister_task = _unregister_task 

1140 _c_unregister_eager_task = _unregister_eager_task 

1141 _c_enter_task = _enter_task 

1142 _c_leave_task = _leave_task 

1143 _c_swap_current_task = _swap_current_task 

1144 _c_all_tasks = all_tasks