Coverage for Lib/asyncio/tasks.py: 95%
530 statements
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-23 01:21 +0000
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-23 01:21 +0000
1"""Support for tasks, coroutines and the scheduler."""
3__all__ = (
4 'Task', 'create_task',
5 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
6 'wait', 'wait_for', 'as_completed', 'sleep',
7 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
8 'current_task', 'all_tasks',
9 'create_eager_task_factory', 'eager_task_factory',
10 '_register_task', '_unregister_task', '_enter_task', '_leave_task',
11)
13import concurrent.futures
14import contextvars
15import functools
16import inspect
17import itertools
18import math
19import types
20import weakref
21from types import GenericAlias
23from . import base_tasks
24from . import coroutines
25from . import events
26from . import exceptions
27from . import futures
28from . import queues
29from . import timeouts
31# Helper to generate new task names
32# This uses itertools.count() instead of a "+= 1" operation because the latter
33# is not thread safe. See bpo-11866 for a longer explanation.
34_task_name_counter = itertools.count(1).__next__
def current_task(loop=None):
    """Return the task currently executing on *loop*, or None.

    When *loop* is omitted, the running event loop is looked up via
    events.get_running_loop().
    """
    target_loop = events.get_running_loop() if loop is None else loop
    return _current_tasks.get(target_loop)
def all_tasks(loop=None):
    """Return the set of tasks bound to *loop* that are not done yet.

    When *loop* is omitted, the running event loop is used.
    """
    if loop is None:
        loop = events.get_running_loop()
    # Snapshot the eager tasks *before* walking the scheduled ones: if an
    # eager task "graduates" to a regular task in another thread while we
    # iterate, it is then still seen through one of the two collections.
    eager_snapshot = list(_eager_tasks)

    unfinished = set()
    for task in itertools.chain(_scheduled_tasks, eager_snapshot):
        if futures._get_loop(task) is loop and not task.done():
            unfinished.add(task)
    return unfinished
class Task(futures._PyFuture):  # Inherit Python Task implementation
                                # from a Python Future implementation.

    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task not done:
    # _fut_waiter is either None or a Future.  The Future
    # can be either done() or not done().
    # The task can be in any of 3 states:
    #
    # - 1: _fut_waiter is not None and not _fut_waiter.done():
    #      __step() is *not* scheduled and the Task is waiting for _fut_waiter.
    # - 2: (_fut_waiter is None or _fut_waiter.done()) and __step() is scheduled:
    #       the Task is waiting for __step() to be executed.
    # - 3:  _fut_waiter is None and __step() is *not* scheduled:
    #       the Task is currently executing (in __step()).
    #
    # * In state 1, one of the callbacks of _fut_waiter must be __wakeup().
    # * The transition from 1 to 2 happens when _fut_waiter becomes done(),
    #   as it schedules __wakeup() to be called (which calls __step() so
    #   we say that __step() is scheduled).
    # * It transitions from 2 to 3 when __step() is executed, and it clears
    #   _fut_waiter to None.

    # If False, don't log a message if the task is destroyed while its
    # status is still pending
    _log_destroy_pending = True

    def __init__(self, coro, *, loop=None, name=None, context=None,
                 eager_start=False):
        """Wrap the coroutine object *coro* and arrange for it to run.

        *name* defaults to an auto-generated 'Task-N' string and is always
        stored as a str.  *context* defaults to a copy of the current
        contextvars context.  With *eager_start* true and the loop already
        running, the first step runs synchronously here; otherwise the
        first step is scheduled with call_soon() and the task is registered.

        Raises TypeError if *coro* is not a coroutine object.
        """
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        if name is None:
            self._name = f'Task-{_task_name_counter()}'
        else:
            self._name = str(name)

        self._num_cancels_requested = 0
        self._must_cancel = False
        self._fut_waiter = None
        self._coro = coro
        if context is None:
            self._context = contextvars.copy_context()
        else:
            self._context = context

        if eager_start and self._loop.is_running():
            self.__eager_start()
        else:
            self._loop.call_soon(self.__step, context=self._context)
            _py_register_task(self)

    def __del__(self):
        # Report tasks that are garbage collected while still pending,
        # unless logging was disabled through _log_destroy_pending.
        if self._state == futures._PENDING and self._log_destroy_pending:
            context = {
                'task': self,
                'message': 'Task was destroyed but it is pending!',
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        super().__del__()

    __class_getitem__ = classmethod(GenericAlias)

    def __repr__(self):
        return base_tasks._task_repr(self)

    def get_coro(self):
        """Return the wrapped coroutine (None once an eager task finished)."""
        return self._coro

    def get_context(self):
        """Return the contextvars context the task's steps are run in."""
        return self._context

    def get_name(self):
        """Return the task's name."""
        return self._name

    def set_name(self, value):
        """Set the task's name to str(value)."""
        self._name = str(value)

    def set_result(self, result):
        """Always raise RuntimeError: a Task's result comes from its coroutine."""
        raise RuntimeError('Task does not support set_result operation')

    def set_exception(self, exception):
        """Always raise RuntimeError: a Task's exception comes from its coroutine."""
        raise RuntimeError('Task does not support set_exception operation')

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self, msg=None):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        This also increases the task's count of cancellation requests.
        """
        self._log_traceback = False
        if self.done():
            return False
        self._num_cancels_requested += 1
        # These two lines are controversial.  See discussion starting at
        # https://github.com/python/cpython/pull/31394#issuecomment-1053545331
        # Also remember that this is duplicated in _asynciomodule.c.
        # if self._num_cancels_requested > 1:
        #     return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel(msg=msg):
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self.__step is already scheduled.
        self._must_cancel = True
        self._cancel_message = msg
        return True

    def cancelling(self):
        """Return the count of the task's cancellation requests.

        This count is incremented when .cancel() is called
        and may be decremented using .uncancel().
        """
        return self._num_cancels_requested

    def uncancel(self):
        """Decrement the task's count of cancellation requests.

        This should be called by the party that called `cancel()` on the task
        beforehand.

        Returns the remaining number of cancellation requests.
        """
        if self._num_cancels_requested > 0:
            self._num_cancels_requested -= 1
            if self._num_cancels_requested == 0:
                # No outstanding request is left, so a cancellation that
                # has not been delivered yet must not be delivered at all.
                self._must_cancel = False
        return self._num_cancels_requested

    def __eager_start(self):
        # Run the first step synchronously, with this task installed as the
        # loop's current task and tracked as an eager task for the duration.
        prev_task = _py_swap_current_task(self._loop, self)
        try:
            _py_register_eager_task(self)
            try:
                self._context.run(self.__step_run_and_handle_result, None)
            finally:
                _py_unregister_eager_task(self)
        finally:
            try:
                curtask = _py_swap_current_task(self._loop, prev_task)
                assert curtask is self
            finally:
                if self.done():
                    self._coro = None
                    self = None  # Needed to break cycles when an exception occurs.
                else:
                    # The coroutine suspended: from now on this is a
                    # regular scheduled task.
                    _py_register_task(self)

    def __step(self, exc=None):
        # Advance the coroutine by one step, throwing *exc* into it if given.
        if self.done():
            raise exceptions.InvalidStateError(
                f'__step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            if not isinstance(exc, exceptions.CancelledError):
                exc = self._make_cancelled_error()
            self._must_cancel = False
        self._fut_waiter = None

        _py_enter_task(self._loop, self)
        try:
            self.__step_run_and_handle_result(exc)
        finally:
            _py_leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

    def __step_run_and_handle_result(self, exc):
        # Resume the coroutine and translate the outcome (return, raise or
        # yielded value) into a state change of this task.
        coro = self._coro
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().cancel(msg=self._cancel_message)
            else:
                super().set_result(exc.value)
        except exceptions.CancelledError as exc:
            # Save the original exception so we can chain it later.
            self._cancelled_exc = exc
            super().cancel()  # I.e., Future.cancel(self).
        except (KeyboardInterrupt, SystemExit) as exc:
            super().set_exception(exc)
            raise
        except BaseException as exc:
            super().set_exception(exc)
        else:
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        futures.future_add_to_awaited_by(result, self)
                        result._asyncio_future_blocking = False
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        self._fut_waiter = result
                        if self._must_cancel:
                            if self._fut_waiter.cancel(
                                    msg=self._cancel_message):
                                self._must_cancel = False
                else:
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            self = None  # Needed to break cycles when an exception occurs.

    def __wakeup(self, future):
        # Done-callback attached to the awaited future: resume the coroutine,
        # forwarding the future's exception (if any) into it.
        futures.future_discard_from_awaited_by(future, self)
        try:
            future.result()
        except BaseException as exc:
            # This may also be a cancellation.
            self.__step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `__step(value, None)` instead of `__step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self.__step()
        self = None  # Needed to break cycles when an exception occurs.
377_PyTask = Task
380try:
381 import _asyncio
382except ImportError:
383 pass
384else:
385 # _CTask is needed for tests.
386 Task = _CTask = _asyncio.Task
def create_task(coro, **kwargs):
    """Schedule the coroutine object *coro* to run as a new task.

    The task is created on the running event loop; any keyword
    arguments are forwarded to loop.create_task().

    Return a Task object.
    """
    return events.get_running_loop().create_task(coro, **kwargs)
398# wait() and as_completed() similar to those in PEP 3148.
400FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
401FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
402ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
405async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED):
406 """Wait for the Futures or Tasks given by fs to complete.
408 The fs iterable must not be empty.
410 Returns two sets of Future: (done, pending).
412 Usage:
414 done, pending = await asyncio.wait(fs)
416 Note: This does not raise TimeoutError! Futures that aren't done
417 when the timeout occurs are returned in the second set.
418 """
419 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
420 raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
421 if not fs:
422 raise ValueError('Set of Tasks/Futures is empty.')
423 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
424 raise ValueError(f'Invalid return_when value: {return_when}')
426 fs = set(fs)
428 if any(coroutines.iscoroutine(f) for f in fs): 428 ↛ 429line 428 didn't jump to line 429 because the condition on line 428 was never true
429 raise TypeError("Passing coroutines is forbidden, use tasks explicitly.")
431 loop = events.get_running_loop()
432 return await _wait(fs, timeout, return_when, loop)
def _release_waiter(waiter, *args):
    """Complete *waiter* with a None result unless it is already done.

    Extra positional arguments are accepted and ignored, so the function
    can be registered directly as a callback that is passed arguments.
    """
    if waiter.done():
        return
    waiter.set_result(None)
440async def wait_for(fut, timeout):
441 """Wait for the single Future or coroutine to complete, with timeout.
443 Coroutine will be wrapped in Task.
445 Returns result of the Future or coroutine. When a timeout occurs,
446 it cancels the task and raises TimeoutError. To avoid the task
447 cancellation, wrap it in shield().
449 If the wait is cancelled, the task is also cancelled.
451 If the task suppresses the cancellation and returns a value instead,
452 that value is returned.
454 This function is a coroutine.
455 """
456 # The special case for timeout <= 0 is for the following case:
457 #
458 # async def test_waitfor():
459 # func_started = False
460 #
461 # async def func():
462 # nonlocal func_started
463 # func_started = True
464 #
465 # try:
466 # await asyncio.wait_for(func(), 0)
467 # except asyncio.TimeoutError:
468 # assert not func_started
469 # else:
470 # assert False
471 #
472 # asyncio.run(test_waitfor())
475 if timeout is not None and timeout <= 0:
476 fut = ensure_future(fut)
478 if fut.done():
479 return fut.result()
481 await _cancel_and_wait(fut)
482 try:
483 return fut.result()
484 except exceptions.CancelledError as exc:
485 raise TimeoutError from exc
487 async with timeouts.timeout(timeout):
488 return await fut
async def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    *fs* must be a non-empty collection of Futures.  Returns a
    (done, pending) pair of sets once the *return_when* condition is met
    or *timeout* seconds have passed.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = loop.create_future()
    timeout_handle = (
        None if timeout is None
        else loop.call_later(timeout, _release_waiter, waiter)
    )
    remaining = len(fs)
    cur_task = current_task()

    def _on_completion(f):
        nonlocal remaining
        remaining -= 1
        if remaining <= 0 or return_when == FIRST_COMPLETED:
            satisfied = True
        elif return_when == FIRST_EXCEPTION:
            # Only a future that actually failed ends the wait early;
            # a cancelled one does not.
            satisfied = not f.cancelled() and f.exception() is not None
        else:
            satisfied = False

        if satisfied:
            if timeout_handle is not None:
                timeout_handle.cancel()
            if not waiter.done():
                waiter.set_result(None)
        futures.future_discard_from_awaited_by(f, cur_task)

    for f in fs:
        f.add_done_callback(_on_completion)
        futures.future_add_to_awaited_by(f, cur_task)

    try:
        await waiter
    finally:
        # Runs on cancellation too: leave no timer or callback behind.
        if timeout_handle is not None:
            timeout_handle.cancel()
        for f in fs:
            f.remove_done_callback(_on_completion)

    done, pending = set(), set()
    for f in fs:
        (done if f.done() else pending).add(f)
    return done, pending
async def _cancel_and_wait(fut):
    """Cancel the future or task *fut* and wait until it has completed."""
    waiter = events.get_running_loop().create_future()
    release = functools.partial(_release_waiter, waiter)
    fut.add_done_callback(release)

    try:
        fut.cancel()
        # Await a separate waiter rather than *fut* itself, so that
        # _cancel_and_wait stays reliably cancellable.
        await waiter
    finally:
        fut.remove_done_callback(release)
554class _AsCompletedIterator:
555 """Iterator of awaitables representing tasks of asyncio.as_completed.
557 As an asynchronous iterator, iteration yields futures as they finish. As a
558 plain iterator, new coroutines are yielded that will return or raise the
559 result of the next underlying future to complete.
560 """
561 def __init__(self, aws, timeout):
562 self._done = queues.Queue()
563 self._timeout_handle = None
565 loop = events.get_event_loop()
566 todo = {ensure_future(aw, loop=loop) for aw in set(aws)}
567 for f in todo:
568 f.add_done_callback(self._handle_completion)
569 if todo and timeout is not None:
570 self._timeout_handle = (
571 loop.call_later(timeout, self._handle_timeout)
572 )
573 self._todo = todo
574 self._todo_left = len(todo)
576 def __aiter__(self):
577 return self
579 def __iter__(self):
580 return self
582 async def __anext__(self):
583 if not self._todo_left:
584 raise StopAsyncIteration
585 assert self._todo_left > 0
586 self._todo_left -= 1
587 return await self._wait_for_one()
589 def __next__(self):
590 if not self._todo_left:
591 raise StopIteration
592 assert self._todo_left > 0
593 self._todo_left -= 1
594 return self._wait_for_one(resolve=True)
596 def _handle_timeout(self):
597 for f in self._todo:
598 f.remove_done_callback(self._handle_completion)
599 self._done.put_nowait(None) # Sentinel for _wait_for_one().
600 self._todo.clear() # Can't do todo.remove(f) in the loop.
602 def _handle_completion(self, f):
603 if not self._todo: 603 ↛ 604line 603 didn't jump to line 604 because the condition on line 603 was never true
604 return # _handle_timeout() was here first.
605 self._todo.remove(f)
606 self._done.put_nowait(f)
607 if not self._todo and self._timeout_handle is not None:
608 self._timeout_handle.cancel()
610 async def _wait_for_one(self, resolve=False):
611 # Wait for the next future to be done and return it unless resolve is
612 # set, in which case return either the result of the future or raise
613 # an exception.
614 f = await self._done.get()
615 if f is None:
616 # Dummy value from _handle_timeout().
617 raise exceptions.TimeoutError
618 return f.result() if resolve else f
def as_completed(fs, *, timeout=None):
    """Create an iterator of awaitables or their results in completion order.

    The supplied awaitables are run concurrently, and the returned object
    can be iterated to obtain their results as they finish.  It supports
    both asynchronous and plain iteration.

    With asynchronous iteration, awaitables that were supplied as tasks
    or futures are yielded as the very same objects, which makes it easy
    to correlate previously-scheduled tasks with their results:

        ipv4_connect = create_task(open_connection("127.0.0.1", 80))
        ipv6_connect = create_task(open_connection("::1", 80))
        tasks = [ipv4_connect, ipv6_connect]

        async for earliest_connect in as_completed(tasks):
            # earliest_connect is done. The result can be obtained by
            # awaiting it or calling earliest_connect.result()
            reader, writer = await earliest_connect

            if earliest_connect is ipv6_connect:
                print("IPv6 connection established.")
            else:
                print("IPv4 connection established.")

    For supplied awaitables that are not tasks or futures, the
    implicitly-created tasks are yielded instead.

    With plain iteration, every step yields a new coroutine that returns
    the result or raises the exception of the next completed awaitable.
    This pattern is compatible with Python versions older than 3.13:

        ipv4_connect = create_task(open_connection("127.0.0.1", 80))
        ipv6_connect = create_task(open_connection("::1", 80))
        tasks = [ipv4_connect, ipv6_connect]

        for next_connect in as_completed(tasks):
            # next_connect is not one of the original task objects. It must be
            # awaited to obtain the result value or raise the exception of the
            # awaitable that finishes next.
            reader, writer = await next_connect

    A TimeoutError is raised if the timeout occurs before all awaitables
    are done: by the async for loop during asynchronous iteration, or by
    the yielded coroutines during plain iteration.

    Raises TypeError if *fs* is itself an awaitable rather than an
    iterable of awaitables.
    """
    if not inspect.isawaitable(fs):
        return _AsCompletedIterator(fs, timeout)

    raise TypeError(
        f"expects an iterable of awaitables, not {type(fs).__name__}"
    )
@types.coroutine
def __sleep0():
    """Give up control for exactly one event loop iteration.

    Private helper for 'asyncio.sleep()', used when the delay is not
    positive.  The bare 'yield' below is recognised by Task.__step, so
    no Future object has to be created just to skip a cycle.
    """
    yield
async def sleep(delay, result=None):
    """Suspend for *delay* seconds, then return *result*.

    A delay of zero or less only yields to the event loop for one
    iteration.  Raises ValueError if *delay* is NaN.
    """
    if delay <= 0:
        await __sleep0()
        return result

    # NaN compares false against everything, so it gets past the check
    # above and has to be rejected explicitly.
    if math.isnan(delay):
        raise ValueError("Invalid delay: NaN (not a number)")

    loop = events.get_running_loop()
    wakeup = loop.create_future()
    timer = loop.call_later(
        delay, futures._set_result_unless_cancelled, wakeup, result)
    try:
        return await wakeup
    finally:
        # Drop the timer if we are cancelled before it fires.
        timer.cancel()
def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    If the argument is already a Future it is returned unchanged; in that
    case ValueError is raised when *loop* is given and is not the
    future's own loop.  Anything that is neither a future, a coroutine
    nor an awaitable raises TypeError.
    """
    if futures.isfuture(coro_or_future):
        if loop is not None and loop is not futures._get_loop(coro_or_future):
            raise ValueError('The future belongs to a different loop than '
                             'the one specified as the loop argument')
        return coro_or_future

    if coroutines.iscoroutine(coro_or_future):
        # The caller's coroutine is closed if no task can be created.
        should_close = True
    elif inspect.isawaitable(coro_or_future):
        async def _wrap_awaitable(awaitable):
            return await awaitable

        coro_or_future = _wrap_awaitable(coro_or_future)
        should_close = False
    else:
        raise TypeError('An asyncio.Future, a coroutine or an awaitable '
                        'is required')

    if loop is None:
        loop = events.get_event_loop()
    try:
        return loop.create_task(coro_or_future)
    except RuntimeError:
        if should_close:
            coro_or_future.close()
        raise
class _GatheringFuture(futures.Future):
    """Helper for gather().

    cancel() is overridden to cancel every child instead, which makes it
    behave more like Task.cancel(): the future does not immediately mark
    itself as cancelled.
    """

    def __init__(self, children, *, loop):
        assert loop is not None
        super().__init__(loop=loop)
        self._children = children
        self._cancel_requested = False

    def cancel(self, msg=None):
        if self.done():
            return False
        # Every child must receive the request, so collect all the
        # outcomes before looking at them (no short-circuiting).
        outcomes = [child.cancel(msg=msg) for child in self._children]
        any_cancelled = any(outcomes)
        if any_cancelled:
            # If any child tasks were actually cancelled, we should
            # propagate the cancellation request regardless of
            # *return_exceptions* argument.  See issue 32684.
            self._cancel_requested = True
        return any_cancelled
def gather(*coros_or_futures, return_exceptions=False):
    """Return a future aggregating results from the given coroutines/futures.

    Coroutines are wrapped in a future and scheduled in the event loop;
    they are not necessarily scheduled in the order they were passed in.

    All futures must share the same event loop.  When every task finishes
    successfully, the returned future's result is the list of results, in
    the order of the original arguments (not the order of completion).
    With *return_exceptions* true, exceptions count as results and are
    collected in that list; otherwise the first exception raised is
    propagated immediately to the returned future.

    Cancellation: cancelling the outer Future cancels all children that
    have not completed yet.  A cancelled child is treated as if it had
    raised CancelledError -- the outer Future is *not* cancelled in that
    case, so that cancelling one child does not cancel the others.

    With *return_exceptions* false, cancelling gather() after it has been
    marked done does not cancel any submitted awaitables.  For instance,
    gather can be marked done after propagating an exception to the
    caller, so calling ``gather.cancel()`` after catching an exception
    (raised by one of the awaitables) from gather won't cancel the other
    awaitables.
    """
    if not coros_or_futures:
        empty = events.get_event_loop().create_future()
        empty.set_result([])
        return empty

    loop = events._get_running_loop()
    cur_task = current_task(loop) if loop is not None else None

    def _done_callback(fut, cur_task=cur_task):
        nonlocal nfinished
        nfinished += 1

        if cur_task is not None:
            futures.future_discard_from_awaited_by(fut, cur_task)

        if outer is None or outer.done():
            if not fut.cancelled():
                # Mark exception retrieved.
                fut.exception()
            return

        if not return_exceptions:
            if fut.cancelled():
                # Check if 'fut' is cancelled first, as
                # 'fut.exception()' will *raise* a CancelledError
                # instead of returning it.
                outer.set_exception(fut._make_cancelled_error())
                return
            exc = fut.exception()
            if exc is not None:
                outer.set_exception(exc)
                return

        if nfinished != nfuts:
            return

        # All futures are done; create a list of results
        # and set it to the 'outer' future.
        results = []

        # NOTE: this loop deliberately reuses the name 'fut', so after it
        # 'fut' refers to the last child rather than to the future that
        # triggered this callback.
        for fut in children:
            if fut.cancelled():
                # Check if 'fut' is cancelled first, as 'fut.exception()'
                # will *raise* a CancelledError instead of returning it.
                # Also, since we're adding the exception return value
                # to 'results' instead of raising it, don't bother
                # setting __context__.  This also lets us preserve
                # calling '_make_cancelled_error()' at most once.
                res = exceptions.CancelledError(
                    '' if fut._cancel_message is None else
                    fut._cancel_message)
            else:
                res = fut.exception()
                if res is None:
                    res = fut.result()
            results.append(res)

        if outer._cancel_requested:
            # If gather is being cancelled we must propagate the
            # cancellation regardless of *return_exceptions* argument.
            # See issue 32684.
            outer.set_exception(fut._make_cancelled_error())
        else:
            outer.set_result(results)

    arg_to_fut = {}
    children = []
    nfuts = 0
    nfinished = 0
    done_futs = []
    outer = None  # bpo-46672
    for arg in coros_or_futures:
        if arg in arg_to_fut:
            # There's a duplicate Future object in coros_or_futures.
            fut = arg_to_fut[arg]
        else:
            fut = ensure_future(arg, loop=loop)
            if loop is None:
                loop = futures._get_loop(fut)
            if fut is not arg:
                # 'arg' was not a Future, therefore, 'fut' is a new
                # Future created specifically for 'arg'.  Since the caller
                # can't control it, disable the "destroy pending task"
                # warning.
                fut._log_destroy_pending = False
            nfuts += 1
            arg_to_fut[arg] = fut
            if fut.done():
                done_futs.append(fut)
            else:
                if cur_task is not None:
                    futures.future_add_to_awaited_by(fut, cur_task)
                fut.add_done_callback(_done_callback)

        children.append(fut)

    outer = _GatheringFuture(children, loop=loop)
    # Run done callbacks after GatheringFuture created so any post-processing
    # can be performed at this point
    # optimization: in the special case that *all* futures finished eagerly,
    # this will effectively complete the gather eagerly, with the last
    # callback setting the result (or exception) on outer before returning it
    for fut in done_futs:
        _done_callback(fut)
    return outer
def _log_on_exception(fut):
    """Report the exception of a finished *fut* to its loop's handler.

    Does nothing when *fut* was cancelled or finished without an
    exception.
    """
    if not fut.cancelled() and (exc := fut.exception()) is not None:
        context = {
            'message':
            f'{exc.__class__.__name__} exception in shielded future',
            'exception': exc,
            'future': fut,
        }
        if fut._source_traceback:
            context['source_traceback'] = fut._source_traceback
        fut._loop.call_exception_handler(context)
def shield(arg):
    """Wait for a future, shielding it from cancellation.

    The statement

        task = asyncio.create_task(something())
        res = await shield(task)

    is exactly equivalent to the statement

        res = await something()

    *except* that if the coroutine containing it is cancelled, the task
    running in something() is not cancelled.  From the point of view of
    something(), the cancellation did not happen.  Its caller is still
    cancelled, though, so the await expression still raises
    CancelledError.  Note: if something() is cancelled by other means,
    this will still cancel shield().

    To ignore cancellation completely (not recommended), combine
    shield() with a try/except clause:

        task = asyncio.create_task(something())
        try:
            res = await shield(task)
        except CancelledError:
            res = None

    Save a reference to tasks passed to this function, to avoid
    a task disappearing mid-execution.  The event loop only keeps
    weak references to tasks.  A task that isn't referenced elsewhere
    may get garbage collected at any time, even before it's done.
    """
    inner = ensure_future(arg)
    if inner.done():
        # Shortcut.
        return inner
    loop = futures._get_loop(inner)
    outer = loop.create_future()

    cur_task = current_task(loop) if loop is not None else None
    if cur_task is not None:
        futures.future_add_to_awaited_by(inner, cur_task)

    def _clear_awaited_by_callback(inner):
        futures.future_discard_from_awaited_by(inner, cur_task)

    def _inner_done_callback(inner):
        # Mirror the outcome of *inner* onto *outer*, unless the caller
        # has already cancelled *outer*.
        if outer.cancelled():
            return

        if inner.cancelled():
            outer.cancel()
            return

        exc = inner.exception()
        if exc is None:
            outer.set_result(inner.result())
        else:
            outer.set_exception(exc)

    def _outer_done_callback(outer):
        if inner.done():
            return
        # *outer* finished first: stop mirroring, but still have a
        # failure of *inner* reported to the loop's exception handler.
        inner.remove_done_callback(_inner_done_callback)
        # Keep only one callback to log on cancel
        inner.remove_done_callback(_log_on_exception)
        inner.add_done_callback(_log_on_exception)

    if cur_task is not None:
        inner.add_done_callback(_clear_awaited_by_callback)

    inner.add_done_callback(_inner_done_callback)
    outer.add_done_callback(_outer_done_callback)
    return outer
def run_coroutine_threadsafe(coro, loop):
    """Submit a coroutine object to a given event loop.

    Return a concurrent.futures.Future that gives access to the result.
    Raise TypeError if *coro* is not a coroutine object.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')

    result = concurrent.futures.Future()

    def callback():
        try:
            task = ensure_future(coro, loop=loop)
            futures._chain_future(task, result)
        except BaseException as exc:
            # SystemExit and KeyboardInterrupt propagate untouched; any
            # other failure is also recorded on the returned future.
            if (not isinstance(exc, (SystemExit, KeyboardInterrupt))
                    and result.set_running_or_notify_cancel()):
                result.set_exception(exc)
            raise

    loop.call_soon_threadsafe(callback)
    return result
def create_eager_task_factory(custom_task_constructor):
    """Build a task factory that starts tasks eagerly.

    Example usage:

        loop.set_task_factory(
            asyncio.create_eager_task_factory(my_task_constructor))

    Tasks created through the returned factory start running immediately
    instead of first being scheduled on the event loop.
    *custom_task_constructor* may be any callable that returns a
    Task-compatible object and whose signature is compatible with
    `Task.__init__`; it must accept the `eager_start` keyword argument.

    Most applications pass `Task` as the constructor, in which case there
    is no need to call this function directly: use the module-level
    `eager_task_factory` instead, e.g.
    `loop.set_task_factory(asyncio.eager_task_factory)`.
    """

    def factory(loop, coro, *, eager_start=True, **kwargs):
        options = {'loop': loop, 'eager_start': eager_start, **kwargs}
        return custom_task_constructor(coro, **options)

    return factory
# Ready-made eager task factory built around Task.
eager_task_factory = create_eager_task_factory(Task)


# Together these two sets reference every active task.  Tasks that are
# executed eagerly live in a plain set, which is faster, and may later
# graduate to the WeakSet if they block on IO.
_scheduled_tasks = weakref.WeakSet()
_eager_tasks = set()

# The task currently executing in each running event loop, keyed by loop:
# {EventLoop: Task}
_current_tasks = {}
def _register_task(task):
    """Add *task* to the weak set of tasks scheduled on an event loop."""
    _scheduled_tasks.add(task)
def _register_eager_task(task):
    """Add *task* to the set of tasks that are about to run eagerly."""
    _eager_tasks.add(task)
def _enter_task(loop, task):
    """Record *task* as the task currently running on *loop*.

    Raise RuntimeError if another task is already recorded as current
    for *loop*.
    """
    if (running := _current_tasks.get(loop)) is not None:
        raise RuntimeError(f"Cannot enter into task {task!r} while another "
                           f"task {running!r} is being executed.")
    _current_tasks[loop] = task
def _leave_task(loop, task):
    """Remove *task* as the current task of *loop*.

    Raise RuntimeError if *task* is not the task recorded as current
    for *loop*.
    """
    running = _current_tasks.get(loop)
    if running is task:
        del _current_tasks[loop]
        return
    raise RuntimeError(f"Leaving task {task!r} does not match "
                       f"the current task {running!r}.")
def _swap_current_task(loop, task):
    """Replace the current task of *loop* with *task*.

    Passing None as *task* clears the entry for *loop*.  Clearing a loop
    that has no current task is a no-op rather than a KeyError.

    Return the task that was previously recorded as current for *loop*,
    or None if there was none.
    """
    if task is None:
        # pop() with a default tolerates a loop that has no entry.
        return _current_tasks.pop(loop, None)
    prev_task = _current_tasks.get(loop)
    _current_tasks[loop] = task
    return prev_task
def _unregister_task(task):
    """Drop *task* from the set of scheduled tasks, if it is present."""
    _scheduled_tasks.discard(task)
def _unregister_eager_task(task):
    """Drop *task* from the set of eagerly started tasks, if present."""
    _eager_tasks.discard(task)
# Keep the pure-Python implementations reachable under _py_* names; the
# import below may rebind the public names.
_py_current_task = current_task
_py_register_task = _register_task
_py_register_eager_task = _register_eager_task
_py_unregister_task = _unregister_task
_py_unregister_eager_task = _unregister_eager_task
_py_enter_task = _enter_task
_py_leave_task = _leave_task
_py_swap_current_task = _swap_current_task
_py_all_tasks = all_tasks

try:
    # Use the _asyncio implementations when that module is available.
    from _asyncio import (
        _register_task,
        _register_eager_task,
        _unregister_task,
        _unregister_eager_task,
        _enter_task,
        _leave_task,
        _swap_current_task,
        current_task,
        all_tasks,
    )
except ImportError:
    pass
else:
    # Expose the _asyncio implementations under _c_* names as well.
    _c_current_task = current_task
    _c_register_task = _register_task
    _c_register_eager_task = _register_eager_task
    _c_unregister_task = _unregister_task
    _c_unregister_eager_task = _unregister_eager_task
    _c_enter_task = _enter_task
    _c_leave_task = _leave_task
    _c_swap_current_task = _swap_current_task
    _c_all_tasks = all_tasks