Coverage for Lib/asyncio/tasks.py: 95%

530 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-23 01:21 +0000

1"""Support for tasks, coroutines and the scheduler.""" 

2 

3__all__ = ( 

4 'Task', 'create_task', 

5 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED', 

6 'wait', 'wait_for', 'as_completed', 'sleep', 

7 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe', 

8 'current_task', 'all_tasks', 

9 'create_eager_task_factory', 'eager_task_factory', 

10 '_register_task', '_unregister_task', '_enter_task', '_leave_task', 

11) 

12 

13import concurrent.futures 

14import contextvars 

15import functools 

16import inspect 

17import itertools 

18import math 

19import types 

20import weakref 

21from types import GenericAlias 

22 

23from . import base_tasks 

24from . import coroutines 

25from . import events 

26from . import exceptions 

27from . import futures 

28from . import queues 

29from . import timeouts 

30 

31# Helper to generate new task names 

32# This uses itertools.count() instead of a "+= 1" operation because the latter 

33# is not thread safe. See bpo-11866 for a longer explanation. 

34_task_name_counter = itertools.count(1).__next__ 

35 

36 

37def current_task(loop=None): 

38 """Return a currently executed task.""" 

39 if loop is None: 

40 loop = events.get_running_loop() 

41 return _current_tasks.get(loop) 

42 

43 

44def all_tasks(loop=None): 

45 """Return a set of all tasks for the loop.""" 

46 if loop is None: 

47 loop = events.get_running_loop() 

48 # capturing the set of eager tasks first, so if an eager task "graduates" 

49 # to a regular task in another thread, we don't risk missing it. 

50 eager_tasks = list(_eager_tasks) 

51 

52 return {t for t in itertools.chain(_scheduled_tasks, eager_tasks) 

53 if futures._get_loop(t) is loop and not t.done()} 

54 

55 

56class Task(futures._PyFuture): # Inherit Python Task implementation 

57 # from a Python Future implementation. 

58 

59 """A coroutine wrapped in a Future.""" 

60 

61 # An important invariant maintained while a Task not done: 

62 # _fut_waiter is either None or a Future. The Future 

63 # can be either done() or not done(). 

64 # The task can be in any of 3 states: 

65 # 

66 # - 1: _fut_waiter is not None and not _fut_waiter.done(): 

67 # __step() is *not* scheduled and the Task is waiting for _fut_waiter. 

68 # - 2: (_fut_waiter is None or _fut_waiter.done()) and __step() is scheduled: 

69 # the Task is waiting for __step() to be executed. 

70 # - 3: _fut_waiter is None and __step() is *not* scheduled: 

71 # the Task is currently executing (in __step()). 

72 # 

73 # * In state 1, one of the callbacks of __fut_waiter must be __wakeup(). 

74 # * The transition from 1 to 2 happens when _fut_waiter becomes done(), 

75 # as it schedules __wakeup() to be called (which calls __step() so 

76 # we way that __step() is scheduled). 

77 # * It transitions from 2 to 3 when __step() is executed, and it clears 

78 # _fut_waiter to None. 

79 

80 # If False, don't log a message if the task is destroyed while its 

81 # status is still pending 

82 _log_destroy_pending = True 

83 

84 def __init__(self, coro, *, loop=None, name=None, context=None, 

85 eager_start=False): 

86 super().__init__(loop=loop) 

87 if self._source_traceback: 

88 del self._source_traceback[-1] 

89 if not coroutines.iscoroutine(coro): 

90 # raise after Future.__init__(), attrs are required for __del__ 

91 # prevent logging for pending task in __del__ 

92 self._log_destroy_pending = False 

93 raise TypeError(f"a coroutine was expected, got {coro!r}") 

94 

95 if name is None: 

96 self._name = f'Task-{_task_name_counter()}' 

97 else: 

98 self._name = str(name) 

99 

100 self._num_cancels_requested = 0 

101 self._must_cancel = False 

102 self._fut_waiter = None 

103 self._coro = coro 

104 if context is None: 

105 self._context = contextvars.copy_context() 

106 else: 

107 self._context = context 

108 

109 if eager_start and self._loop.is_running(): 

110 self.__eager_start() 

111 else: 

112 self._loop.call_soon(self.__step, context=self._context) 

113 _py_register_task(self) 

114 

115 def __del__(self): 

116 if self._state == futures._PENDING and self._log_destroy_pending: 

117 context = { 

118 'task': self, 

119 'message': 'Task was destroyed but it is pending!', 

120 } 

121 if self._source_traceback: 

122 context['source_traceback'] = self._source_traceback 

123 self._loop.call_exception_handler(context) 

124 super().__del__() 

125 

126 __class_getitem__ = classmethod(GenericAlias) 

127 

128 def __repr__(self): 

129 return base_tasks._task_repr(self) 

130 

131 def get_coro(self): 

132 return self._coro 

133 

134 def get_context(self): 

135 return self._context 

136 

137 def get_name(self): 

138 return self._name 

139 

140 def set_name(self, value): 

141 self._name = str(value) 

142 

143 def set_result(self, result): 

144 raise RuntimeError('Task does not support set_result operation') 

145 

146 def set_exception(self, exception): 

147 raise RuntimeError('Task does not support set_exception operation') 

148 

149 def get_stack(self, *, limit=None): 

150 """Return the list of stack frames for this task's coroutine. 

151 

152 If the coroutine is not done, this returns the stack where it is 

153 suspended. If the coroutine has completed successfully or was 

154 cancelled, this returns an empty list. If the coroutine was 

155 terminated by an exception, this returns the list of traceback 

156 frames. 

157 

158 The frames are always ordered from oldest to newest. 

159 

160 The optional limit gives the maximum number of frames to 

161 return; by default all available frames are returned. Its 

162 meaning differs depending on whether a stack or a traceback is 

163 returned: the newest frames of a stack are returned, but the 

164 oldest frames of a traceback are returned. (This matches the 

165 behavior of the traceback module.) 

166 

167 For reasons beyond our control, only one stack frame is 

168 returned for a suspended coroutine. 

169 """ 

170 return base_tasks._task_get_stack(self, limit) 

171 

172 def print_stack(self, *, limit=None, file=None): 

173 """Print the stack or traceback for this task's coroutine. 

174 

175 This produces output similar to that of the traceback module, 

176 for the frames retrieved by get_stack(). The limit argument 

177 is passed to get_stack(). The file argument is an I/O stream 

178 to which the output is written; by default output is written 

179 to sys.stderr. 

180 """ 

181 return base_tasks._task_print_stack(self, limit, file) 

182 

183 def cancel(self, msg=None): 

184 """Request that this task cancel itself. 

185 

186 This arranges for a CancelledError to be thrown into the 

187 wrapped coroutine on the next cycle through the event loop. 

188 The coroutine then has a chance to clean up or even deny 

189 the request using try/except/finally. 

190 

191 Unlike Future.cancel, this does not guarantee that the 

192 task will be cancelled: the exception might be caught and 

193 acted upon, delaying cancellation of the task or preventing 

194 cancellation completely. The task may also return a value or 

195 raise a different exception. 

196 

197 Immediately after this method is called, Task.cancelled() will 

198 not return True (unless the task was already cancelled). A 

199 task will be marked as cancelled when the wrapped coroutine 

200 terminates with a CancelledError exception (even if cancel() 

201 was not called). 

202 

203 This also increases the task's count of cancellation requests. 

204 """ 

205 self._log_traceback = False 

206 if self.done(): 

207 return False 

208 self._num_cancels_requested += 1 

209 # These two lines are controversial. See discussion starting at 

210 # https://github.com/python/cpython/pull/31394#issuecomment-1053545331 

211 # Also remember that this is duplicated in _asynciomodule.c. 

212 # if self._num_cancels_requested > 1: 

213 # return False 

214 if self._fut_waiter is not None: 

215 if self._fut_waiter.cancel(msg=msg): 

216 # Leave self._fut_waiter; it may be a Task that 

217 # catches and ignores the cancellation so we may have 

218 # to cancel it again later. 

219 return True 

220 # It must be the case that self.__step is already scheduled. 

221 self._must_cancel = True 

222 self._cancel_message = msg 

223 return True 

224 

225 def cancelling(self): 

226 """Return the count of the task's cancellation requests. 

227 

228 This count is incremented when .cancel() is called 

229 and may be decremented using .uncancel(). 

230 """ 

231 return self._num_cancels_requested 

232 

233 def uncancel(self): 

234 """Decrement the task's count of cancellation requests. 

235 

236 This should be called by the party that called `cancel()` on the task 

237 beforehand. 

238 

239 Returns the remaining number of cancellation requests. 

240 """ 

241 if self._num_cancels_requested > 0: 241 ↛ 245line 241 didn't jump to line 245 because the condition on line 241 was always true

242 self._num_cancels_requested -= 1 

243 if self._num_cancels_requested == 0: 243 ↛ 245line 243 didn't jump to line 245 because the condition on line 243 was always true

244 self._must_cancel = False 

245 return self._num_cancels_requested 

246 

247 def __eager_start(self): 

248 prev_task = _py_swap_current_task(self._loop, self) 

249 try: 

250 _py_register_eager_task(self) 

251 try: 

252 self._context.run(self.__step_run_and_handle_result, None) 

253 finally: 

254 _py_unregister_eager_task(self) 

255 finally: 

256 try: 

257 curtask = _py_swap_current_task(self._loop, prev_task) 

258 assert curtask is self 

259 finally: 

260 if self.done(): 

261 self._coro = None 

262 self = None # Needed to break cycles when an exception occurs. 

263 else: 

264 _py_register_task(self) 

265 

266 def __step(self, exc=None): 

267 if self.done(): 

268 raise exceptions.InvalidStateError( 

269 f'__step(): already done: {self!r}, {exc!r}') 

270 if self._must_cancel: 

271 if not isinstance(exc, exceptions.CancelledError): 

272 exc = self._make_cancelled_error() 

273 self._must_cancel = False 

274 self._fut_waiter = None 

275 

276 _py_enter_task(self._loop, self) 

277 try: 

278 self.__step_run_and_handle_result(exc) 

279 finally: 

280 _py_leave_task(self._loop, self) 

281 self = None # Needed to break cycles when an exception occurs. 

282 

283 def __step_run_and_handle_result(self, exc): 

284 coro = self._coro 

285 try: 

286 if exc is None: 

287 # We use the `send` method directly, because coroutines 

288 # don't have `__iter__` and `__next__` methods. 

289 result = coro.send(None) 

290 else: 

291 result = coro.throw(exc) 

292 except StopIteration as exc: 

293 if self._must_cancel: 

294 # Task is cancelled right before coro stops. 

295 self._must_cancel = False 

296 super().cancel(msg=self._cancel_message) 

297 else: 

298 super().set_result(exc.value) 

299 except exceptions.CancelledError as exc: 

300 # Save the original exception so we can chain it later. 

301 self._cancelled_exc = exc 

302 super().cancel() # I.e., Future.cancel(self). 

303 except (KeyboardInterrupt, SystemExit) as exc: 

304 super().set_exception(exc) 

305 raise 

306 except BaseException as exc: 

307 super().set_exception(exc) 

308 else: 

309 blocking = getattr(result, '_asyncio_future_blocking', None) 

310 if blocking is not None: 

311 # Yielded Future must come from Future.__iter__(). 

312 if futures._get_loop(result) is not self._loop: 

313 new_exc = RuntimeError( 

314 f'Task {self!r} got Future ' 

315 f'{result!r} attached to a different loop') 

316 self._loop.call_soon( 

317 self.__step, new_exc, context=self._context) 

318 elif blocking: 318 ↛ 335line 318 didn't jump to line 335 because the condition on line 318 was always true

319 if result is self: 

320 new_exc = RuntimeError( 

321 f'Task cannot await on itself: {self!r}') 

322 self._loop.call_soon( 

323 self.__step, new_exc, context=self._context) 

324 else: 

325 futures.future_add_to_awaited_by(result, self) 

326 result._asyncio_future_blocking = False 

327 result.add_done_callback( 

328 self.__wakeup, context=self._context) 

329 self._fut_waiter = result 

330 if self._must_cancel: 

331 if self._fut_waiter.cancel( 331 ↛ 357line 331 didn't jump to line 357 because the condition on line 331 was always true

332 msg=self._cancel_message): 

333 self._must_cancel = False 

334 else: 

335 new_exc = RuntimeError( 

336 f'yield was used instead of yield from ' 

337 f'in task {self!r} with {result!r}') 

338 self._loop.call_soon( 

339 self.__step, new_exc, context=self._context) 

340 

341 elif result is None: 341 ↛ 344line 341 didn't jump to line 344 because the condition on line 341 was always true

342 # Bare yield relinquishes control for one event loop iteration. 

343 self._loop.call_soon(self.__step, context=self._context) 

344 elif inspect.isgenerator(result): 

345 # Yielding a generator is just wrong. 

346 new_exc = RuntimeError( 

347 f'yield was used instead of yield from for ' 

348 f'generator in task {self!r} with {result!r}') 

349 self._loop.call_soon( 

350 self.__step, new_exc, context=self._context) 

351 else: 

352 # Yielding something else is an error. 

353 new_exc = RuntimeError(f'Task got bad yield: {result!r}') 

354 self._loop.call_soon( 

355 self.__step, new_exc, context=self._context) 

356 finally: 

357 self = None # Needed to break cycles when an exception occurs. 

358 

359 def __wakeup(self, future): 

360 futures.future_discard_from_awaited_by(future, self) 

361 try: 

362 future.result() 

363 except BaseException as exc: 

364 # This may also be a cancellation. 

365 self.__step(exc) 

366 else: 

367 # Don't pass the value of `future.result()` explicitly, 

368 # as `Future.__iter__` and `Future.__await__` don't need it. 

369 # If we call `__step(value, None)` instead of `__step()`, 

370 # Python eval loop would use `.send(value)` method call, 

371 # instead of `__next__()`, which is slower for futures 

372 # that return non-generator iterators from their `__iter__`. 

373 self.__step() 

374 self = None # Needed to break cycles when an exception occurs. 

375 

376 

377_PyTask = Task 

378 

379 

380try: 

381 import _asyncio 

382except ImportError: 

383 pass 

384else: 

385 # _CTask is needed for tests. 

386 Task = _CTask = _asyncio.Task 

387 

388 

389def create_task(coro, **kwargs): 

390 """Schedule the execution of a coroutine object in a spawn task. 

391 

392 Return a Task object. 

393 """ 

394 loop = events.get_running_loop() 

395 return loop.create_task(coro, **kwargs) 

396 

397 

398# wait() and as_completed() similar to those in PEP 3148. 

399 

400FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED 

401FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION 

402ALL_COMPLETED = concurrent.futures.ALL_COMPLETED 

403 

404 

405async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED): 

406 """Wait for the Futures or Tasks given by fs to complete. 

407 

408 The fs iterable must not be empty. 

409 

410 Returns two sets of Future: (done, pending). 

411 

412 Usage: 

413 

414 done, pending = await asyncio.wait(fs) 

415 

416 Note: This does not raise TimeoutError! Futures that aren't done 

417 when the timeout occurs are returned in the second set. 

418 """ 

419 if futures.isfuture(fs) or coroutines.iscoroutine(fs): 

420 raise TypeError(f"expect a list of futures, not {type(fs).__name__}") 

421 if not fs: 

422 raise ValueError('Set of Tasks/Futures is empty.') 

423 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED): 

424 raise ValueError(f'Invalid return_when value: {return_when}') 

425 

426 fs = set(fs) 

427 

428 if any(coroutines.iscoroutine(f) for f in fs): 428 ↛ 429line 428 didn't jump to line 429 because the condition on line 428 was never true

429 raise TypeError("Passing coroutines is forbidden, use tasks explicitly.") 

430 

431 loop = events.get_running_loop() 

432 return await _wait(fs, timeout, return_when, loop) 

433 

434 

435def _release_waiter(waiter, *args): 

436 if not waiter.done(): 436 ↛ exitline 436 didn't return from function '_release_waiter' because the condition on line 436 was always true

437 waiter.set_result(None) 

438 

439 

440async def wait_for(fut, timeout): 

441 """Wait for the single Future or coroutine to complete, with timeout. 

442 

443 Coroutine will be wrapped in Task. 

444 

445 Returns result of the Future or coroutine. When a timeout occurs, 

446 it cancels the task and raises TimeoutError. To avoid the task 

447 cancellation, wrap it in shield(). 

448 

449 If the wait is cancelled, the task is also cancelled. 

450 

451 If the task suppresses the cancellation and returns a value instead, 

452 that value is returned. 

453 

454 This function is a coroutine. 

455 """ 

456 # The special case for timeout <= 0 is for the following case: 

457 # 

458 # async def test_waitfor(): 

459 # func_started = False 

460 # 

461 # async def func(): 

462 # nonlocal func_started 

463 # func_started = True 

464 # 

465 # try: 

466 # await asyncio.wait_for(func(), 0) 

467 # except asyncio.TimeoutError: 

468 # assert not func_started 

469 # else: 

470 # assert False 

471 # 

472 # asyncio.run(test_waitfor()) 

473 

474 

475 if timeout is not None and timeout <= 0: 

476 fut = ensure_future(fut) 

477 

478 if fut.done(): 

479 return fut.result() 

480 

481 await _cancel_and_wait(fut) 

482 try: 

483 return fut.result() 

484 except exceptions.CancelledError as exc: 

485 raise TimeoutError from exc 

486 

487 async with timeouts.timeout(timeout): 

488 return await fut 

489 

490async def _wait(fs, timeout, return_when, loop): 

491 """Internal helper for wait(). 

492 

493 The fs argument must be a collection of Futures. 

494 """ 

495 assert fs, 'Set of Futures is empty.' 

496 waiter = loop.create_future() 

497 timeout_handle = None 

498 if timeout is not None: 

499 timeout_handle = loop.call_later(timeout, _release_waiter, waiter) 

500 counter = len(fs) 

501 cur_task = current_task() 

502 

503 def _on_completion(f): 

504 nonlocal counter 

505 counter -= 1 

506 if (counter <= 0 or 

507 return_when == FIRST_COMPLETED or 

508 return_when == FIRST_EXCEPTION and (not f.cancelled() and 

509 f.exception() is not None)): 

510 if timeout_handle is not None: 510 ↛ 511line 510 didn't jump to line 511 because the condition on line 510 was never true

511 timeout_handle.cancel() 

512 if not waiter.done(): 

513 waiter.set_result(None) 

514 futures.future_discard_from_awaited_by(f, cur_task) 

515 

516 for f in fs: 

517 f.add_done_callback(_on_completion) 

518 futures.future_add_to_awaited_by(f, cur_task) 

519 

520 try: 

521 await waiter 

522 finally: 

523 if timeout_handle is not None: 

524 timeout_handle.cancel() 

525 for f in fs: 

526 f.remove_done_callback(_on_completion) 

527 

528 done, pending = set(), set() 

529 for f in fs: 

530 if f.done(): 

531 done.add(f) 

532 else: 

533 pending.add(f) 

534 return done, pending 

535 

536 

537async def _cancel_and_wait(fut): 

538 """Cancel the *fut* future or task and wait until it completes.""" 

539 

540 loop = events.get_running_loop() 

541 waiter = loop.create_future() 

542 cb = functools.partial(_release_waiter, waiter) 

543 fut.add_done_callback(cb) 

544 

545 try: 

546 fut.cancel() 

547 # We cannot wait on *fut* directly to make 

548 # sure _cancel_and_wait itself is reliably cancellable. 

549 await waiter 

550 finally: 

551 fut.remove_done_callback(cb) 

552 

553 

554class _AsCompletedIterator: 

555 """Iterator of awaitables representing tasks of asyncio.as_completed. 

556 

557 As an asynchronous iterator, iteration yields futures as they finish. As a 

558 plain iterator, new coroutines are yielded that will return or raise the 

559 result of the next underlying future to complete. 

560 """ 

561 def __init__(self, aws, timeout): 

562 self._done = queues.Queue() 

563 self._timeout_handle = None 

564 

565 loop = events.get_event_loop() 

566 todo = {ensure_future(aw, loop=loop) for aw in set(aws)} 

567 for f in todo: 

568 f.add_done_callback(self._handle_completion) 

569 if todo and timeout is not None: 

570 self._timeout_handle = ( 

571 loop.call_later(timeout, self._handle_timeout) 

572 ) 

573 self._todo = todo 

574 self._todo_left = len(todo) 

575 

576 def __aiter__(self): 

577 return self 

578 

579 def __iter__(self): 

580 return self 

581 

582 async def __anext__(self): 

583 if not self._todo_left: 

584 raise StopAsyncIteration 

585 assert self._todo_left > 0 

586 self._todo_left -= 1 

587 return await self._wait_for_one() 

588 

589 def __next__(self): 

590 if not self._todo_left: 

591 raise StopIteration 

592 assert self._todo_left > 0 

593 self._todo_left -= 1 

594 return self._wait_for_one(resolve=True) 

595 

596 def _handle_timeout(self): 

597 for f in self._todo: 

598 f.remove_done_callback(self._handle_completion) 

599 self._done.put_nowait(None) # Sentinel for _wait_for_one(). 

600 self._todo.clear() # Can't do todo.remove(f) in the loop. 

601 

602 def _handle_completion(self, f): 

603 if not self._todo: 603 ↛ 604line 603 didn't jump to line 604 because the condition on line 603 was never true

604 return # _handle_timeout() was here first. 

605 self._todo.remove(f) 

606 self._done.put_nowait(f) 

607 if not self._todo and self._timeout_handle is not None: 

608 self._timeout_handle.cancel() 

609 

610 async def _wait_for_one(self, resolve=False): 

611 # Wait for the next future to be done and return it unless resolve is 

612 # set, in which case return either the result of the future or raise 

613 # an exception. 

614 f = await self._done.get() 

615 if f is None: 

616 # Dummy value from _handle_timeout(). 

617 raise exceptions.TimeoutError 

618 return f.result() if resolve else f 

619 

620 

621def as_completed(fs, *, timeout=None): 

622 """Create an iterator of awaitables or their results in completion order. 

623 

624 Run the supplied awaitables concurrently. The returned object can be 

625 iterated to obtain the results of the awaitables as they finish. 

626 

627 The object returned can be iterated as an asynchronous iterator or a plain 

628 iterator. When asynchronous iteration is used, the originally-supplied 

629 awaitables are yielded if they are tasks or futures. This makes it easy to 

630 correlate previously-scheduled tasks with their results: 

631 

632 ipv4_connect = create_task(open_connection("127.0.0.1", 80)) 

633 ipv6_connect = create_task(open_connection("::1", 80)) 

634 tasks = [ipv4_connect, ipv6_connect] 

635 

636 async for earliest_connect in as_completed(tasks): 

637 # earliest_connect is done. The result can be obtained by 

638 # awaiting it or calling earliest_connect.result() 

639 reader, writer = await earliest_connect 

640 

641 if earliest_connect is ipv6_connect: 

642 print("IPv6 connection established.") 

643 else: 

644 print("IPv4 connection established.") 

645 

646 During asynchronous iteration, implicitly-created tasks will be yielded for 

647 supplied awaitables that aren't tasks or futures. 

648 

649 When used as a plain iterator, each iteration yields a new coroutine that 

650 returns the result or raises the exception of the next completed awaitable. 

651 This pattern is compatible with Python versions older than 3.13: 

652 

653 ipv4_connect = create_task(open_connection("127.0.0.1", 80)) 

654 ipv6_connect = create_task(open_connection("::1", 80)) 

655 tasks = [ipv4_connect, ipv6_connect] 

656 

657 for next_connect in as_completed(tasks): 

658 # next_connect is not one of the original task objects. It must be 

659 # awaited to obtain the result value or raise the exception of the 

660 # awaitable that finishes next. 

661 reader, writer = await next_connect 

662 

663 A TimeoutError is raised if the timeout occurs before all awaitables are 

664 done. This is raised by the async for loop during asynchronous iteration or 

665 by the coroutines yielded during plain iteration. 

666 """ 

667 if inspect.isawaitable(fs): 

668 raise TypeError( 

669 f"expects an iterable of awaitables, not {type(fs).__name__}" 

670 ) 

671 

672 return _AsCompletedIterator(fs, timeout) 

673 

674 

675@types.coroutine 

676def __sleep0(): 

677 """Skip one event loop run cycle. 

678 

679 This is a private helper for 'asyncio.sleep()', used 

680 when the 'delay' is set to 0. It uses a bare 'yield' 

681 expression (which Task.__step knows how to handle) 

682 instead of creating a Future object. 

683 """ 

684 yield 

685 

686 

687async def sleep(delay, result=None): 

688 """Coroutine that completes after a given time (in seconds).""" 

689 if delay <= 0: 

690 await __sleep0() 

691 return result 

692 

693 if math.isnan(delay): 

694 raise ValueError("Invalid delay: NaN (not a number)") 

695 

696 loop = events.get_running_loop() 

697 future = loop.create_future() 

698 h = loop.call_later(delay, 

699 futures._set_result_unless_cancelled, 

700 future, result) 

701 try: 

702 return await future 

703 finally: 

704 h.cancel() 

705 

706 

707def ensure_future(coro_or_future, *, loop=None): 

708 """Wrap a coroutine or an awaitable in a future. 

709 

710 If the argument is a Future, it is returned directly. 

711 """ 

712 if futures.isfuture(coro_or_future): 

713 if loop is not None and loop is not futures._get_loop(coro_or_future): 

714 raise ValueError('The future belongs to a different loop than ' 

715 'the one specified as the loop argument') 

716 return coro_or_future 

717 should_close = True 

718 if not coroutines.iscoroutine(coro_or_future): 

719 if inspect.isawaitable(coro_or_future): 

720 async def _wrap_awaitable(awaitable): 

721 return await awaitable 

722 

723 coro_or_future = _wrap_awaitable(coro_or_future) 

724 should_close = False 

725 else: 

726 raise TypeError('An asyncio.Future, a coroutine or an awaitable ' 

727 'is required') 

728 

729 if loop is None: 

730 loop = events.get_event_loop() 

731 try: 

732 return loop.create_task(coro_or_future) 

733 except RuntimeError: 

734 if should_close: 734 ↛ 736line 734 didn't jump to line 736 because the condition on line 734 was always true

735 coro_or_future.close() 

736 raise 

737 

738 

739class _GatheringFuture(futures.Future): 

740 """Helper for gather(). 

741 

742 This overrides cancel() to cancel all the children and act more 

743 like Task.cancel(), which doesn't immediately mark itself as 

744 cancelled. 

745 """ 

746 

747 def __init__(self, children, *, loop): 

748 assert loop is not None 

749 super().__init__(loop=loop) 

750 self._children = children 

751 self._cancel_requested = False 

752 

753 def cancel(self, msg=None): 

754 if self.done(): 

755 return False 

756 ret = False 

757 for child in self._children: 

758 if child.cancel(msg=msg): 

759 ret = True 

760 if ret: 

761 # If any child tasks were actually cancelled, we should 

762 # propagate the cancellation request regardless of 

763 # *return_exceptions* argument. See issue 32684. 

764 self._cancel_requested = True 

765 return ret 

766 

767 

768def gather(*coros_or_futures, return_exceptions=False): 

769 """Return a future aggregating results from the given coroutines/futures. 

770 

771 Coroutines will be wrapped in a future and scheduled in the event 

772 loop. They will not necessarily be scheduled in the same order as 

773 passed in. 

774 

775 All futures must share the same event loop. If all the tasks are 

776 done successfully, the returned future's result is the list of 

777 results (in the order of the original sequence, not necessarily 

778 the order of results arrival). If *return_exceptions* is True, 

779 exceptions in the tasks are treated the same as successful 

780 results, and gathered in the result list; otherwise, the first 

781 raised exception will be immediately propagated to the returned 

782 future. 

783 

784 Cancellation: if the outer Future is cancelled, all children (that 

785 have not completed yet) are also cancelled. If any child is 

786 cancelled, this is treated as if it raised CancelledError -- 

787 the outer Future is *not* cancelled in this case. (This is to 

788 prevent the cancellation of one child to cause other children to 

789 be cancelled.) 

790 

791 If *return_exceptions* is False, cancelling gather() after it 

792 has been marked done won't cancel any submitted awaitables. 

793 For instance, gather can be marked done after propagating an 

794 exception to the caller, therefore, calling ``gather.cancel()`` 

795 after catching an exception (raised by one of the awaitables) from 

796 gather won't cancel any other awaitables. 

797 """ 

798 if not coros_or_futures: 

799 loop = events.get_event_loop() 

800 outer = loop.create_future() 

801 outer.set_result([]) 

802 return outer 

803 

804 loop = events._get_running_loop() 

805 if loop is not None: 

806 cur_task = current_task(loop) 

807 else: 

808 cur_task = None 

809 

810 def _done_callback(fut, cur_task=cur_task): 

811 nonlocal nfinished 

812 nfinished += 1 

813 

814 if cur_task is not None: 

815 futures.future_discard_from_awaited_by(fut, cur_task) 

816 

817 if outer is None or outer.done(): 

818 if not fut.cancelled(): 

819 # Mark exception retrieved. 

820 fut.exception() 

821 return 

822 

823 if not return_exceptions: 

824 if fut.cancelled(): 

825 # Check if 'fut' is cancelled first, as 

826 # 'fut.exception()' will *raise* a CancelledError 

827 # instead of returning it. 

828 exc = fut._make_cancelled_error() 

829 outer.set_exception(exc) 

830 return 

831 else: 

832 exc = fut.exception() 

833 if exc is not None: 

834 outer.set_exception(exc) 

835 return 

836 

837 if nfinished == nfuts: 

838 # All futures are done; create a list of results 

839 # and set it to the 'outer' future. 

840 results = [] 

841 

842 for fut in children: 

843 if fut.cancelled(): 

844 # Check if 'fut' is cancelled first, as 'fut.exception()' 

845 # will *raise* a CancelledError instead of returning it. 

846 # Also, since we're adding the exception return value 

847 # to 'results' instead of raising it, don't bother 

848 # setting __context__. This also lets us preserve 

849 # calling '_make_cancelled_error()' at most once. 

850 res = exceptions.CancelledError( 

851 '' if fut._cancel_message is None else 

852 fut._cancel_message) 

853 else: 

854 res = fut.exception() 

855 if res is None: 

856 res = fut.result() 

857 results.append(res) 

858 

859 if outer._cancel_requested: 

860 # If gather is being cancelled we must propagate the 

861 # cancellation regardless of *return_exceptions* argument. 

862 # See issue 32684. 

863 exc = fut._make_cancelled_error() 

864 outer.set_exception(exc) 

865 else: 

866 outer.set_result(results) 

867 

868 arg_to_fut = {} 

869 children = [] 

870 nfuts = 0 

871 nfinished = 0 

872 done_futs = [] 

873 outer = None # bpo-46672 

874 for arg in coros_or_futures: 

875 if arg not in arg_to_fut: 

876 fut = ensure_future(arg, loop=loop) 

877 if loop is None: 

878 loop = futures._get_loop(fut) 

879 if fut is not arg: 

880 # 'arg' was not a Future, therefore, 'fut' is a new 

881 # Future created specifically for 'arg'. Since the caller 

882 # can't control it, disable the "destroy pending task" 

883 # warning. 

884 fut._log_destroy_pending = False 

885 nfuts += 1 

886 arg_to_fut[arg] = fut 

887 if fut.done(): 

888 done_futs.append(fut) 

889 else: 

890 if cur_task is not None: 

891 futures.future_add_to_awaited_by(fut, cur_task) 

892 fut.add_done_callback(_done_callback) 

893 

894 else: 

895 # There's a duplicate Future object in coros_or_futures. 

896 fut = arg_to_fut[arg] 

897 

898 children.append(fut) 

899 

900 outer = _GatheringFuture(children, loop=loop) 

901 # Run done callbacks after GatheringFuture created so any post-processing 

902 # can be performed at this point 

903 # optimization: in the special case that *all* futures finished eagerly, 

904 # this will effectively complete the gather eagerly, with the last 

905 # callback setting the result (or exception) on outer before returning it 

906 for fut in done_futs: 

907 _done_callback(fut) 

908 return outer 

909 

910 

911def _log_on_exception(fut): 

912 if fut.cancelled(): 912 ↛ 913line 912 didn't jump to line 913 because the condition on line 912 was never true

913 return 

914 

915 exc = fut.exception() 

916 if exc is None: 

917 return 

918 

919 context = { 

920 'message': 

921 f'{exc.__class__.__name__} exception in shielded future', 

922 'exception': exc, 

923 'future': fut, 

924 } 

925 if fut._source_traceback: 925 ↛ 926line 925 didn't jump to line 926 because the condition on line 925 was never true

926 context['source_traceback'] = fut._source_traceback 

927 fut._loop.call_exception_handler(context) 

928 

929 

930def shield(arg): 

931 """Wait for a future, shielding it from cancellation. 

932 

933 The statement 

934 

935 task = asyncio.create_task(something()) 

936 res = await shield(task) 

937 

938 is exactly equivalent to the statement 

939 

940 res = await something() 

941 

942 *except* that if the coroutine containing it is cancelled, the 

943 task running in something() is not cancelled. From the POV of 

944 something(), the cancellation did not happen. But its caller is 

945 still cancelled, so the yield-from expression still raises 

946 CancelledError. Note: If something() is cancelled by other means 

947 this will still cancel shield(). 

948 

949 If you want to completely ignore cancellation (not recommended) 

950 you can combine shield() with a try/except clause, as follows: 

951 

952 task = asyncio.create_task(something()) 

953 try: 

954 res = await shield(task) 

955 except CancelledError: 

956 res = None 

957 

958 Save a reference to tasks passed to this function, to avoid 

959 a task disappearing mid-execution. The event loop only keeps 

960 weak references to tasks. A task that isn't referenced elsewhere 

961 may get garbage collected at any time, even before it's done. 

962 """ 

963 inner = ensure_future(arg) 

964 if inner.done(): 

965 # Shortcut. 

966 return inner 

967 loop = futures._get_loop(inner) 

968 outer = loop.create_future() 

969 

970 if loop is not None and (cur_task := current_task(loop)) is not None: 

971 futures.future_add_to_awaited_by(inner, cur_task) 

972 else: 

973 cur_task = None 

974 

975 def _clear_awaited_by_callback(inner): 

976 futures.future_discard_from_awaited_by(inner, cur_task) 

977 

978 def _inner_done_callback(inner): 

979 if outer.cancelled(): 979 ↛ 980line 979 didn't jump to line 980 because the condition on line 979 was never true

980 return 

981 

982 if inner.cancelled(): 

983 outer.cancel() 

984 else: 

985 exc = inner.exception() 

986 if exc is not None: 

987 outer.set_exception(exc) 

988 else: 

989 outer.set_result(inner.result()) 

990 

991 def _outer_done_callback(outer): 

992 if not inner.done(): 

993 inner.remove_done_callback(_inner_done_callback) 

994 # Keep only one callback to log on cancel 

995 inner.remove_done_callback(_log_on_exception) 

996 inner.add_done_callback(_log_on_exception) 

997 

998 if cur_task is not None: 

999 inner.add_done_callback(_clear_awaited_by_callback) 

1000 

1001 

1002 inner.add_done_callback(_inner_done_callback) 

1003 outer.add_done_callback(_outer_done_callback) 

1004 return outer 

1005 

1006 

1007def run_coroutine_threadsafe(coro, loop): 

1008 """Submit a coroutine object to a given event loop. 

1009 

1010 Return a concurrent.futures.Future to access the result. 

1011 """ 

1012 if not coroutines.iscoroutine(coro): 1012 ↛ 1013line 1012 didn't jump to line 1013 because the condition on line 1012 was never true

1013 raise TypeError('A coroutine object is required') 

1014 future = concurrent.futures.Future() 

1015 

1016 def callback(): 

1017 try: 

1018 futures._chain_future(ensure_future(coro, loop=loop), future) 

1019 except (SystemExit, KeyboardInterrupt): 

1020 raise 

1021 except BaseException as exc: 

1022 if future.set_running_or_notify_cancel(): 1022 ↛ 1024line 1022 didn't jump to line 1024 because the condition on line 1022 was always true

1023 future.set_exception(exc) 

1024 raise 

1025 

1026 loop.call_soon_threadsafe(callback) 

1027 return future 

1028 

1029 

1030def create_eager_task_factory(custom_task_constructor): 

1031 """Create a function suitable for use as a task factory on an event-loop. 

1032 

1033 Example usage: 

1034 

1035 loop.set_task_factory( 

1036 asyncio.create_eager_task_factory(my_task_constructor)) 

1037 

1038 Now, tasks created will be started immediately (rather than being first 

1039 scheduled to an event loop). The constructor argument can be any callable 

1040 that returns a Task-compatible object and has a signature compatible 

1041 with `Task.__init__`; it must have the `eager_start` keyword argument. 

1042 

1043 Most applications will use `Task` for `custom_task_constructor` and in 

1044 this case there's no need to call `create_eager_task_factory()` 

1045 directly. Instead the global `eager_task_factory` instance can be 

1046 used. E.g. `loop.set_task_factory(asyncio.eager_task_factory)`. 

1047 """ 

1048 

1049 def factory(loop, coro, *, eager_start=True, **kwargs): 

1050 return custom_task_constructor( 

1051 coro, loop=loop, eager_start=eager_start, **kwargs) 

1052 

1053 return factory 

1054 

1055 

1056eager_task_factory = create_eager_task_factory(Task) 

1057 

1058 

1059# Collectively these two sets hold references to the complete set of active 

1060# tasks. Eagerly executed tasks use a faster regular set as an optimization 

1061# but may graduate to a WeakSet if the task blocks on IO. 

1062_scheduled_tasks = weakref.WeakSet() 

1063_eager_tasks = set() 

1064 

1065# Dictionary containing tasks that are currently active in 

1066# all running event loops. {EventLoop: Task} 

1067_current_tasks = {} 

1068 

1069 

1070def _register_task(task): 

1071 """Register an asyncio Task scheduled to run on an event loop.""" 

1072 _scheduled_tasks.add(task) 

1073 

1074 

1075def _register_eager_task(task): 

1076 """Register an asyncio Task about to be eagerly executed.""" 

1077 _eager_tasks.add(task) 

1078 

1079 

1080def _enter_task(loop, task): 

1081 current_task = _current_tasks.get(loop) 

1082 if current_task is not None: 

1083 raise RuntimeError(f"Cannot enter into task {task!r} while another " 

1084 f"task {current_task!r} is being executed.") 

1085 _current_tasks[loop] = task 

1086 

1087 

1088def _leave_task(loop, task): 

1089 current_task = _current_tasks.get(loop) 

1090 if current_task is not task: 

1091 raise RuntimeError(f"Leaving task {task!r} does not match " 

1092 f"the current task {current_task!r}.") 

1093 del _current_tasks[loop] 

1094 

1095 

1096def _swap_current_task(loop, task): 

1097 prev_task = _current_tasks.get(loop) 

1098 if task is None: 

1099 del _current_tasks[loop] 

1100 else: 

1101 _current_tasks[loop] = task 

1102 return prev_task 

1103 

1104 

1105def _unregister_task(task): 

1106 """Unregister a completed, scheduled Task.""" 

1107 _scheduled_tasks.discard(task) 

1108 

1109 

1110def _unregister_eager_task(task): 

1111 """Unregister a task which finished its first eager step.""" 

1112 _eager_tasks.discard(task) 

1113 

1114 

1115_py_current_task = current_task 

1116_py_register_task = _register_task 

1117_py_register_eager_task = _register_eager_task 

1118_py_unregister_task = _unregister_task 

1119_py_unregister_eager_task = _unregister_eager_task 

1120_py_enter_task = _enter_task 

1121_py_leave_task = _leave_task 

1122_py_swap_current_task = _swap_current_task 

1123_py_all_tasks = all_tasks 

1124 

1125try: 

1126 from _asyncio import (_register_task, _register_eager_task, 

1127 _unregister_task, _unregister_eager_task, 

1128 _enter_task, _leave_task, _swap_current_task, 

1129 current_task, all_tasks) 

1130except ImportError: 

1131 pass 

1132else: 

1133 _c_current_task = current_task 

1134 _c_register_task = _register_task 

1135 _c_register_eager_task = _register_eager_task 

1136 _c_unregister_task = _unregister_task 

1137 _c_unregister_eager_task = _unregister_eager_task 

1138 _c_enter_task = _enter_task 

1139 _c_leave_task = _leave_task 

1140 _c_swap_current_task = _swap_current_task 

1141 _c_all_tasks = all_tasks