Coverage for Lib/asyncio/tasks.py: 95%

521 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-15 02:02 +0000

1"""Support for tasks, coroutines and the scheduler.""" 

2 

3__all__ = ( 

4 'Task', 'create_task', 

5 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED', 

6 'wait', 'wait_for', 'as_completed', 'sleep', 

7 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe', 

8 'current_task', 'all_tasks', 

9 'create_eager_task_factory', 'eager_task_factory', 

10 '_register_task', '_unregister_task', '_enter_task', '_leave_task', 

11) 

12 

13import concurrent.futures 

14import contextvars 

15import functools 

16import inspect 

17import itertools 

18import math 

19import types 

20import weakref 

21from types import GenericAlias 

22 

23from . import base_tasks 

24from . import coroutines 

25from . import events 

26from . import exceptions 

27from . import futures 

28from . import queues 

29from . import timeouts 

30 

31# Helper to generate new task names 

32# This uses itertools.count() instead of a "+= 1" operation because the latter 

33# is not thread safe. See bpo-11866 for a longer explanation. 

34_task_name_counter = itertools.count(1).__next__ 

35 

36 

37def current_task(loop=None): 

38 """Return a currently executed task.""" 

39 if loop is None: 

40 loop = events.get_running_loop() 

41 return _current_tasks.get(loop) 

42 

43 

44def all_tasks(loop=None): 

45 """Return a set of all tasks for the loop.""" 

46 if loop is None: 

47 loop = events.get_running_loop() 

48 # capturing the set of eager tasks first, so if an eager task "graduates" 

49 # to a regular task in another thread, we don't risk missing it. 

50 eager_tasks = list(_eager_tasks) 

51 

52 return {t for t in itertools.chain(_scheduled_tasks, eager_tasks) 

53 if futures._get_loop(t) is loop and not t.done()} 

54 

55 

56class Task(futures._PyFuture): # Inherit Python Task implementation 

57 # from a Python Future implementation. 

58 

59 """A coroutine wrapped in a Future.""" 

60 

61 # An important invariant maintained while a Task not done: 

62 # _fut_waiter is either None or a Future. The Future 

63 # can be either done() or not done(). 

64 # The task can be in any of 3 states: 

65 # 

66 # - 1: _fut_waiter is not None and not _fut_waiter.done(): 

67 # __step() is *not* scheduled and the Task is waiting for _fut_waiter. 

68 # - 2: (_fut_waiter is None or _fut_waiter.done()) and __step() is scheduled: 

69 # the Task is waiting for __step() to be executed. 

70 # - 3: _fut_waiter is None and __step() is *not* scheduled: 

71 # the Task is currently executing (in __step()). 

72 # 

73 # * In state 1, one of the callbacks of __fut_waiter must be __wakeup(). 

74 # * The transition from 1 to 2 happens when _fut_waiter becomes done(), 

75 # as it schedules __wakeup() to be called (which calls __step() so 

76 # we way that __step() is scheduled). 

77 # * It transitions from 2 to 3 when __step() is executed, and it clears 

78 # _fut_waiter to None. 

79 

80 # If False, don't log a message if the task is destroyed while its 

81 # status is still pending 

82 _log_destroy_pending = True 

83 

84 def __init__(self, coro, *, loop=None, name=None, context=None, 

85 eager_start=False): 

86 super().__init__(loop=loop) 

87 if self._source_traceback: 

88 del self._source_traceback[-1] 

89 if not coroutines.iscoroutine(coro): 

90 # raise after Future.__init__(), attrs are required for __del__ 

91 # prevent logging for pending task in __del__ 

92 self._log_destroy_pending = False 

93 raise TypeError(f"a coroutine was expected, got {coro!r}") 

94 

95 if name is None: 

96 self._name = f'Task-{_task_name_counter()}' 

97 else: 

98 self._name = str(name) 

99 

100 self._num_cancels_requested = 0 

101 self._must_cancel = False 

102 self._fut_waiter = None 

103 self._coro = coro 

104 if context is None: 

105 self._context = contextvars.copy_context() 

106 else: 

107 self._context = context 

108 

109 if eager_start and self._loop.is_running(): 

110 self.__eager_start() 

111 else: 

112 self._loop.call_soon(self.__step, context=self._context) 

113 _py_register_task(self) 

114 

115 def __del__(self): 

116 if self._state == futures._PENDING and self._log_destroy_pending: 

117 context = { 

118 'task': self, 

119 'message': 'Task was destroyed but it is pending!', 

120 } 

121 if self._source_traceback: 

122 context['source_traceback'] = self._source_traceback 

123 self._loop.call_exception_handler(context) 

124 super().__del__() 

125 

126 __class_getitem__ = classmethod(GenericAlias) 

127 

128 def __repr__(self): 

129 return base_tasks._task_repr(self) 

130 

131 def get_coro(self): 

132 return self._coro 

133 

134 def get_context(self): 

135 return self._context 

136 

137 def get_name(self): 

138 return self._name 

139 

140 def set_name(self, value): 

141 self._name = str(value) 

142 

143 def set_result(self, result): 

144 raise RuntimeError('Task does not support set_result operation') 

145 

146 def set_exception(self, exception): 

147 raise RuntimeError('Task does not support set_exception operation') 

148 

149 def get_stack(self, *, limit=None): 

150 """Return the list of stack frames for this task's coroutine. 

151 

152 If the coroutine is not done, this returns the stack where it is 

153 suspended. If the coroutine has completed successfully or was 

154 cancelled, this returns an empty list. If the coroutine was 

155 terminated by an exception, this returns the list of traceback 

156 frames. 

157 

158 The frames are always ordered from oldest to newest. 

159 

160 The optional limit gives the maximum number of frames to 

161 return; by default all available frames are returned. Its 

162 meaning differs depending on whether a stack or a traceback is 

163 returned: the newest frames of a stack are returned, but the 

164 oldest frames of a traceback are returned. (This matches the 

165 behavior of the traceback module.) 

166 

167 For reasons beyond our control, only one stack frame is 

168 returned for a suspended coroutine. 

169 """ 

170 return base_tasks._task_get_stack(self, limit) 

171 

172 def print_stack(self, *, limit=None, file=None): 

173 """Print the stack or traceback for this task's coroutine. 

174 

175 This produces output similar to that of the traceback module, 

176 for the frames retrieved by get_stack(). The limit argument 

177 is passed to get_stack(). The file argument is an I/O stream 

178 to which the output is written; by default output is written 

179 to sys.stderr. 

180 """ 

181 return base_tasks._task_print_stack(self, limit, file) 

182 

183 def cancel(self, msg=None): 

184 """Request that this task cancel itself. 

185 

186 This arranges for a CancelledError to be thrown into the 

187 wrapped coroutine on the next cycle through the event loop. 

188 The coroutine then has a chance to clean up or even deny 

189 the request using try/except/finally. 

190 

191 Unlike Future.cancel, this does not guarantee that the 

192 task will be cancelled: the exception might be caught and 

193 acted upon, delaying cancellation of the task or preventing 

194 cancellation completely. The task may also return a value or 

195 raise a different exception. 

196 

197 Immediately after this method is called, Task.cancelled() will 

198 not return True (unless the task was already cancelled). A 

199 task will be marked as cancelled when the wrapped coroutine 

200 terminates with a CancelledError exception (even if cancel() 

201 was not called). 

202 

203 This also increases the task's count of cancellation requests. 

204 """ 

205 self._log_traceback = False 

206 if self.done(): 

207 return False 

208 self._num_cancels_requested += 1 

209 # These two lines are controversial. See discussion starting at 

210 # https://github.com/python/cpython/pull/31394#issuecomment-1053545331 

211 # Also remember that this is duplicated in _asynciomodule.c. 

212 # if self._num_cancels_requested > 1: 

213 # return False 

214 if self._fut_waiter is not None: 

215 if self._fut_waiter.cancel(msg=msg): 

216 # Leave self._fut_waiter; it may be a Task that 

217 # catches and ignores the cancellation so we may have 

218 # to cancel it again later. 

219 return True 

220 # It must be the case that self.__step is already scheduled. 

221 self._must_cancel = True 

222 self._cancel_message = msg 

223 return True 

224 

225 def cancelling(self): 

226 """Return the count of the task's cancellation requests. 

227 

228 This count is incremented when .cancel() is called 

229 and may be decremented using .uncancel(). 

230 """ 

231 return self._num_cancels_requested 

232 

233 def uncancel(self): 

234 """Decrement the task's count of cancellation requests. 

235 

236 This should be called by the party that called `cancel()` on the task 

237 beforehand. 

238 

239 Returns the remaining number of cancellation requests. 

240 """ 

241 if self._num_cancels_requested > 0: 241 ↛ 245line 241 didn't jump to line 245 because the condition on line 241 was always true

242 self._num_cancels_requested -= 1 

243 if self._num_cancels_requested == 0: 243 ↛ 245line 243 didn't jump to line 245 because the condition on line 243 was always true

244 self._must_cancel = False 

245 return self._num_cancels_requested 

246 

247 def __eager_start(self): 

248 prev_task = _py_swap_current_task(self._loop, self) 

249 try: 

250 _py_register_eager_task(self) 

251 try: 

252 self._context.run(self.__step_run_and_handle_result, None) 

253 finally: 

254 _py_unregister_eager_task(self) 

255 finally: 

256 try: 

257 curtask = _py_swap_current_task(self._loop, prev_task) 

258 assert curtask is self 

259 finally: 

260 if self.done(): 

261 self._coro = None 

262 self = None # Needed to break cycles when an exception occurs. 

263 else: 

264 _py_register_task(self) 

265 

266 def __step(self, exc=None): 

267 if self.done(): 

268 raise exceptions.InvalidStateError( 

269 f'__step(): already done: {self!r}, {exc!r}') 

270 if self._must_cancel: 

271 if not isinstance(exc, exceptions.CancelledError): 

272 exc = self._make_cancelled_error() 

273 self._must_cancel = False 

274 self._fut_waiter = None 

275 

276 _py_enter_task(self._loop, self) 

277 try: 

278 self.__step_run_and_handle_result(exc) 

279 finally: 

280 _py_leave_task(self._loop, self) 

281 self = None # Needed to break cycles when an exception occurs. 

282 

283 def __step_run_and_handle_result(self, exc): 

284 coro = self._coro 

285 try: 

286 if exc is None: 

287 # We use the `send` method directly, because coroutines 

288 # don't have `__iter__` and `__next__` methods. 

289 result = coro.send(None) 

290 else: 

291 result = coro.throw(exc) 

292 except StopIteration as exc: 

293 if self._must_cancel: 

294 # Task is cancelled right before coro stops. 

295 self._must_cancel = False 

296 super().cancel(msg=self._cancel_message) 

297 else: 

298 super().set_result(exc.value) 

299 except exceptions.CancelledError as exc: 

300 # Save the original exception so we can chain it later. 

301 self._cancelled_exc = exc 

302 super().cancel() # I.e., Future.cancel(self). 

303 except (KeyboardInterrupt, SystemExit) as exc: 

304 super().set_exception(exc) 

305 raise 

306 except BaseException as exc: 

307 super().set_exception(exc) 

308 else: 

309 blocking = getattr(result, '_asyncio_future_blocking', None) 

310 if blocking is not None: 

311 # Yielded Future must come from Future.__iter__(). 

312 if futures._get_loop(result) is not self._loop: 

313 new_exc = RuntimeError( 

314 f'Task {self!r} got Future ' 

315 f'{result!r} attached to a different loop') 

316 self._loop.call_soon( 

317 self.__step, new_exc, context=self._context) 

318 elif blocking: 318 ↛ 335line 318 didn't jump to line 335 because the condition on line 318 was always true

319 if result is self: 

320 new_exc = RuntimeError( 

321 f'Task cannot await on itself: {self!r}') 

322 self._loop.call_soon( 

323 self.__step, new_exc, context=self._context) 

324 else: 

325 futures.future_add_to_awaited_by(result, self) 

326 result._asyncio_future_blocking = False 

327 result.add_done_callback( 

328 self.__wakeup, context=self._context) 

329 self._fut_waiter = result 

330 if self._must_cancel: 

331 if self._fut_waiter.cancel( 331 ↛ 357line 331 didn't jump to line 357 because the condition on line 331 was always true

332 msg=self._cancel_message): 

333 self._must_cancel = False 

334 else: 

335 new_exc = RuntimeError( 

336 f'yield was used instead of yield from ' 

337 f'in task {self!r} with {result!r}') 

338 self._loop.call_soon( 

339 self.__step, new_exc, context=self._context) 

340 

341 elif result is None: 341 ↛ 344line 341 didn't jump to line 344 because the condition on line 341 was always true

342 # Bare yield relinquishes control for one event loop iteration. 

343 self._loop.call_soon(self.__step, context=self._context) 

344 elif inspect.isgenerator(result): 

345 # Yielding a generator is just wrong. 

346 new_exc = RuntimeError( 

347 f'yield was used instead of yield from for ' 

348 f'generator in task {self!r} with {result!r}') 

349 self._loop.call_soon( 

350 self.__step, new_exc, context=self._context) 

351 else: 

352 # Yielding something else is an error. 

353 new_exc = RuntimeError(f'Task got bad yield: {result!r}') 

354 self._loop.call_soon( 

355 self.__step, new_exc, context=self._context) 

356 finally: 

357 self = None # Needed to break cycles when an exception occurs. 

358 

359 def __wakeup(self, future): 

360 futures.future_discard_from_awaited_by(future, self) 

361 try: 

362 future.result() 

363 except BaseException as exc: 

364 # This may also be a cancellation. 

365 self.__step(exc) 

366 else: 

367 # Don't pass the value of `future.result()` explicitly, 

368 # as `Future.__iter__` and `Future.__await__` don't need it. 

369 # If we call `__step(value, None)` instead of `__step()`, 

370 # Python eval loop would use `.send(value)` method call, 

371 # instead of `__next__()`, which is slower for futures 

372 # that return non-generator iterators from their `__iter__`. 

373 self.__step() 

374 self = None # Needed to break cycles when an exception occurs. 

375 

376 

377_PyTask = Task 

378 

379 

380try: 

381 import _asyncio 

382except ImportError: 

383 pass 

384else: 

385 # _CTask is needed for tests. 

386 Task = _CTask = _asyncio.Task 

387 

388 

389def create_task(coro, *, name=None, context=None): 

390 """Schedule the execution of a coroutine object in a spawn task. 

391 

392 Return a Task object. 

393 """ 

394 loop = events.get_running_loop() 

395 if context is None: 

396 # Use legacy API if context is not needed 

397 task = loop.create_task(coro, name=name) 

398 else: 

399 task = loop.create_task(coro, name=name, context=context) 

400 

401 return task 

402 

403 

404# wait() and as_completed() similar to those in PEP 3148. 

405 

406FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED 

407FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION 

408ALL_COMPLETED = concurrent.futures.ALL_COMPLETED 

409 

410 

411async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED): 

412 """Wait for the Futures or Tasks given by fs to complete. 

413 

414 The fs iterable must not be empty. 

415 

416 Returns two sets of Future: (done, pending). 

417 

418 Usage: 

419 

420 done, pending = await asyncio.wait(fs) 

421 

422 Note: This does not raise TimeoutError! Futures that aren't done 

423 when the timeout occurs are returned in the second set. 

424 """ 

425 if futures.isfuture(fs) or coroutines.iscoroutine(fs): 

426 raise TypeError(f"expect a list of futures, not {type(fs).__name__}") 

427 if not fs: 

428 raise ValueError('Set of Tasks/Futures is empty.') 

429 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED): 

430 raise ValueError(f'Invalid return_when value: {return_when}') 

431 

432 fs = set(fs) 

433 

434 if any(coroutines.iscoroutine(f) for f in fs): 434 ↛ 435line 434 didn't jump to line 435 because the condition on line 434 was never true

435 raise TypeError("Passing coroutines is forbidden, use tasks explicitly.") 

436 

437 loop = events.get_running_loop() 

438 return await _wait(fs, timeout, return_when, loop) 

439 

440 

441def _release_waiter(waiter, *args): 

442 if not waiter.done(): 442 ↛ exitline 442 didn't return from function '_release_waiter' because the condition on line 442 was always true

443 waiter.set_result(None) 

444 

445 

446async def wait_for(fut, timeout): 

447 """Wait for the single Future or coroutine to complete, with timeout. 

448 

449 Coroutine will be wrapped in Task. 

450 

451 Returns result of the Future or coroutine. When a timeout occurs, 

452 it cancels the task and raises TimeoutError. To avoid the task 

453 cancellation, wrap it in shield(). 

454 

455 If the wait is cancelled, the task is also cancelled. 

456 

457 If the task suppresses the cancellation and returns a value instead, 

458 that value is returned. 

459 

460 This function is a coroutine. 

461 """ 

462 # The special case for timeout <= 0 is for the following case: 

463 # 

464 # async def test_waitfor(): 

465 # func_started = False 

466 # 

467 # async def func(): 

468 # nonlocal func_started 

469 # func_started = True 

470 # 

471 # try: 

472 # await asyncio.wait_for(func(), 0) 

473 # except asyncio.TimeoutError: 

474 # assert not func_started 

475 # else: 

476 # assert False 

477 # 

478 # asyncio.run(test_waitfor()) 

479 

480 

481 if timeout is not None and timeout <= 0: 

482 fut = ensure_future(fut) 

483 

484 if fut.done(): 

485 return fut.result() 

486 

487 await _cancel_and_wait(fut) 

488 try: 

489 return fut.result() 

490 except exceptions.CancelledError as exc: 

491 raise TimeoutError from exc 

492 

493 async with timeouts.timeout(timeout): 

494 return await fut 

495 

496async def _wait(fs, timeout, return_when, loop): 

497 """Internal helper for wait(). 

498 

499 The fs argument must be a collection of Futures. 

500 """ 

501 assert fs, 'Set of Futures is empty.' 

502 waiter = loop.create_future() 

503 timeout_handle = None 

504 if timeout is not None: 

505 timeout_handle = loop.call_later(timeout, _release_waiter, waiter) 

506 counter = len(fs) 

507 cur_task = current_task() 

508 

509 def _on_completion(f): 

510 nonlocal counter 

511 counter -= 1 

512 if (counter <= 0 or 

513 return_when == FIRST_COMPLETED or 

514 return_when == FIRST_EXCEPTION and (not f.cancelled() and 

515 f.exception() is not None)): 

516 if timeout_handle is not None: 516 ↛ 517line 516 didn't jump to line 517 because the condition on line 516 was never true

517 timeout_handle.cancel() 

518 if not waiter.done(): 

519 waiter.set_result(None) 

520 futures.future_discard_from_awaited_by(f, cur_task) 

521 

522 for f in fs: 

523 f.add_done_callback(_on_completion) 

524 futures.future_add_to_awaited_by(f, cur_task) 

525 

526 try: 

527 await waiter 

528 finally: 

529 if timeout_handle is not None: 

530 timeout_handle.cancel() 

531 for f in fs: 

532 f.remove_done_callback(_on_completion) 

533 

534 done, pending = set(), set() 

535 for f in fs: 

536 if f.done(): 

537 done.add(f) 

538 else: 

539 pending.add(f) 

540 return done, pending 

541 

542 

543async def _cancel_and_wait(fut): 

544 """Cancel the *fut* future or task and wait until it completes.""" 

545 

546 loop = events.get_running_loop() 

547 waiter = loop.create_future() 

548 cb = functools.partial(_release_waiter, waiter) 

549 fut.add_done_callback(cb) 

550 

551 try: 

552 fut.cancel() 

553 # We cannot wait on *fut* directly to make 

554 # sure _cancel_and_wait itself is reliably cancellable. 

555 await waiter 

556 finally: 

557 fut.remove_done_callback(cb) 

558 

559 

560class _AsCompletedIterator: 

561 """Iterator of awaitables representing tasks of asyncio.as_completed. 

562 

563 As an asynchronous iterator, iteration yields futures as they finish. As a 

564 plain iterator, new coroutines are yielded that will return or raise the 

565 result of the next underlying future to complete. 

566 """ 

567 def __init__(self, aws, timeout): 

568 self._done = queues.Queue() 

569 self._timeout_handle = None 

570 

571 loop = events.get_event_loop() 

572 todo = {ensure_future(aw, loop=loop) for aw in set(aws)} 

573 for f in todo: 

574 f.add_done_callback(self._handle_completion) 

575 if todo and timeout is not None: 

576 self._timeout_handle = ( 

577 loop.call_later(timeout, self._handle_timeout) 

578 ) 

579 self._todo = todo 

580 self._todo_left = len(todo) 

581 

582 def __aiter__(self): 

583 return self 

584 

585 def __iter__(self): 

586 return self 

587 

588 async def __anext__(self): 

589 if not self._todo_left: 

590 raise StopAsyncIteration 

591 assert self._todo_left > 0 

592 self._todo_left -= 1 

593 return await self._wait_for_one() 

594 

595 def __next__(self): 

596 if not self._todo_left: 

597 raise StopIteration 

598 assert self._todo_left > 0 

599 self._todo_left -= 1 

600 return self._wait_for_one(resolve=True) 

601 

602 def _handle_timeout(self): 

603 for f in self._todo: 

604 f.remove_done_callback(self._handle_completion) 

605 self._done.put_nowait(None) # Sentinel for _wait_for_one(). 

606 self._todo.clear() # Can't do todo.remove(f) in the loop. 

607 

608 def _handle_completion(self, f): 

609 if not self._todo: 609 ↛ 610line 609 didn't jump to line 610 because the condition on line 609 was never true

610 return # _handle_timeout() was here first. 

611 self._todo.remove(f) 

612 self._done.put_nowait(f) 

613 if not self._todo and self._timeout_handle is not None: 

614 self._timeout_handle.cancel() 

615 

616 async def _wait_for_one(self, resolve=False): 

617 # Wait for the next future to be done and return it unless resolve is 

618 # set, in which case return either the result of the future or raise 

619 # an exception. 

620 f = await self._done.get() 

621 if f is None: 

622 # Dummy value from _handle_timeout(). 

623 raise exceptions.TimeoutError 

624 return f.result() if resolve else f 

625 

626 

627def as_completed(fs, *, timeout=None): 

628 """Create an iterator of awaitables or their results in completion order. 

629 

630 Run the supplied awaitables concurrently. The returned object can be 

631 iterated to obtain the results of the awaitables as they finish. 

632 

633 The object returned can be iterated as an asynchronous iterator or a plain 

634 iterator. When asynchronous iteration is used, the originally-supplied 

635 awaitables are yielded if they are tasks or futures. This makes it easy to 

636 correlate previously-scheduled tasks with their results: 

637 

638 ipv4_connect = create_task(open_connection("127.0.0.1", 80)) 

639 ipv6_connect = create_task(open_connection("::1", 80)) 

640 tasks = [ipv4_connect, ipv6_connect] 

641 

642 async for earliest_connect in as_completed(tasks): 

643 # earliest_connect is done. The result can be obtained by 

644 # awaiting it or calling earliest_connect.result() 

645 reader, writer = await earliest_connect 

646 

647 if earliest_connect is ipv6_connect: 

648 print("IPv6 connection established.") 

649 else: 

650 print("IPv4 connection established.") 

651 

652 During asynchronous iteration, implicitly-created tasks will be yielded for 

653 supplied awaitables that aren't tasks or futures. 

654 

655 When used as a plain iterator, each iteration yields a new coroutine that 

656 returns the result or raises the exception of the next completed awaitable. 

657 This pattern is compatible with Python versions older than 3.13: 

658 

659 ipv4_connect = create_task(open_connection("127.0.0.1", 80)) 

660 ipv6_connect = create_task(open_connection("::1", 80)) 

661 tasks = [ipv4_connect, ipv6_connect] 

662 

663 for next_connect in as_completed(tasks): 

664 # next_connect is not one of the original task objects. It must be 

665 # awaited to obtain the result value or raise the exception of the 

666 # awaitable that finishes next. 

667 reader, writer = await next_connect 

668 

669 A TimeoutError is raised if the timeout occurs before all awaitables are 

670 done. This is raised by the async for loop during asynchronous iteration or 

671 by the coroutines yielded during plain iteration. 

672 """ 

673 if inspect.isawaitable(fs): 

674 raise TypeError( 

675 f"expects an iterable of awaitables, not {type(fs).__name__}" 

676 ) 

677 

678 return _AsCompletedIterator(fs, timeout) 

679 

680 

681@types.coroutine 

682def __sleep0(): 

683 """Skip one event loop run cycle. 

684 

685 This is a private helper for 'asyncio.sleep()', used 

686 when the 'delay' is set to 0. It uses a bare 'yield' 

687 expression (which Task.__step knows how to handle) 

688 instead of creating a Future object. 

689 """ 

690 yield 

691 

692 

693async def sleep(delay, result=None): 

694 """Coroutine that completes after a given time (in seconds).""" 

695 if delay <= 0: 

696 await __sleep0() 

697 return result 

698 

699 if math.isnan(delay): 

700 raise ValueError("Invalid delay: NaN (not a number)") 

701 

702 loop = events.get_running_loop() 

703 future = loop.create_future() 

704 h = loop.call_later(delay, 

705 futures._set_result_unless_cancelled, 

706 future, result) 

707 try: 

708 return await future 

709 finally: 

710 h.cancel() 

711 

712 

713def ensure_future(coro_or_future, *, loop=None): 

714 """Wrap a coroutine or an awaitable in a future. 

715 

716 If the argument is a Future, it is returned directly. 

717 """ 

718 if futures.isfuture(coro_or_future): 

719 if loop is not None and loop is not futures._get_loop(coro_or_future): 

720 raise ValueError('The future belongs to a different loop than ' 

721 'the one specified as the loop argument') 

722 return coro_or_future 

723 should_close = True 

724 if not coroutines.iscoroutine(coro_or_future): 

725 if inspect.isawaitable(coro_or_future): 

726 async def _wrap_awaitable(awaitable): 

727 return await awaitable 

728 

729 coro_or_future = _wrap_awaitable(coro_or_future) 

730 should_close = False 

731 else: 

732 raise TypeError('An asyncio.Future, a coroutine or an awaitable ' 

733 'is required') 

734 

735 if loop is None: 

736 loop = events.get_event_loop() 

737 try: 

738 return loop.create_task(coro_or_future) 

739 except RuntimeError: 

740 if should_close: 740 ↛ 742line 740 didn't jump to line 742 because the condition on line 740 was always true

741 coro_or_future.close() 

742 raise 

743 

744 

745class _GatheringFuture(futures.Future): 

746 """Helper for gather(). 

747 

748 This overrides cancel() to cancel all the children and act more 

749 like Task.cancel(), which doesn't immediately mark itself as 

750 cancelled. 

751 """ 

752 

753 def __init__(self, children, *, loop): 

754 assert loop is not None 

755 super().__init__(loop=loop) 

756 self._children = children 

757 self._cancel_requested = False 

758 

759 def cancel(self, msg=None): 

760 if self.done(): 

761 return False 

762 ret = False 

763 for child in self._children: 

764 if child.cancel(msg=msg): 

765 ret = True 

766 if ret: 

767 # If any child tasks were actually cancelled, we should 

768 # propagate the cancellation request regardless of 

769 # *return_exceptions* argument. See issue 32684. 

770 self._cancel_requested = True 

771 return ret 

772 

773 

774def gather(*coros_or_futures, return_exceptions=False): 

775 """Return a future aggregating results from the given coroutines/futures. 

776 

777 Coroutines will be wrapped in a future and scheduled in the event 

778 loop. They will not necessarily be scheduled in the same order as 

779 passed in. 

780 

781 All futures must share the same event loop. If all the tasks are 

782 done successfully, the returned future's result is the list of 

783 results (in the order of the original sequence, not necessarily 

784 the order of results arrival). If *return_exceptions* is True, 

785 exceptions in the tasks are treated the same as successful 

786 results, and gathered in the result list; otherwise, the first 

787 raised exception will be immediately propagated to the returned 

788 future. 

789 

790 Cancellation: if the outer Future is cancelled, all children (that 

791 have not completed yet) are also cancelled. If any child is 

792 cancelled, this is treated as if it raised CancelledError -- 

793 the outer Future is *not* cancelled in this case. (This is to 

794 prevent the cancellation of one child to cause other children to 

795 be cancelled.) 

796 

797 If *return_exceptions* is False, cancelling gather() after it 

798 has been marked done won't cancel any submitted awaitables. 

799 For instance, gather can be marked done after propagating an 

800 exception to the caller, therefore, calling ``gather.cancel()`` 

801 after catching an exception (raised by one of the awaitables) from 

802 gather won't cancel any other awaitables. 

803 """ 

804 if not coros_or_futures: 

805 loop = events.get_event_loop() 

806 outer = loop.create_future() 

807 outer.set_result([]) 

808 return outer 

809 

810 loop = events._get_running_loop() 

811 if loop is not None: 

812 cur_task = current_task(loop) 

813 else: 

814 cur_task = None 

815 

816 def _done_callback(fut, cur_task=cur_task): 

817 nonlocal nfinished 

818 nfinished += 1 

819 

820 if cur_task is not None: 

821 futures.future_discard_from_awaited_by(fut, cur_task) 

822 

823 if outer is None or outer.done(): 

824 if not fut.cancelled(): 

825 # Mark exception retrieved. 

826 fut.exception() 

827 return 

828 

829 if not return_exceptions: 

830 if fut.cancelled(): 

831 # Check if 'fut' is cancelled first, as 

832 # 'fut.exception()' will *raise* a CancelledError 

833 # instead of returning it. 

834 exc = fut._make_cancelled_error() 

835 outer.set_exception(exc) 

836 return 

837 else: 

838 exc = fut.exception() 

839 if exc is not None: 

840 outer.set_exception(exc) 

841 return 

842 

843 if nfinished == nfuts: 

844 # All futures are done; create a list of results 

845 # and set it to the 'outer' future. 

846 results = [] 

847 

848 for fut in children: 

849 if fut.cancelled(): 

850 # Check if 'fut' is cancelled first, as 'fut.exception()' 

851 # will *raise* a CancelledError instead of returning it. 

852 # Also, since we're adding the exception return value 

853 # to 'results' instead of raising it, don't bother 

854 # setting __context__. This also lets us preserve 

855 # calling '_make_cancelled_error()' at most once. 

856 res = exceptions.CancelledError( 

857 '' if fut._cancel_message is None else 

858 fut._cancel_message) 

859 else: 

860 res = fut.exception() 

861 if res is None: 

862 res = fut.result() 

863 results.append(res) 

864 

865 if outer._cancel_requested: 

866 # If gather is being cancelled we must propagate the 

867 # cancellation regardless of *return_exceptions* argument. 

868 # See issue 32684. 

869 exc = fut._make_cancelled_error() 

870 outer.set_exception(exc) 

871 else: 

872 outer.set_result(results) 

873 

874 arg_to_fut = {} 

875 children = [] 

876 nfuts = 0 

877 nfinished = 0 

878 done_futs = [] 

879 outer = None # bpo-46672 

880 for arg in coros_or_futures: 

881 if arg not in arg_to_fut: 

882 fut = ensure_future(arg, loop=loop) 

883 if loop is None: 

884 loop = futures._get_loop(fut) 

885 if fut is not arg: 

886 # 'arg' was not a Future, therefore, 'fut' is a new 

887 # Future created specifically for 'arg'. Since the caller 

888 # can't control it, disable the "destroy pending task" 

889 # warning. 

890 fut._log_destroy_pending = False 

891 nfuts += 1 

892 arg_to_fut[arg] = fut 

893 if fut.done(): 

894 done_futs.append(fut) 

895 else: 

896 if cur_task is not None: 

897 futures.future_add_to_awaited_by(fut, cur_task) 

898 fut.add_done_callback(_done_callback) 

899 

900 else: 

901 # There's a duplicate Future object in coros_or_futures. 

902 fut = arg_to_fut[arg] 

903 

904 children.append(fut) 

905 

906 outer = _GatheringFuture(children, loop=loop) 

907 # Run done callbacks after GatheringFuture created so any post-processing 

908 # can be performed at this point 

909 # optimization: in the special case that *all* futures finished eagerly, 

910 # this will effectively complete the gather eagerly, with the last 

911 # callback setting the result (or exception) on outer before returning it 

912 for fut in done_futs: 

913 _done_callback(fut) 

914 return outer 

915 

916 

917def shield(arg): 

918 """Wait for a future, shielding it from cancellation. 

919 

920 The statement 

921 

922 task = asyncio.create_task(something()) 

923 res = await shield(task) 

924 

925 is exactly equivalent to the statement 

926 

927 res = await something() 

928 

929 *except* that if the coroutine containing it is cancelled, the 

930 task running in something() is not cancelled. From the POV of 

931 something(), the cancellation did not happen. But its caller is 

932 still cancelled, so the yield-from expression still raises 

933 CancelledError. Note: If something() is cancelled by other means 

934 this will still cancel shield(). 

935 

936 If you want to completely ignore cancellation (not recommended) 

937 you can combine shield() with a try/except clause, as follows: 

938 

939 task = asyncio.create_task(something()) 

940 try: 

941 res = await shield(task) 

942 except CancelledError: 

943 res = None 

944 

945 Save a reference to tasks passed to this function, to avoid 

946 a task disappearing mid-execution. The event loop only keeps 

947 weak references to tasks. A task that isn't referenced elsewhere 

948 may get garbage collected at any time, even before it's done. 

949 """ 

950 inner = ensure_future(arg) 

951 if inner.done(): 

952 # Shortcut. 

953 return inner 

954 loop = futures._get_loop(inner) 

955 outer = loop.create_future() 

956 

957 if loop is not None and (cur_task := current_task(loop)) is not None: 

958 futures.future_add_to_awaited_by(inner, cur_task) 

959 else: 

960 cur_task = None 

961 

962 def _inner_done_callback(inner, cur_task=cur_task): 

963 if cur_task is not None: 

964 futures.future_discard_from_awaited_by(inner, cur_task) 

965 

966 if outer.cancelled(): 966 ↛ 967line 966 didn't jump to line 967 because the condition on line 966 was never true

967 if not inner.cancelled(): 

968 # Mark inner's result as retrieved. 

969 inner.exception() 

970 return 

971 

972 if inner.cancelled(): 

973 outer.cancel() 

974 else: 

975 exc = inner.exception() 

976 if exc is not None: 

977 outer.set_exception(exc) 

978 else: 

979 outer.set_result(inner.result()) 

980 

981 

982 def _outer_done_callback(outer): 

983 if not inner.done(): 

984 inner.remove_done_callback(_inner_done_callback) 

985 

986 inner.add_done_callback(_inner_done_callback) 

987 outer.add_done_callback(_outer_done_callback) 

988 return outer 

989 

990 

991def run_coroutine_threadsafe(coro, loop): 

992 """Submit a coroutine object to a given event loop. 

993 

994 Return a concurrent.futures.Future to access the result. 

995 """ 

996 if not coroutines.iscoroutine(coro): 996 ↛ 997line 996 didn't jump to line 997 because the condition on line 996 was never true

997 raise TypeError('A coroutine object is required') 

998 future = concurrent.futures.Future() 

999 

1000 def callback(): 

1001 try: 

1002 futures._chain_future(ensure_future(coro, loop=loop), future) 

1003 except (SystemExit, KeyboardInterrupt): 

1004 raise 

1005 except BaseException as exc: 

1006 if future.set_running_or_notify_cancel(): 1006 ↛ 1008line 1006 didn't jump to line 1008 because the condition on line 1006 was always true

1007 future.set_exception(exc) 

1008 raise 

1009 

1010 loop.call_soon_threadsafe(callback) 

1011 return future 

1012 

1013 

1014def create_eager_task_factory(custom_task_constructor): 

1015 """Create a function suitable for use as a task factory on an event-loop. 

1016 

1017 Example usage: 

1018 

1019 loop.set_task_factory( 

1020 asyncio.create_eager_task_factory(my_task_constructor)) 

1021 

1022 Now, tasks created will be started immediately (rather than being first 

1023 scheduled to an event loop). The constructor argument can be any callable 

1024 that returns a Task-compatible object and has a signature compatible 

1025 with `Task.__init__`; it must have the `eager_start` keyword argument. 

1026 

1027 Most applications will use `Task` for `custom_task_constructor` and in 

1028 this case there's no need to call `create_eager_task_factory()` 

1029 directly. Instead the global `eager_task_factory` instance can be 

1030 used. E.g. `loop.set_task_factory(asyncio.eager_task_factory)`. 

1031 """ 

1032 

1033 def factory(loop, coro, *, name=None, context=None): 

1034 return custom_task_constructor( 

1035 coro, loop=loop, name=name, context=context, eager_start=True) 

1036 

1037 return factory 

1038 

1039 

1040eager_task_factory = create_eager_task_factory(Task) 

1041 

1042 

1043# Collectively these two sets hold references to the complete set of active 

1044# tasks. Eagerly executed tasks use a faster regular set as an optimization 

1045# but may graduate to a WeakSet if the task blocks on IO. 

1046_scheduled_tasks = weakref.WeakSet() 

1047_eager_tasks = set() 

1048 

1049# Dictionary containing tasks that are currently active in 

1050# all running event loops. {EventLoop: Task} 

1051_current_tasks = {} 

1052 

1053 

1054def _register_task(task): 

1055 """Register an asyncio Task scheduled to run on an event loop.""" 

1056 _scheduled_tasks.add(task) 

1057 

1058 

1059def _register_eager_task(task): 

1060 """Register an asyncio Task about to be eagerly executed.""" 

1061 _eager_tasks.add(task) 

1062 

1063 

1064def _enter_task(loop, task): 

1065 current_task = _current_tasks.get(loop) 

1066 if current_task is not None: 

1067 raise RuntimeError(f"Cannot enter into task {task!r} while another " 

1068 f"task {current_task!r} is being executed.") 

1069 _current_tasks[loop] = task 

1070 

1071 

1072def _leave_task(loop, task): 

1073 current_task = _current_tasks.get(loop) 

1074 if current_task is not task: 

1075 raise RuntimeError(f"Leaving task {task!r} does not match " 

1076 f"the current task {current_task!r}.") 

1077 del _current_tasks[loop] 

1078 

1079 

1080def _swap_current_task(loop, task): 

1081 prev_task = _current_tasks.get(loop) 

1082 if task is None: 

1083 del _current_tasks[loop] 

1084 else: 

1085 _current_tasks[loop] = task 

1086 return prev_task 

1087 

1088 

1089def _unregister_task(task): 

1090 """Unregister a completed, scheduled Task.""" 

1091 _scheduled_tasks.discard(task) 

1092 

1093 

1094def _unregister_eager_task(task): 

1095 """Unregister a task which finished its first eager step.""" 

1096 _eager_tasks.discard(task) 

1097 

1098 

1099_py_current_task = current_task 

1100_py_register_task = _register_task 

1101_py_register_eager_task = _register_eager_task 

1102_py_unregister_task = _unregister_task 

1103_py_unregister_eager_task = _unregister_eager_task 

1104_py_enter_task = _enter_task 

1105_py_leave_task = _leave_task 

1106_py_swap_current_task = _swap_current_task 

1107_py_all_tasks = all_tasks 

1108 

1109try: 

1110 from _asyncio import (_register_task, _register_eager_task, 

1111 _unregister_task, _unregister_eager_task, 

1112 _enter_task, _leave_task, _swap_current_task, 

1113 current_task, all_tasks) 

1114except ImportError: 

1115 pass 

1116else: 

1117 _c_current_task = current_task 

1118 _c_register_task = _register_task 

1119 _c_register_eager_task = _register_eager_task 

1120 _c_unregister_task = _unregister_task 

1121 _c_unregister_eager_task = _unregister_eager_task 

1122 _c_enter_task = _enter_task 

1123 _c_leave_task = _leave_task 

1124 _c_swap_current_task = _swap_current_task 

1125 _c_all_tasks = all_tasks