Coverage for Lib/asyncio/tasks.py: 95%
521 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-15 02:02 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-15 02:02 +0000
1"""Support for tasks, coroutines and the scheduler."""
3__all__ = (
4 'Task', 'create_task',
5 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
6 'wait', 'wait_for', 'as_completed', 'sleep',
7 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
8 'current_task', 'all_tasks',
9 'create_eager_task_factory', 'eager_task_factory',
10 '_register_task', '_unregister_task', '_enter_task', '_leave_task',
11)
13import concurrent.futures
14import contextvars
15import functools
16import inspect
17import itertools
18import math
19import types
20import weakref
21from types import GenericAlias
23from . import base_tasks
24from . import coroutines
25from . import events
26from . import exceptions
27from . import futures
28from . import queues
29from . import timeouts
31# Helper to generate new task names
32# This uses itertools.count() instead of a "+= 1" operation because the latter
33# is not thread safe. See bpo-11866 for a longer explanation.
34_task_name_counter = itertools.count(1).__next__
37def current_task(loop=None):
38 """Return a currently executed task."""
39 if loop is None:
40 loop = events.get_running_loop()
41 return _current_tasks.get(loop)
44def all_tasks(loop=None):
45 """Return a set of all tasks for the loop."""
46 if loop is None:
47 loop = events.get_running_loop()
48 # capturing the set of eager tasks first, so if an eager task "graduates"
49 # to a regular task in another thread, we don't risk missing it.
50 eager_tasks = list(_eager_tasks)
52 return {t for t in itertools.chain(_scheduled_tasks, eager_tasks)
53 if futures._get_loop(t) is loop and not t.done()}
56class Task(futures._PyFuture): # Inherit Python Task implementation
57 # from a Python Future implementation.
59 """A coroutine wrapped in a Future."""
61 # An important invariant maintained while a Task not done:
62 # _fut_waiter is either None or a Future. The Future
63 # can be either done() or not done().
64 # The task can be in any of 3 states:
65 #
66 # - 1: _fut_waiter is not None and not _fut_waiter.done():
67 # __step() is *not* scheduled and the Task is waiting for _fut_waiter.
68 # - 2: (_fut_waiter is None or _fut_waiter.done()) and __step() is scheduled:
69 # the Task is waiting for __step() to be executed.
70 # - 3: _fut_waiter is None and __step() is *not* scheduled:
71 # the Task is currently executing (in __step()).
72 #
73 # * In state 1, one of the callbacks of __fut_waiter must be __wakeup().
74 # * The transition from 1 to 2 happens when _fut_waiter becomes done(),
75 # as it schedules __wakeup() to be called (which calls __step() so
76 # we way that __step() is scheduled).
77 # * It transitions from 2 to 3 when __step() is executed, and it clears
78 # _fut_waiter to None.
80 # If False, don't log a message if the task is destroyed while its
81 # status is still pending
82 _log_destroy_pending = True
84 def __init__(self, coro, *, loop=None, name=None, context=None,
85 eager_start=False):
86 super().__init__(loop=loop)
87 if self._source_traceback:
88 del self._source_traceback[-1]
89 if not coroutines.iscoroutine(coro):
90 # raise after Future.__init__(), attrs are required for __del__
91 # prevent logging for pending task in __del__
92 self._log_destroy_pending = False
93 raise TypeError(f"a coroutine was expected, got {coro!r}")
95 if name is None:
96 self._name = f'Task-{_task_name_counter()}'
97 else:
98 self._name = str(name)
100 self._num_cancels_requested = 0
101 self._must_cancel = False
102 self._fut_waiter = None
103 self._coro = coro
104 if context is None:
105 self._context = contextvars.copy_context()
106 else:
107 self._context = context
109 if eager_start and self._loop.is_running():
110 self.__eager_start()
111 else:
112 self._loop.call_soon(self.__step, context=self._context)
113 _py_register_task(self)
115 def __del__(self):
116 if self._state == futures._PENDING and self._log_destroy_pending:
117 context = {
118 'task': self,
119 'message': 'Task was destroyed but it is pending!',
120 }
121 if self._source_traceback:
122 context['source_traceback'] = self._source_traceback
123 self._loop.call_exception_handler(context)
124 super().__del__()
126 __class_getitem__ = classmethod(GenericAlias)
128 def __repr__(self):
129 return base_tasks._task_repr(self)
131 def get_coro(self):
132 return self._coro
134 def get_context(self):
135 return self._context
137 def get_name(self):
138 return self._name
140 def set_name(self, value):
141 self._name = str(value)
143 def set_result(self, result):
144 raise RuntimeError('Task does not support set_result operation')
146 def set_exception(self, exception):
147 raise RuntimeError('Task does not support set_exception operation')
149 def get_stack(self, *, limit=None):
150 """Return the list of stack frames for this task's coroutine.
152 If the coroutine is not done, this returns the stack where it is
153 suspended. If the coroutine has completed successfully or was
154 cancelled, this returns an empty list. If the coroutine was
155 terminated by an exception, this returns the list of traceback
156 frames.
158 The frames are always ordered from oldest to newest.
160 The optional limit gives the maximum number of frames to
161 return; by default all available frames are returned. Its
162 meaning differs depending on whether a stack or a traceback is
163 returned: the newest frames of a stack are returned, but the
164 oldest frames of a traceback are returned. (This matches the
165 behavior of the traceback module.)
167 For reasons beyond our control, only one stack frame is
168 returned for a suspended coroutine.
169 """
170 return base_tasks._task_get_stack(self, limit)
172 def print_stack(self, *, limit=None, file=None):
173 """Print the stack or traceback for this task's coroutine.
175 This produces output similar to that of the traceback module,
176 for the frames retrieved by get_stack(). The limit argument
177 is passed to get_stack(). The file argument is an I/O stream
178 to which the output is written; by default output is written
179 to sys.stderr.
180 """
181 return base_tasks._task_print_stack(self, limit, file)
183 def cancel(self, msg=None):
184 """Request that this task cancel itself.
186 This arranges for a CancelledError to be thrown into the
187 wrapped coroutine on the next cycle through the event loop.
188 The coroutine then has a chance to clean up or even deny
189 the request using try/except/finally.
191 Unlike Future.cancel, this does not guarantee that the
192 task will be cancelled: the exception might be caught and
193 acted upon, delaying cancellation of the task or preventing
194 cancellation completely. The task may also return a value or
195 raise a different exception.
197 Immediately after this method is called, Task.cancelled() will
198 not return True (unless the task was already cancelled). A
199 task will be marked as cancelled when the wrapped coroutine
200 terminates with a CancelledError exception (even if cancel()
201 was not called).
203 This also increases the task's count of cancellation requests.
204 """
205 self._log_traceback = False
206 if self.done():
207 return False
208 self._num_cancels_requested += 1
209 # These two lines are controversial. See discussion starting at
210 # https://github.com/python/cpython/pull/31394#issuecomment-1053545331
211 # Also remember that this is duplicated in _asynciomodule.c.
212 # if self._num_cancels_requested > 1:
213 # return False
214 if self._fut_waiter is not None:
215 if self._fut_waiter.cancel(msg=msg):
216 # Leave self._fut_waiter; it may be a Task that
217 # catches and ignores the cancellation so we may have
218 # to cancel it again later.
219 return True
220 # It must be the case that self.__step is already scheduled.
221 self._must_cancel = True
222 self._cancel_message = msg
223 return True
225 def cancelling(self):
226 """Return the count of the task's cancellation requests.
228 This count is incremented when .cancel() is called
229 and may be decremented using .uncancel().
230 """
231 return self._num_cancels_requested
233 def uncancel(self):
234 """Decrement the task's count of cancellation requests.
236 This should be called by the party that called `cancel()` on the task
237 beforehand.
239 Returns the remaining number of cancellation requests.
240 """
241 if self._num_cancels_requested > 0: 241 ↛ 245line 241 didn't jump to line 245 because the condition on line 241 was always true
242 self._num_cancels_requested -= 1
243 if self._num_cancels_requested == 0: 243 ↛ 245line 243 didn't jump to line 245 because the condition on line 243 was always true
244 self._must_cancel = False
245 return self._num_cancels_requested
247 def __eager_start(self):
248 prev_task = _py_swap_current_task(self._loop, self)
249 try:
250 _py_register_eager_task(self)
251 try:
252 self._context.run(self.__step_run_and_handle_result, None)
253 finally:
254 _py_unregister_eager_task(self)
255 finally:
256 try:
257 curtask = _py_swap_current_task(self._loop, prev_task)
258 assert curtask is self
259 finally:
260 if self.done():
261 self._coro = None
262 self = None # Needed to break cycles when an exception occurs.
263 else:
264 _py_register_task(self)
266 def __step(self, exc=None):
267 if self.done():
268 raise exceptions.InvalidStateError(
269 f'__step(): already done: {self!r}, {exc!r}')
270 if self._must_cancel:
271 if not isinstance(exc, exceptions.CancelledError):
272 exc = self._make_cancelled_error()
273 self._must_cancel = False
274 self._fut_waiter = None
276 _py_enter_task(self._loop, self)
277 try:
278 self.__step_run_and_handle_result(exc)
279 finally:
280 _py_leave_task(self._loop, self)
281 self = None # Needed to break cycles when an exception occurs.
283 def __step_run_and_handle_result(self, exc):
284 coro = self._coro
285 try:
286 if exc is None:
287 # We use the `send` method directly, because coroutines
288 # don't have `__iter__` and `__next__` methods.
289 result = coro.send(None)
290 else:
291 result = coro.throw(exc)
292 except StopIteration as exc:
293 if self._must_cancel:
294 # Task is cancelled right before coro stops.
295 self._must_cancel = False
296 super().cancel(msg=self._cancel_message)
297 else:
298 super().set_result(exc.value)
299 except exceptions.CancelledError as exc:
300 # Save the original exception so we can chain it later.
301 self._cancelled_exc = exc
302 super().cancel() # I.e., Future.cancel(self).
303 except (KeyboardInterrupt, SystemExit) as exc:
304 super().set_exception(exc)
305 raise
306 except BaseException as exc:
307 super().set_exception(exc)
308 else:
309 blocking = getattr(result, '_asyncio_future_blocking', None)
310 if blocking is not None:
311 # Yielded Future must come from Future.__iter__().
312 if futures._get_loop(result) is not self._loop:
313 new_exc = RuntimeError(
314 f'Task {self!r} got Future '
315 f'{result!r} attached to a different loop')
316 self._loop.call_soon(
317 self.__step, new_exc, context=self._context)
318 elif blocking: 318 ↛ 335line 318 didn't jump to line 335 because the condition on line 318 was always true
319 if result is self:
320 new_exc = RuntimeError(
321 f'Task cannot await on itself: {self!r}')
322 self._loop.call_soon(
323 self.__step, new_exc, context=self._context)
324 else:
325 futures.future_add_to_awaited_by(result, self)
326 result._asyncio_future_blocking = False
327 result.add_done_callback(
328 self.__wakeup, context=self._context)
329 self._fut_waiter = result
330 if self._must_cancel:
331 if self._fut_waiter.cancel( 331 ↛ 357line 331 didn't jump to line 357 because the condition on line 331 was always true
332 msg=self._cancel_message):
333 self._must_cancel = False
334 else:
335 new_exc = RuntimeError(
336 f'yield was used instead of yield from '
337 f'in task {self!r} with {result!r}')
338 self._loop.call_soon(
339 self.__step, new_exc, context=self._context)
341 elif result is None: 341 ↛ 344line 341 didn't jump to line 344 because the condition on line 341 was always true
342 # Bare yield relinquishes control for one event loop iteration.
343 self._loop.call_soon(self.__step, context=self._context)
344 elif inspect.isgenerator(result):
345 # Yielding a generator is just wrong.
346 new_exc = RuntimeError(
347 f'yield was used instead of yield from for '
348 f'generator in task {self!r} with {result!r}')
349 self._loop.call_soon(
350 self.__step, new_exc, context=self._context)
351 else:
352 # Yielding something else is an error.
353 new_exc = RuntimeError(f'Task got bad yield: {result!r}')
354 self._loop.call_soon(
355 self.__step, new_exc, context=self._context)
356 finally:
357 self = None # Needed to break cycles when an exception occurs.
359 def __wakeup(self, future):
360 futures.future_discard_from_awaited_by(future, self)
361 try:
362 future.result()
363 except BaseException as exc:
364 # This may also be a cancellation.
365 self.__step(exc)
366 else:
367 # Don't pass the value of `future.result()` explicitly,
368 # as `Future.__iter__` and `Future.__await__` don't need it.
369 # If we call `__step(value, None)` instead of `__step()`,
370 # Python eval loop would use `.send(value)` method call,
371 # instead of `__next__()`, which is slower for futures
372 # that return non-generator iterators from their `__iter__`.
373 self.__step()
374 self = None # Needed to break cycles when an exception occurs.
377_PyTask = Task
380try:
381 import _asyncio
382except ImportError:
383 pass
384else:
385 # _CTask is needed for tests.
386 Task = _CTask = _asyncio.Task
389def create_task(coro, *, name=None, context=None):
390 """Schedule the execution of a coroutine object in a spawn task.
392 Return a Task object.
393 """
394 loop = events.get_running_loop()
395 if context is None:
396 # Use legacy API if context is not needed
397 task = loop.create_task(coro, name=name)
398 else:
399 task = loop.create_task(coro, name=name, context=context)
401 return task
404# wait() and as_completed() similar to those in PEP 3148.
406FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
407FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
408ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
411async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED):
412 """Wait for the Futures or Tasks given by fs to complete.
414 The fs iterable must not be empty.
416 Returns two sets of Future: (done, pending).
418 Usage:
420 done, pending = await asyncio.wait(fs)
422 Note: This does not raise TimeoutError! Futures that aren't done
423 when the timeout occurs are returned in the second set.
424 """
425 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
426 raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
427 if not fs:
428 raise ValueError('Set of Tasks/Futures is empty.')
429 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
430 raise ValueError(f'Invalid return_when value: {return_when}')
432 fs = set(fs)
434 if any(coroutines.iscoroutine(f) for f in fs): 434 ↛ 435line 434 didn't jump to line 435 because the condition on line 434 was never true
435 raise TypeError("Passing coroutines is forbidden, use tasks explicitly.")
437 loop = events.get_running_loop()
438 return await _wait(fs, timeout, return_when, loop)
441def _release_waiter(waiter, *args):
442 if not waiter.done(): 442 ↛ exitline 442 didn't return from function '_release_waiter' because the condition on line 442 was always true
443 waiter.set_result(None)
446async def wait_for(fut, timeout):
447 """Wait for the single Future or coroutine to complete, with timeout.
449 Coroutine will be wrapped in Task.
451 Returns result of the Future or coroutine. When a timeout occurs,
452 it cancels the task and raises TimeoutError. To avoid the task
453 cancellation, wrap it in shield().
455 If the wait is cancelled, the task is also cancelled.
457 If the task suppresses the cancellation and returns a value instead,
458 that value is returned.
460 This function is a coroutine.
461 """
462 # The special case for timeout <= 0 is for the following case:
463 #
464 # async def test_waitfor():
465 # func_started = False
466 #
467 # async def func():
468 # nonlocal func_started
469 # func_started = True
470 #
471 # try:
472 # await asyncio.wait_for(func(), 0)
473 # except asyncio.TimeoutError:
474 # assert not func_started
475 # else:
476 # assert False
477 #
478 # asyncio.run(test_waitfor())
481 if timeout is not None and timeout <= 0:
482 fut = ensure_future(fut)
484 if fut.done():
485 return fut.result()
487 await _cancel_and_wait(fut)
488 try:
489 return fut.result()
490 except exceptions.CancelledError as exc:
491 raise TimeoutError from exc
493 async with timeouts.timeout(timeout):
494 return await fut
496async def _wait(fs, timeout, return_when, loop):
497 """Internal helper for wait().
499 The fs argument must be a collection of Futures.
500 """
501 assert fs, 'Set of Futures is empty.'
502 waiter = loop.create_future()
503 timeout_handle = None
504 if timeout is not None:
505 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
506 counter = len(fs)
507 cur_task = current_task()
509 def _on_completion(f):
510 nonlocal counter
511 counter -= 1
512 if (counter <= 0 or
513 return_when == FIRST_COMPLETED or
514 return_when == FIRST_EXCEPTION and (not f.cancelled() and
515 f.exception() is not None)):
516 if timeout_handle is not None: 516 ↛ 517line 516 didn't jump to line 517 because the condition on line 516 was never true
517 timeout_handle.cancel()
518 if not waiter.done():
519 waiter.set_result(None)
520 futures.future_discard_from_awaited_by(f, cur_task)
522 for f in fs:
523 f.add_done_callback(_on_completion)
524 futures.future_add_to_awaited_by(f, cur_task)
526 try:
527 await waiter
528 finally:
529 if timeout_handle is not None:
530 timeout_handle.cancel()
531 for f in fs:
532 f.remove_done_callback(_on_completion)
534 done, pending = set(), set()
535 for f in fs:
536 if f.done():
537 done.add(f)
538 else:
539 pending.add(f)
540 return done, pending
543async def _cancel_and_wait(fut):
544 """Cancel the *fut* future or task and wait until it completes."""
546 loop = events.get_running_loop()
547 waiter = loop.create_future()
548 cb = functools.partial(_release_waiter, waiter)
549 fut.add_done_callback(cb)
551 try:
552 fut.cancel()
553 # We cannot wait on *fut* directly to make
554 # sure _cancel_and_wait itself is reliably cancellable.
555 await waiter
556 finally:
557 fut.remove_done_callback(cb)
560class _AsCompletedIterator:
561 """Iterator of awaitables representing tasks of asyncio.as_completed.
563 As an asynchronous iterator, iteration yields futures as they finish. As a
564 plain iterator, new coroutines are yielded that will return or raise the
565 result of the next underlying future to complete.
566 """
567 def __init__(self, aws, timeout):
568 self._done = queues.Queue()
569 self._timeout_handle = None
571 loop = events.get_event_loop()
572 todo = {ensure_future(aw, loop=loop) for aw in set(aws)}
573 for f in todo:
574 f.add_done_callback(self._handle_completion)
575 if todo and timeout is not None:
576 self._timeout_handle = (
577 loop.call_later(timeout, self._handle_timeout)
578 )
579 self._todo = todo
580 self._todo_left = len(todo)
582 def __aiter__(self):
583 return self
585 def __iter__(self):
586 return self
588 async def __anext__(self):
589 if not self._todo_left:
590 raise StopAsyncIteration
591 assert self._todo_left > 0
592 self._todo_left -= 1
593 return await self._wait_for_one()
595 def __next__(self):
596 if not self._todo_left:
597 raise StopIteration
598 assert self._todo_left > 0
599 self._todo_left -= 1
600 return self._wait_for_one(resolve=True)
602 def _handle_timeout(self):
603 for f in self._todo:
604 f.remove_done_callback(self._handle_completion)
605 self._done.put_nowait(None) # Sentinel for _wait_for_one().
606 self._todo.clear() # Can't do todo.remove(f) in the loop.
608 def _handle_completion(self, f):
609 if not self._todo: 609 ↛ 610line 609 didn't jump to line 610 because the condition on line 609 was never true
610 return # _handle_timeout() was here first.
611 self._todo.remove(f)
612 self._done.put_nowait(f)
613 if not self._todo and self._timeout_handle is not None:
614 self._timeout_handle.cancel()
616 async def _wait_for_one(self, resolve=False):
617 # Wait for the next future to be done and return it unless resolve is
618 # set, in which case return either the result of the future or raise
619 # an exception.
620 f = await self._done.get()
621 if f is None:
622 # Dummy value from _handle_timeout().
623 raise exceptions.TimeoutError
624 return f.result() if resolve else f
627def as_completed(fs, *, timeout=None):
628 """Create an iterator of awaitables or their results in completion order.
630 Run the supplied awaitables concurrently. The returned object can be
631 iterated to obtain the results of the awaitables as they finish.
633 The object returned can be iterated as an asynchronous iterator or a plain
634 iterator. When asynchronous iteration is used, the originally-supplied
635 awaitables are yielded if they are tasks or futures. This makes it easy to
636 correlate previously-scheduled tasks with their results:
638 ipv4_connect = create_task(open_connection("127.0.0.1", 80))
639 ipv6_connect = create_task(open_connection("::1", 80))
640 tasks = [ipv4_connect, ipv6_connect]
642 async for earliest_connect in as_completed(tasks):
643 # earliest_connect is done. The result can be obtained by
644 # awaiting it or calling earliest_connect.result()
645 reader, writer = await earliest_connect
647 if earliest_connect is ipv6_connect:
648 print("IPv6 connection established.")
649 else:
650 print("IPv4 connection established.")
652 During asynchronous iteration, implicitly-created tasks will be yielded for
653 supplied awaitables that aren't tasks or futures.
655 When used as a plain iterator, each iteration yields a new coroutine that
656 returns the result or raises the exception of the next completed awaitable.
657 This pattern is compatible with Python versions older than 3.13:
659 ipv4_connect = create_task(open_connection("127.0.0.1", 80))
660 ipv6_connect = create_task(open_connection("::1", 80))
661 tasks = [ipv4_connect, ipv6_connect]
663 for next_connect in as_completed(tasks):
664 # next_connect is not one of the original task objects. It must be
665 # awaited to obtain the result value or raise the exception of the
666 # awaitable that finishes next.
667 reader, writer = await next_connect
669 A TimeoutError is raised if the timeout occurs before all awaitables are
670 done. This is raised by the async for loop during asynchronous iteration or
671 by the coroutines yielded during plain iteration.
672 """
673 if inspect.isawaitable(fs):
674 raise TypeError(
675 f"expects an iterable of awaitables, not {type(fs).__name__}"
676 )
678 return _AsCompletedIterator(fs, timeout)
681@types.coroutine
682def __sleep0():
683 """Skip one event loop run cycle.
685 This is a private helper for 'asyncio.sleep()', used
686 when the 'delay' is set to 0. It uses a bare 'yield'
687 expression (which Task.__step knows how to handle)
688 instead of creating a Future object.
689 """
690 yield
693async def sleep(delay, result=None):
694 """Coroutine that completes after a given time (in seconds)."""
695 if delay <= 0:
696 await __sleep0()
697 return result
699 if math.isnan(delay):
700 raise ValueError("Invalid delay: NaN (not a number)")
702 loop = events.get_running_loop()
703 future = loop.create_future()
704 h = loop.call_later(delay,
705 futures._set_result_unless_cancelled,
706 future, result)
707 try:
708 return await future
709 finally:
710 h.cancel()
713def ensure_future(coro_or_future, *, loop=None):
714 """Wrap a coroutine or an awaitable in a future.
716 If the argument is a Future, it is returned directly.
717 """
718 if futures.isfuture(coro_or_future):
719 if loop is not None and loop is not futures._get_loop(coro_or_future):
720 raise ValueError('The future belongs to a different loop than '
721 'the one specified as the loop argument')
722 return coro_or_future
723 should_close = True
724 if not coroutines.iscoroutine(coro_or_future):
725 if inspect.isawaitable(coro_or_future):
726 async def _wrap_awaitable(awaitable):
727 return await awaitable
729 coro_or_future = _wrap_awaitable(coro_or_future)
730 should_close = False
731 else:
732 raise TypeError('An asyncio.Future, a coroutine or an awaitable '
733 'is required')
735 if loop is None:
736 loop = events.get_event_loop()
737 try:
738 return loop.create_task(coro_or_future)
739 except RuntimeError:
740 if should_close: 740 ↛ 742line 740 didn't jump to line 742 because the condition on line 740 was always true
741 coro_or_future.close()
742 raise
745class _GatheringFuture(futures.Future):
746 """Helper for gather().
748 This overrides cancel() to cancel all the children and act more
749 like Task.cancel(), which doesn't immediately mark itself as
750 cancelled.
751 """
753 def __init__(self, children, *, loop):
754 assert loop is not None
755 super().__init__(loop=loop)
756 self._children = children
757 self._cancel_requested = False
759 def cancel(self, msg=None):
760 if self.done():
761 return False
762 ret = False
763 for child in self._children:
764 if child.cancel(msg=msg):
765 ret = True
766 if ret:
767 # If any child tasks were actually cancelled, we should
768 # propagate the cancellation request regardless of
769 # *return_exceptions* argument. See issue 32684.
770 self._cancel_requested = True
771 return ret
774def gather(*coros_or_futures, return_exceptions=False):
775 """Return a future aggregating results from the given coroutines/futures.
777 Coroutines will be wrapped in a future and scheduled in the event
778 loop. They will not necessarily be scheduled in the same order as
779 passed in.
781 All futures must share the same event loop. If all the tasks are
782 done successfully, the returned future's result is the list of
783 results (in the order of the original sequence, not necessarily
784 the order of results arrival). If *return_exceptions* is True,
785 exceptions in the tasks are treated the same as successful
786 results, and gathered in the result list; otherwise, the first
787 raised exception will be immediately propagated to the returned
788 future.
790 Cancellation: if the outer Future is cancelled, all children (that
791 have not completed yet) are also cancelled. If any child is
792 cancelled, this is treated as if it raised CancelledError --
793 the outer Future is *not* cancelled in this case. (This is to
794 prevent the cancellation of one child to cause other children to
795 be cancelled.)
797 If *return_exceptions* is False, cancelling gather() after it
798 has been marked done won't cancel any submitted awaitables.
799 For instance, gather can be marked done after propagating an
800 exception to the caller, therefore, calling ``gather.cancel()``
801 after catching an exception (raised by one of the awaitables) from
802 gather won't cancel any other awaitables.
803 """
804 if not coros_or_futures:
805 loop = events.get_event_loop()
806 outer = loop.create_future()
807 outer.set_result([])
808 return outer
810 loop = events._get_running_loop()
811 if loop is not None:
812 cur_task = current_task(loop)
813 else:
814 cur_task = None
816 def _done_callback(fut, cur_task=cur_task):
817 nonlocal nfinished
818 nfinished += 1
820 if cur_task is not None:
821 futures.future_discard_from_awaited_by(fut, cur_task)
823 if outer is None or outer.done():
824 if not fut.cancelled():
825 # Mark exception retrieved.
826 fut.exception()
827 return
829 if not return_exceptions:
830 if fut.cancelled():
831 # Check if 'fut' is cancelled first, as
832 # 'fut.exception()' will *raise* a CancelledError
833 # instead of returning it.
834 exc = fut._make_cancelled_error()
835 outer.set_exception(exc)
836 return
837 else:
838 exc = fut.exception()
839 if exc is not None:
840 outer.set_exception(exc)
841 return
843 if nfinished == nfuts:
844 # All futures are done; create a list of results
845 # and set it to the 'outer' future.
846 results = []
848 for fut in children:
849 if fut.cancelled():
850 # Check if 'fut' is cancelled first, as 'fut.exception()'
851 # will *raise* a CancelledError instead of returning it.
852 # Also, since we're adding the exception return value
853 # to 'results' instead of raising it, don't bother
854 # setting __context__. This also lets us preserve
855 # calling '_make_cancelled_error()' at most once.
856 res = exceptions.CancelledError(
857 '' if fut._cancel_message is None else
858 fut._cancel_message)
859 else:
860 res = fut.exception()
861 if res is None:
862 res = fut.result()
863 results.append(res)
865 if outer._cancel_requested:
866 # If gather is being cancelled we must propagate the
867 # cancellation regardless of *return_exceptions* argument.
868 # See issue 32684.
869 exc = fut._make_cancelled_error()
870 outer.set_exception(exc)
871 else:
872 outer.set_result(results)
874 arg_to_fut = {}
875 children = []
876 nfuts = 0
877 nfinished = 0
878 done_futs = []
879 outer = None # bpo-46672
880 for arg in coros_or_futures:
881 if arg not in arg_to_fut:
882 fut = ensure_future(arg, loop=loop)
883 if loop is None:
884 loop = futures._get_loop(fut)
885 if fut is not arg:
886 # 'arg' was not a Future, therefore, 'fut' is a new
887 # Future created specifically for 'arg'. Since the caller
888 # can't control it, disable the "destroy pending task"
889 # warning.
890 fut._log_destroy_pending = False
891 nfuts += 1
892 arg_to_fut[arg] = fut
893 if fut.done():
894 done_futs.append(fut)
895 else:
896 if cur_task is not None:
897 futures.future_add_to_awaited_by(fut, cur_task)
898 fut.add_done_callback(_done_callback)
900 else:
901 # There's a duplicate Future object in coros_or_futures.
902 fut = arg_to_fut[arg]
904 children.append(fut)
906 outer = _GatheringFuture(children, loop=loop)
907 # Run done callbacks after GatheringFuture created so any post-processing
908 # can be performed at this point
909 # optimization: in the special case that *all* futures finished eagerly,
910 # this will effectively complete the gather eagerly, with the last
911 # callback setting the result (or exception) on outer before returning it
912 for fut in done_futs:
913 _done_callback(fut)
914 return outer
917def shield(arg):
918 """Wait for a future, shielding it from cancellation.
920 The statement
922 task = asyncio.create_task(something())
923 res = await shield(task)
925 is exactly equivalent to the statement
927 res = await something()
929 *except* that if the coroutine containing it is cancelled, the
930 task running in something() is not cancelled. From the POV of
931 something(), the cancellation did not happen. But its caller is
932 still cancelled, so the yield-from expression still raises
933 CancelledError. Note: If something() is cancelled by other means
934 this will still cancel shield().
936 If you want to completely ignore cancellation (not recommended)
937 you can combine shield() with a try/except clause, as follows:
939 task = asyncio.create_task(something())
940 try:
941 res = await shield(task)
942 except CancelledError:
943 res = None
945 Save a reference to tasks passed to this function, to avoid
946 a task disappearing mid-execution. The event loop only keeps
947 weak references to tasks. A task that isn't referenced elsewhere
948 may get garbage collected at any time, even before it's done.
949 """
950 inner = ensure_future(arg)
951 if inner.done():
952 # Shortcut.
953 return inner
954 loop = futures._get_loop(inner)
955 outer = loop.create_future()
957 if loop is not None and (cur_task := current_task(loop)) is not None:
958 futures.future_add_to_awaited_by(inner, cur_task)
959 else:
960 cur_task = None
962 def _inner_done_callback(inner, cur_task=cur_task):
963 if cur_task is not None:
964 futures.future_discard_from_awaited_by(inner, cur_task)
966 if outer.cancelled(): 966 ↛ 967line 966 didn't jump to line 967 because the condition on line 966 was never true
967 if not inner.cancelled():
968 # Mark inner's result as retrieved.
969 inner.exception()
970 return
972 if inner.cancelled():
973 outer.cancel()
974 else:
975 exc = inner.exception()
976 if exc is not None:
977 outer.set_exception(exc)
978 else:
979 outer.set_result(inner.result())
982 def _outer_done_callback(outer):
983 if not inner.done():
984 inner.remove_done_callback(_inner_done_callback)
986 inner.add_done_callback(_inner_done_callback)
987 outer.add_done_callback(_outer_done_callback)
988 return outer
991def run_coroutine_threadsafe(coro, loop):
992 """Submit a coroutine object to a given event loop.
994 Return a concurrent.futures.Future to access the result.
995 """
996 if not coroutines.iscoroutine(coro): 996 ↛ 997line 996 didn't jump to line 997 because the condition on line 996 was never true
997 raise TypeError('A coroutine object is required')
998 future = concurrent.futures.Future()
1000 def callback():
1001 try:
1002 futures._chain_future(ensure_future(coro, loop=loop), future)
1003 except (SystemExit, KeyboardInterrupt):
1004 raise
1005 except BaseException as exc:
1006 if future.set_running_or_notify_cancel(): 1006 ↛ 1008line 1006 didn't jump to line 1008 because the condition on line 1006 was always true
1007 future.set_exception(exc)
1008 raise
1010 loop.call_soon_threadsafe(callback)
1011 return future
1014def create_eager_task_factory(custom_task_constructor):
1015 """Create a function suitable for use as a task factory on an event-loop.
1017 Example usage:
1019 loop.set_task_factory(
1020 asyncio.create_eager_task_factory(my_task_constructor))
1022 Now, tasks created will be started immediately (rather than being first
1023 scheduled to an event loop). The constructor argument can be any callable
1024 that returns a Task-compatible object and has a signature compatible
1025 with `Task.__init__`; it must have the `eager_start` keyword argument.
1027 Most applications will use `Task` for `custom_task_constructor` and in
1028 this case there's no need to call `create_eager_task_factory()`
1029 directly. Instead the global `eager_task_factory` instance can be
1030 used. E.g. `loop.set_task_factory(asyncio.eager_task_factory)`.
1031 """
1033 def factory(loop, coro, *, name=None, context=None):
1034 return custom_task_constructor(
1035 coro, loop=loop, name=name, context=context, eager_start=True)
1037 return factory
1040eager_task_factory = create_eager_task_factory(Task)
1043# Collectively these two sets hold references to the complete set of active
1044# tasks. Eagerly executed tasks use a faster regular set as an optimization
1045# but may graduate to a WeakSet if the task blocks on IO.
1046_scheduled_tasks = weakref.WeakSet()
1047_eager_tasks = set()
1049# Dictionary containing tasks that are currently active in
1050# all running event loops. {EventLoop: Task}
1051_current_tasks = {}
1054def _register_task(task):
1055 """Register an asyncio Task scheduled to run on an event loop."""
1056 _scheduled_tasks.add(task)
1059def _register_eager_task(task):
1060 """Register an asyncio Task about to be eagerly executed."""
1061 _eager_tasks.add(task)
1064def _enter_task(loop, task):
1065 current_task = _current_tasks.get(loop)
1066 if current_task is not None:
1067 raise RuntimeError(f"Cannot enter into task {task!r} while another "
1068 f"task {current_task!r} is being executed.")
1069 _current_tasks[loop] = task
1072def _leave_task(loop, task):
1073 current_task = _current_tasks.get(loop)
1074 if current_task is not task:
1075 raise RuntimeError(f"Leaving task {task!r} does not match "
1076 f"the current task {current_task!r}.")
1077 del _current_tasks[loop]
1080def _swap_current_task(loop, task):
1081 prev_task = _current_tasks.get(loop)
1082 if task is None:
1083 del _current_tasks[loop]
1084 else:
1085 _current_tasks[loop] = task
1086 return prev_task
1089def _unregister_task(task):
1090 """Unregister a completed, scheduled Task."""
1091 _scheduled_tasks.discard(task)
1094def _unregister_eager_task(task):
1095 """Unregister a task which finished its first eager step."""
1096 _eager_tasks.discard(task)
1099_py_current_task = current_task
1100_py_register_task = _register_task
1101_py_register_eager_task = _register_eager_task
1102_py_unregister_task = _unregister_task
1103_py_unregister_eager_task = _unregister_eager_task
1104_py_enter_task = _enter_task
1105_py_leave_task = _leave_task
1106_py_swap_current_task = _swap_current_task
1107_py_all_tasks = all_tasks
1109try:
1110 from _asyncio import (_register_task, _register_eager_task,
1111 _unregister_task, _unregister_eager_task,
1112 _enter_task, _leave_task, _swap_current_task,
1113 current_task, all_tasks)
1114except ImportError:
1115 pass
1116else:
1117 _c_current_task = current_task
1118 _c_register_task = _register_task
1119 _c_register_eager_task = _register_eager_task
1120 _c_unregister_task = _unregister_task
1121 _c_unregister_eager_task = _unregister_eager_task
1122 _c_enter_task = _enter_task
1123 _c_leave_task = _leave_task
1124 _c_swap_current_task = _swap_current_task
1125 _c_all_tasks = all_tasks