Coverage for Lib/asyncio/tasks.py: 95%
530 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-24 03:28 +0000
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-24 03:28 +0000
1"""Support for tasks, coroutines and the scheduler."""
# Names exported by ``from asyncio.tasks import *``.  The underscore-prefixed
# entries on the last line are listed explicitly so that they are exported
# despite being private helpers.
__all__ = (
    'Task', 'create_task',
    'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
    'wait', 'wait_for', 'as_completed', 'sleep',
    'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
    'current_task', 'all_tasks',
    'create_eager_task_factory', 'eager_task_factory',
    '_register_task', '_unregister_task', '_enter_task', '_leave_task',
)
13import concurrent.futures
14import contextvars
15import functools
16import inspect
17import itertools
18import math
19import types
20import weakref
21from types import GenericAlias
23from . import base_tasks
24from . import coroutines
25from . import events
26from . import exceptions
27from . import futures
28from . import queues
29from . import timeouts
# Helper to generate new task names.
# This uses itertools.count() instead of a "+= 1" operation because the latter
# is not thread safe. See bpo-11866 for a longer explanation.
# Calling _task_name_counter() returns the next integer, starting at 1.
_task_name_counter = itertools.count(1).__next__
def current_task(loop=None):
    """Return the task currently executing on *loop*, or None.

    When *loop* is omitted the running event loop is used.
    """
    target_loop = events.get_running_loop() if loop is None else loop
    return _current_tasks.get(target_loop)
def all_tasks(loop=None):
    """Return the set of not-yet-done tasks that belong to *loop*.

    When *loop* is omitted the running event loop is used.
    """
    if loop is None:
        loop = events.get_running_loop()
    # Snapshot the eager tasks before walking the scheduled ones: if an eager
    # task "graduates" to a regular task in another thread while we iterate,
    # it is still present in the snapshot and cannot be missed.
    eager_snapshot = list(_eager_tasks)

    unfinished = set()
    for task in itertools.chain(_scheduled_tasks, eager_snapshot):
        if futures._get_loop(task) is loop and not task.done():
            unfinished.add(task)
    return unfinished
class Task(futures._PyFuture):  # Inherit Python Task implementation
                                # from a Python Future implementation.

    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task not done:
    # _fut_waiter is either None or a Future.  The Future
    # can be either done() or not done().
    # The task can be in any of 3 states:
    #
    # - 1: _fut_waiter is not None and not _fut_waiter.done():
    #      __step() is *not* scheduled and the Task is waiting for _fut_waiter.
    # - 2: (_fut_waiter is None or _fut_waiter.done()) and __step() is scheduled:
    #       the Task is waiting for __step() to be executed.
    # - 3:  _fut_waiter is None and __step() is *not* scheduled:
    #       the Task is currently executing (in __step()).
    #
    # * In state 1, one of the callbacks of _fut_waiter must be __wakeup().
    # * The transition from 1 to 2 happens when _fut_waiter becomes done(),
    #   as it schedules __wakeup() to be called (which calls __step() so
    #   we say that __step() is scheduled).
    # * It transitions from 2 to 3 when __step() is executed, and it clears
    #   _fut_waiter to None.

    # If False, don't log a message if the task is destroyed while its
    # status is still pending
    _log_destroy_pending = True

    def __init__(self, coro, *, loop=None, name=None, context=None,
                 eager_start=False):
        """Wrap the coroutine *coro* and arrange for it to start running.

        *name* defaults to an auto-generated ``Task-N`` string; any other
        value is converted with str().  *context* defaults to a copy of the
        current contextvars context.  With *eager_start* true and the loop
        already running, the first step runs synchronously here; otherwise
        the first step is scheduled with call_soon().

        Raises TypeError if *coro* is not a coroutine.
        """
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the innermost frame (this constructor) from the traceback.
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        if name is None:
            self._name = f'Task-{_task_name_counter()}'
        else:
            self._name = str(name)

        # Number of outstanding cancel() requests (see cancelling()/uncancel()).
        self._num_cancels_requested = 0
        # True when a CancelledError must be thrown in on the next __step().
        self._must_cancel = False
        # The future the coroutine is currently blocked on, if any.
        self._fut_waiter = None
        self._coro = coro
        if context is None:
            self._context = contextvars.copy_context()
        else:
            self._context = context

        if eager_start and self._loop.is_running():
            self.__eager_start()
        else:
            self._loop.call_soon(self.__step, context=self._context)
            _py_register_task(self)

    def __del__(self):
        # Report a task that is garbage collected while still pending, unless
        # logging was disabled through _log_destroy_pending.
        if self._state == futures._PENDING and self._log_destroy_pending:
            context = {
                'task': self,
                'message': 'Task was destroyed but it is pending!',
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        super().__del__()

    # Allows subscripting the class, e.g. Task[int], yielding a GenericAlias.
    __class_getitem__ = classmethod(GenericAlias)

    def __repr__(self):
        return base_tasks._task_repr(self)

    def get_coro(self):
        """Return the coroutine object stored on this task."""
        return self._coro

    def get_context(self):
        """Return the contextvars context associated with this task."""
        return self._context

    def get_name(self):
        """Return the name of this task."""
        return self._name

    def set_name(self, value):
        """Set the name of this task to str(value)."""
        self._name = str(value)

    def set_result(self, result):
        """Unsupported on tasks; always raises RuntimeError."""
        raise RuntimeError('Task does not support set_result operation')

    def set_exception(self, exception):
        """Unsupported on tasks; always raises RuntimeError."""
        raise RuntimeError('Task does not support set_exception operation')

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self, msg=None):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        This also increases the task's count of cancellation requests.

        Returns False if the task is already done, True otherwise.
        """
        self._log_traceback = False
        if self.done():
            return False
        self._num_cancels_requested += 1
        # These two lines are controversial.  See discussion starting at
        # https://github.com/python/cpython/pull/31394#issuecomment-1053545331
        # Also remember that this is duplicated in _asynciomodule.c.
        # if self._num_cancels_requested > 1:
        #     return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel(msg=msg):
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self.__step is already scheduled.
        self._must_cancel = True
        self._cancel_message = msg
        return True

    def cancelling(self):
        """Return the count of the task's cancellation requests.

        This count is incremented when .cancel() is called
        and may be decremented using .uncancel().
        """
        return self._num_cancels_requested

    def uncancel(self):
        """Decrement the task's count of cancellation requests.

        This should be called by the party that called `cancel()` on the task
        beforehand.

        Returns the remaining number of cancellation requests.
        """
        if self._num_cancels_requested > 0:
            self._num_cancels_requested -= 1
            if self._num_cancels_requested == 0:
                # No request is outstanding any more, so drop the pending
                # "throw CancelledError on next step" flag as well.
                self._must_cancel = False
        return self._num_cancels_requested

    def __eager_start(self):
        # Run the first step of the coroutine synchronously inside the task's
        # context, with this task temporarily installed as the loop's current
        # task and registered as an eager task for the duration.
        prev_task = _py_swap_current_task(self._loop, self)
        try:
            _py_register_eager_task(self)
            try:
                self._context.run(self.__step_run_and_handle_result, None)
            finally:
                _py_unregister_eager_task(self)
        finally:
            try:
                # Restore whichever task was current before the eager step.
                curtask = _py_swap_current_task(self._loop, prev_task)
                assert curtask is self
            finally:
                if self.done():
                    self._coro = None
                    self = None  # Needed to break cycles when an exception occurs.
                else:
                    # The coroutine suspended: track it as a regular task.
                    _py_register_task(self)

    def __step(self, exc=None):
        # Advance the coroutine by one step.  *exc*, when given, is thrown
        # into the coroutine instead of resuming it normally.
        if self.done():
            raise exceptions.InvalidStateError(
                f'__step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            # A cancellation is pending: make sure a CancelledError is what
            # gets thrown, reusing *exc* if it already is one.
            if not isinstance(exc, exceptions.CancelledError):
                exc = self._make_cancelled_error()
            self._must_cancel = False
        self._fut_waiter = None

        _py_enter_task(self._loop, self)
        try:
            self.__step_run_and_handle_result(exc)
        finally:
            _py_leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

    def __step_run_and_handle_result(self, exc):
        # Resume the coroutine (send None, or throw *exc*) and act on the
        # outcome: completion, cancellation, failure, or a yielded value.
        coro = self._coro
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            # The coroutine returned.
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().cancel(msg=self._cancel_message)
            else:
                super().set_result(exc.value)
        except exceptions.CancelledError as exc:
            # Save the original exception so we can chain it later.
            self._cancelled_exc = exc
            super().cancel()  # I.e., Future.cancel(self).
        except (KeyboardInterrupt, SystemExit) as exc:
            # Record the exception on the task, then let it propagate.
            super().set_exception(exc)
            raise
        except BaseException as exc:
            super().set_exception(exc)
        else:
            # The coroutine yielded *result*; decide how to wait for it.
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        # Normal case: park on the future until it is done.
                        futures.future_add_to_awaited_by(result, self)
                        result._asyncio_future_blocking = False
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        self._fut_waiter = result
                        if self._must_cancel:
                            # cancel() was requested while the step was
                            # running; forward it to the awaited future.
                            if self._fut_waiter.cancel(
                                    msg=self._cancel_message):
                                self._must_cancel = False
                else:
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            self = None  # Needed to break cycles when an exception occurs.

    def __wakeup(self, future):
        # Done-callback attached to the awaited future: resume the coroutine,
        # throwing in the future's exception if it has one.
        futures.future_discard_from_awaited_by(future, self)
        try:
            future.result()
        except BaseException as exc:
            # This may also be a cancellation.
            self.__step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `__step(value, None)` instead of `__step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self.__step()
        self = None  # Needed to break cycles when an exception occurs.
# Keep a reference to the pure-Python implementation before the name
# ``Task`` is possibly rebound to the C implementation below.
_PyTask = Task


try:
    import _asyncio
except ImportError:
    # No C accelerator module available: Task stays the Python class.
    pass
else:
    # _CTask is needed for tests.
    Task = _CTask = _asyncio.Task
def create_task(coro, **kwargs):
    """Wrap *coro* in a Task scheduled on the running event loop.

    Keyword arguments are forwarded unchanged to ``loop.create_task()``.
    Return the Task object.
    """
    return events.get_running_loop().create_task(coro, **kwargs)
# wait() and as_completed() similar to those in PEP 3148.
# The return_when constants are the very same objects that
# concurrent.futures exposes, re-bound here under the same names.

FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED):
    """Wait until the Futures or Tasks in the iterable *fs* complete.

    *fs* must not be empty and must not contain bare coroutines.

    The result is a pair of sets of Future: (done, pending).

    Usage:

        done, pending = await asyncio.wait(fs)

    Note: a timeout does not raise TimeoutError!  Whatever is still
    unfinished when the timeout elapses is simply reported in the
    second set.
    """
    is_single_awaitable = futures.isfuture(fs) or coroutines.iscoroutine(fs)
    if is_single_awaitable:
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if not fs:
        raise ValueError('Set of Tasks/Futures is empty.')
    accepted_modes = (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED)
    if return_when not in accepted_modes:
        raise ValueError(f'Invalid return_when value: {return_when}')

    fs = set(fs)

    for candidate in fs:
        if coroutines.iscoroutine(candidate):
            raise TypeError(
                "Passing coroutines is forbidden, use tasks explicitly.")

    running_loop = events.get_running_loop()
    return await _wait(fs, timeout, return_when, running_loop)
435def _release_waiter(waiter, *args):
436 if not waiter.done(): 436 ↛ exitline 436 didn't return from function '_release_waiter' because the condition on line 436 was always true
437 waiter.set_result(None)
440async def wait_for(fut, timeout):
441 """Wait for the single Future or coroutine to complete, with timeout.
443 Coroutine will be wrapped in Task.
445 Returns result of the Future or coroutine. When a timeout occurs,
446 it cancels the task and raises TimeoutError. To avoid the task
447 cancellation, wrap it in shield().
449 If the wait is cancelled, the task is also cancelled.
451 If the task suppresses the cancellation and returns a value instead,
452 that value is returned.
454 This function is a coroutine.
455 """
456 # The special case for timeout <= 0 is for the following case:
457 #
458 # async def test_waitfor():
459 # func_started = False
460 #
461 # async def func():
462 # nonlocal func_started
463 # func_started = True
464 #
465 # try:
466 # await asyncio.wait_for(func(), 0)
467 # except asyncio.TimeoutError:
468 # assert not func_started
469 # else:
470 # assert False
471 #
472 # asyncio.run(test_waitfor())
475 if timeout is not None and timeout <= 0:
476 fut = ensure_future(fut)
478 if fut.done():
479 return fut.result()
481 await _cancel_and_wait(fut)
482 try:
483 return fut.result()
484 except exceptions.CancelledError as exc:
485 raise TimeoutError from exc
487 async with timeouts.timeout(timeout):
488 return await fut
async def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    The fs argument must be a collection of Futures.

    Returns a (done, pending) pair of sets partitioning *fs* by done().
    """
    assert fs, 'Set of Futures is empty.'
    # Future that is resolved as soon as the wait condition (or the timeout)
    # is reached; this coroutine blocks on it below.
    waiter = loop.create_future()
    timeout_handle = None
    if timeout is not None:
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    # Number of futures in *fs* that have not completed yet.
    counter = len(fs)
    cur_task = current_task()

    def _on_completion(f):
        nonlocal counter
        counter -= 1
        # NOTE: the order of the operands matters; f.exception() is only
        # evaluated for FIRST_EXCEPTION on a non-cancelled future.
        if (counter <= 0 or
            return_when == FIRST_COMPLETED or
            return_when == FIRST_EXCEPTION and (not f.cancelled() and
                                                f.exception() is not None)):
            if timeout_handle is not None:
                timeout_handle.cancel()
            if not waiter.done():
                waiter.set_result(None)
        futures.future_discard_from_awaited_by(f, cur_task)

    for f in fs:
        f.add_done_callback(_on_completion)
        futures.future_add_to_awaited_by(f, cur_task)

    try:
        await waiter
    finally:
        # Runs on normal completion and when this coroutine is cancelled:
        # stop the timer and detach the callback from every future.
        if timeout_handle is not None:
            timeout_handle.cancel()
        for f in fs:
            f.remove_done_callback(_on_completion)

    done, pending = set(), set()
    for f in fs:
        if f.done():
            done.add(f)
        else:
            pending.add(f)
    return done, pending
537async def _cancel_and_wait(fut):
538 """Cancel the *fut* future or task and wait until it completes."""
540 loop = events.get_running_loop()
541 waiter = loop.create_future()
542 cb = functools.partial(_release_waiter, waiter)
543 fut.add_done_callback(cb)
545 try:
546 fut.cancel()
547 # We cannot wait on *fut* directly to make
548 # sure _cancel_and_wait itself is reliably cancellable.
549 await waiter
550 finally:
551 fut.remove_done_callback(cb)
554class _AsCompletedIterator:
555 """Iterator of awaitables representing tasks of asyncio.as_completed.
557 As an asynchronous iterator, iteration yields futures as they finish. As a
558 plain iterator, new coroutines are yielded that will return or raise the
559 result of the next underlying future to complete.
560 """
561 def __init__(self, aws, timeout):
562 self._done = queues.Queue()
563 self._timeout_handle = None
565 loop = events.get_event_loop()
566 todo = {ensure_future(aw, loop=loop) for aw in set(aws)}
567 for f in todo:
568 f.add_done_callback(self._handle_completion)
569 if todo and timeout is not None:
570 self._timeout_handle = (
571 loop.call_later(timeout, self._handle_timeout)
572 )
573 self._todo = todo
574 self._todo_left = len(todo)
576 def __aiter__(self):
577 return self
579 def __iter__(self):
580 return self
582 async def __anext__(self):
583 if not self._todo_left:
584 raise StopAsyncIteration
585 assert self._todo_left > 0
586 self._todo_left -= 1
587 return await self._wait_for_one()
589 def __next__(self):
590 if not self._todo_left:
591 raise StopIteration
592 assert self._todo_left > 0
593 self._todo_left -= 1
594 return self._wait_for_one(resolve=True)
596 def _handle_timeout(self):
597 for f in self._todo:
598 f.remove_done_callback(self._handle_completion)
599 self._done.put_nowait(None) # Sentinel for _wait_for_one().
600 self._todo.clear() # Can't do todo.remove(f) in the loop.
602 def _handle_completion(self, f):
603 if not self._todo: 603 ↛ 604line 603 didn't jump to line 604 because the condition on line 603 was never true
604 return # _handle_timeout() was here first.
605 self._todo.remove(f)
606 self._done.put_nowait(f)
607 if not self._todo and self._timeout_handle is not None:
608 self._timeout_handle.cancel()
610 async def _wait_for_one(self, resolve=False):
611 # Wait for the next future to be done and return it unless resolve is
612 # set, in which case return either the result of the future or raise
613 # an exception.
614 f = await self._done.get()
615 if f is None:
616 # Dummy value from _handle_timeout().
617 raise exceptions.TimeoutError
618 return f.result() if resolve else f
def as_completed(fs, *, timeout=None):
    """Return an iterator over awaitables or results in completion order.

    The supplied awaitables are run concurrently.  Iterating the returned
    object produces them (or their results) as they finish.

    The returned object supports both asynchronous and plain iteration.
    Under asynchronous iteration, awaitables that were passed in as tasks
    or futures are yielded as-is, which makes it straightforward to match
    previously-scheduled tasks with their results:

        ipv4_connect = create_task(open_connection("127.0.0.1", 80))
        ipv6_connect = create_task(open_connection("::1", 80))
        tasks = [ipv4_connect, ipv6_connect]

        async for earliest_connect in as_completed(tasks):
            # earliest_connect is done. The result can be obtained by
            # awaiting it or calling earliest_connect.result()
            reader, writer = await earliest_connect

            if earliest_connect is ipv6_connect:
                print("IPv6 connection established.")
            else:
                print("IPv4 connection established.")

    For supplied awaitables that are not tasks or futures, asynchronous
    iteration yields the tasks that were implicitly created for them.

    Under plain iteration, every step yields a new coroutine that returns
    the result, or raises the exception, of the next awaitable to
    complete.  This form also works on Python versions older than 3.13:

        ipv4_connect = create_task(open_connection("127.0.0.1", 80))
        ipv6_connect = create_task(open_connection("::1", 80))
        tasks = [ipv4_connect, ipv6_connect]

        for next_connect in as_completed(tasks):
            # next_connect is not one of the original task objects. It must
            # be awaited to obtain the result value or raise the exception
            # of the awaitable that finishes next.
            reader, writer = await next_connect

    If the timeout elapses before all awaitables are done, TimeoutError
    is raised: by the async for loop under asynchronous iteration, or by
    the yielded coroutines under plain iteration.
    """
    if not inspect.isawaitable(fs):
        return _AsCompletedIterator(fs, timeout)

    # A single awaitable was passed where an iterable of them is required.
    raise TypeError(
        f"expects an iterable of awaitables, not {type(fs).__name__}"
    )
@types.coroutine
def __sleep0():
    """Skip one event loop run cycle.

    This is a private helper for 'asyncio.sleep()', used
    when the 'delay' is set to 0.  It uses a bare 'yield'
    expression (which Task.__step knows how to handle)
    instead of creating a Future object.
    """
    # The yielded value is None, which the task treats as a request to be
    # rescheduled with call_soon().
    yield
async def sleep(delay, result=None):
    """Coroutine that finishes after *delay* seconds and returns *result*.

    A delay of zero or less only yields to the event loop once.
    Raises ValueError if *delay* is NaN.
    """
    if delay <= 0:
        await __sleep0()
        return result

    if math.isnan(delay):
        raise ValueError("Invalid delay: NaN (not a number)")

    loop = events.get_running_loop()
    wakeup = loop.create_future()
    timer = loop.call_later(
        delay, futures._set_result_unless_cancelled, wakeup, result)
    try:
        return await wakeup
    finally:
        # Also reached when the sleep is cancelled: drop the pending timer.
        timer.cancel()
def ensure_future(coro_or_future, *, loop=None):
    """Return a future for the given coroutine or awaitable.

    A Future argument is handed back unchanged (after checking that it
    belongs to *loop*, when one is given).  Anything else is scheduled as
    a task on *loop*, or on the current event loop when *loop* is None.

    Raises ValueError on a loop mismatch and TypeError if the argument is
    neither a Future, a coroutine nor an awaitable.
    """
    if futures.isfuture(coro_or_future):
        if loop is not None and loop is not futures._get_loop(coro_or_future):
            raise ValueError('The future belongs to a different loop than '
                            'the one specified as the loop argument')
        return coro_or_future

    # A coroutine passed in by the caller is closed if the task cannot be
    # created; the internal wrapper coroutine below is not.
    close_on_failure = True
    if not coroutines.iscoroutine(coro_or_future):
        if not inspect.isawaitable(coro_or_future):
            raise TypeError('An asyncio.Future, a coroutine or an awaitable '
                            'is required')

        async def _wrap_awaitable(awaitable):
            return await awaitable

        coro_or_future = _wrap_awaitable(coro_or_future)
        close_on_failure = False

    if loop is None:
        loop = events.get_event_loop()
    try:
        return loop.create_task(coro_or_future)
    except RuntimeError:
        if close_on_failure:
            coro_or_future.close()
        raise
741class _GatheringFuture(futures.Future):
742 """Helper for gather().
744 This overrides cancel() to cancel all the children and act more
745 like Task.cancel(), which doesn't immediately mark itself as
746 cancelled.
747 """
749 def __init__(self, children, *, loop):
750 assert loop is not None
751 super().__init__(loop=loop)
752 self._children = children
753 self._cancel_requested = False
755 def cancel(self, msg=None):
756 if self.done():
757 return False
758 ret = False
759 for child in self._children:
760 if child.cancel(msg=msg):
761 ret = True
762 if ret:
763 # If any child tasks were actually cancelled, we should
764 # propagate the cancellation request regardless of
765 # *return_exceptions* argument. See issue 32684.
766 self._cancel_requested = True
767 return ret
def gather(*coros_or_futures, return_exceptions=False):
    """Return a future aggregating results from the given coroutines/futures.

    Coroutines will be wrapped in a future and scheduled in the event
    loop. They will not necessarily be scheduled in the same order as
    passed in.

    All futures must share the same event loop.  If all the tasks are
    done successfully, the returned future's result is the list of
    results (in the order of the original sequence, not necessarily
    the order of results arrival).  If *return_exceptions* is True,
    exceptions in the tasks are treated the same as successful
    results, and gathered in the result list; otherwise, the first
    raised exception will be immediately propagated to the returned
    future.

    Cancellation: if the outer Future is cancelled, all children (that
    have not completed yet) are also cancelled.  If any child is
    cancelled, this is treated as if it raised CancelledError --
    the outer Future is *not* cancelled in this case.  (This is to
    prevent the cancellation of one child to cause other children to
    be cancelled.)

    If *return_exceptions* is False, cancelling gather() after it
    has been marked done won't cancel any submitted awaitables.
    For instance, gather can be marked done after propagating an
    exception to the caller, therefore, calling ``gather.cancel()``
    after catching an exception (raised by one of the awaitables) from
    gather won't cancel any other awaitables.
    """
    if not coros_or_futures:
        # Nothing to wait for: return an already-completed future.
        loop = events.get_event_loop()
        outer = loop.create_future()
        outer.set_result([])
        return outer

    # May be None when gather() is called outside a running loop; in that
    # case the loop is taken from the first child future further down.
    loop = events._get_running_loop()
    if loop is not None:
        cur_task = current_task(loop)
    else:
        cur_task = None

    def _done_callback(fut, cur_task=cur_task):
        # Invoked once per distinct child future when it completes.  It
        # reads nfuts, children and outer from the enclosing scope, all of
        # which are assigned after this function is defined.
        nonlocal nfinished
        nfinished += 1

        if cur_task is not None:
            futures.future_discard_from_awaited_by(fut, cur_task)

        if outer is None or outer.done():
            if not fut.cancelled():
                # Mark exception retrieved.
                fut.exception()
            return

        if not return_exceptions:
            if fut.cancelled():
                # Check if 'fut' is cancelled first, as
                # 'fut.exception()' will *raise* a CancelledError
                # instead of returning it.
                exc = fut._make_cancelled_error()
                outer.set_exception(exc)
                return
            else:
                exc = fut.exception()
                if exc is not None:
                    outer.set_exception(exc)
                    return

        if nfinished == nfuts:
            # All futures are done; create a list of results
            # and set it to the 'outer' future.
            results = []

            for fut in children:
                if fut.cancelled():
                    # Check if 'fut' is cancelled first, as 'fut.exception()'
                    # will *raise* a CancelledError instead of returning it.
                    # Also, since we're adding the exception return value
                    # to 'results' instead of raising it, don't bother
                    # setting __context__.  This also lets us preserve
                    # calling '_make_cancelled_error()' at most once.
                    res = exceptions.CancelledError(
                        '' if fut._cancel_message is None else
                        fut._cancel_message)
                else:
                    res = fut.exception()
                    if res is None:
                        res = fut.result()
                results.append(res)

            if outer._cancel_requested:
                # If gather is being cancelled we must propagate the
                # cancellation regardless of *return_exceptions* argument.
                # See issue 32684.
                exc = fut._make_cancelled_error()
                outer.set_exception(exc)
            else:
                outer.set_result(results)

    # Maps each distinct argument to its future so that duplicates share one.
    arg_to_fut = {}
    # One entry per argument, in argument order (duplicates included).
    children = []
    # Number of distinct futures, and how many of them have completed.
    nfuts = 0
    nfinished = 0
    # Futures that were already done when gather() was called.
    done_futs = []
    outer = None  # bpo-46672
    for arg in coros_or_futures:
        if arg not in arg_to_fut:
            fut = ensure_future(arg, loop=loop)
            if loop is None:
                loop = futures._get_loop(fut)
            if fut is not arg:
                # 'arg' was not a Future, therefore, 'fut' is a new
                # Future created specifically for 'arg'.  Since the caller
                # can't control it, disable the "destroy pending task"
                # warning.
                fut._log_destroy_pending = False
            nfuts += 1
            arg_to_fut[arg] = fut
            if fut.done():
                done_futs.append(fut)
            else:
                if cur_task is not None:
                    futures.future_add_to_awaited_by(fut, cur_task)
                fut.add_done_callback(_done_callback)

        else:
            # There's a duplicate Future object in coros_or_futures.
            fut = arg_to_fut[arg]

        children.append(fut)

    outer = _GatheringFuture(children, loop=loop)
    # Run done callbacks after GatheringFuture created so any post-processing
    # can be performed at this point
    # optimization: in the special case that *all* futures finished eagerly,
    # this will effectively complete the gather eagerly, with the last
    # callback setting the result (or exception) on outer before returning it
    for fut in done_futs:
        _done_callback(fut)
    return outer
913def _log_on_exception(fut):
914 if fut.cancelled(): 914 ↛ 915line 914 didn't jump to line 915 because the condition on line 914 was never true
915 return
917 exc = fut.exception()
918 if exc is None:
919 return
921 context = {
922 'message':
923 f'{exc.__class__.__name__} exception in shielded future',
924 'exception': exc,
925 'future': fut,
926 }
927 if fut._source_traceback: 927 ↛ 928line 927 didn't jump to line 928 because the condition on line 927 was never true
928 context['source_traceback'] = fut._source_traceback
929 fut._loop.call_exception_handler(context)
def shield(arg):
    """Wait for a future, shielding it from cancellation.

    The statement

        task = asyncio.create_task(something())
        res = await shield(task)

    is exactly equivalent to the statement

        res = await something()

    *except* that if the coroutine containing it is cancelled, the
    task running in something() is not cancelled.  From the POV of
    something(), the cancellation did not happen.  But its caller is
    still cancelled, so the yield-from expression still raises
    CancelledError.  Note: If something() is cancelled by other means
    this will still cancel shield().

    If you want to completely ignore cancellation (not recommended)
    you can combine shield() with a try/except clause, as follows:

        task = asyncio.create_task(something())
        try:
            res = await shield(task)
        except CancelledError:
            res = None

    Save a reference to tasks passed to this function, to avoid
    a task disappearing mid-execution. The event loop only keeps
    weak references to tasks. A task that isn't referenced elsewhere
    may get garbage collected at any time, even before it's done.
    """
    inner = ensure_future(arg)
    if inner.done():
        # Shortcut.
        return inner
    loop = futures._get_loop(inner)
    # The caller awaits *outer*; cancelling it does not touch *inner*.
    outer = loop.create_future()

    if loop is not None and (cur_task := current_task(loop)) is not None:
        futures.future_add_to_awaited_by(inner, cur_task)
    else:
        cur_task = None

    def _clear_awaited_by_callback(inner):
        futures.future_discard_from_awaited_by(inner, cur_task)

    def _inner_done_callback(inner):
        # Mirror the outcome of *inner* onto *outer*, unless *outer* has
        # already been cancelled.
        if outer.cancelled():
            return

        if inner.cancelled():
            outer.cancel()
        else:
            exc = inner.exception()
            if exc is not None:
                outer.set_exception(exc)
            else:
                outer.set_result(inner.result())

    def _outer_done_callback(outer):
        # If *outer* finishes while *inner* is still running, stop
        # mirroring and only log an eventual exception of *inner*.
        if not inner.done():
            inner.remove_done_callback(_inner_done_callback)
            # Keep only one callback to log on cancel
            inner.remove_done_callback(_log_on_exception)
            inner.add_done_callback(_log_on_exception)

    if cur_task is not None:
        inner.add_done_callback(_clear_awaited_by_callback)


    inner.add_done_callback(_inner_done_callback)
    outer.add_done_callback(_outer_done_callback)
    return outer
def run_coroutine_threadsafe(coro, loop):
    """Schedule a coroutine object on *loop* from another thread.

    Returns a concurrent.futures.Future that receives the coroutine's
    outcome.  Raises TypeError if *coro* is not a coroutine object.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')

    future = concurrent.futures.Future()

    def callback():
        # Runs inside the loop: wrap the coroutine and link its outcome
        # to the concurrent future.
        try:
            task = ensure_future(coro, loop=loop)
            futures._chain_future(task, future)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            # Deliver the failure to the waiter unless it already
            # cancelled the future, then let it propagate.
            if future.set_running_or_notify_cancel():
                future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(callback)
    return future
def create_eager_task_factory(custom_task_constructor):
    """Build a task factory that starts tasks eagerly by default.

    The returned callable is meant to be installed on an event loop:

        loop.set_task_factory(
            asyncio.create_eager_task_factory(my_task_constructor))

    Tasks created through it begin running immediately instead of first
    being scheduled on the event loop.  *custom_task_constructor* may be
    any callable that returns a Task-compatible object and whose
    signature is compatible with `Task.__init__`; it must accept the
    `eager_start` keyword argument.

    When the constructor is simply `Task`, there is no need to call this
    function: use the module-level `eager_task_factory` instead, e.g.
    `loop.set_task_factory(asyncio.eager_task_factory)`.
    """

    def factory(loop, coro, *, eager_start=True, **kwargs):
        # Forward everything; eager start is on unless the caller opts out.
        task = custom_task_constructor(
            coro, loop=loop, eager_start=eager_start, **kwargs)
        return task

    return factory
1059eager_task_factory = create_eager_task_factory(Task)
1062# Collectively these two sets hold references to the complete set of active
1063# tasks. Eagerly executed tasks use a faster regular set as an optimization
1064# but may graduate to a WeakSet if the task blocks on IO.
1065_scheduled_tasks = weakref.WeakSet()
1066_eager_tasks = set()
1068# Dictionary containing tasks that are currently active in
1069# all running event loops. {EventLoop: Task}
1070_current_tasks = {}
1073def _register_task(task):
1074 """Register an asyncio Task scheduled to run on an event loop."""
1075 _scheduled_tasks.add(task)
1078def _register_eager_task(task):
1079 """Register an asyncio Task about to be eagerly executed."""
1080 _eager_tasks.add(task)
def _enter_task(loop, task):
    """Mark *task* as the task currently executing on *loop*.

    Raises RuntimeError if the loop already has a current task.
    """
    running = _current_tasks.get(loop)
    if running is None:
        _current_tasks[loop] = task
        return
    raise RuntimeError(f"Cannot enter into task {task!r} while another "
                       f"task {running!r} is being executed.")
def _leave_task(loop, task):
    """Clear *task* as the task currently executing on *loop*.

    Raises RuntimeError if *task* is not the loop's current task.
    """
    running = _current_tasks.get(loop)
    if running is task:
        del _current_tasks[loop]
        return
    raise RuntimeError(f"Leaving task {task!r} does not match "
                       f"the current task {running!r}.")
def _swap_current_task(loop, task):
    """Replace the current task of *loop* and return the previous one.

    Passing None as *task* clears the loop's current task.  Returns the
    task that was current before the call, or None if there was none.
    """
    if task is None:
        # pop() with a default tolerates a loop that has no current task,
        # so clearing an already-idle loop is a no-op rather than a
        # KeyError.
        return _current_tasks.pop(loop, None)
    prev_task = _current_tasks.get(loop)
    _current_tasks[loop] = task
    return prev_task
1108def _unregister_task(task):
1109 """Unregister a completed, scheduled Task."""
1110 _scheduled_tasks.discard(task)
1113def _unregister_eager_task(task):
1114 """Unregister a task which finished its first eager step."""
1115 _eager_tasks.discard(task)
1118_py_current_task = current_task
1119_py_register_task = _register_task
1120_py_register_eager_task = _register_eager_task
1121_py_unregister_task = _unregister_task
1122_py_unregister_eager_task = _unregister_eager_task
1123_py_enter_task = _enter_task
1124_py_leave_task = _leave_task
1125_py_swap_current_task = _swap_current_task
1126_py_all_tasks = all_tasks
# Prefer the implementations from the _asyncio extension module when it
# can be imported; they rebind the public names defined in this module.
try:
    from _asyncio import (
        _register_task,
        _register_eager_task,
        _unregister_task,
        _unregister_eager_task,
        _enter_task,
        _leave_task,
        _swap_current_task,
        current_task,
        all_tasks,
    )
except ImportError:
    # Import failed: the definitions from this module stay in effect.
    pass
else:
    # Expose the imported versions under _c_* names as well.
    _c_current_task = current_task
    _c_all_tasks = all_tasks
    _c_register_task = _register_task
    _c_unregister_task = _unregister_task
    _c_register_eager_task = _register_eager_task
    _c_unregister_eager_task = _unregister_eager_task
    _c_enter_task = _enter_task
    _c_leave_task = _leave_task
    _c_swap_current_task = _swap_current_task