Coverage for Lib/asyncio/staggered.py: 95%
84 statements
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-23 01:21 +0000
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-23 01:21 +0000
1"""Support for running coroutines in parallel with staggered start times."""
3__all__ = 'staggered_race',
5import contextlib
7from . import events
8from . import exceptions as exceptions_mod
9from . import locks
10from . import tasks
11from . import futures
14async def staggered_race(coro_fns, delay, *, loop=None):
15 """Run coroutines with staggered start times and take the first to finish.
17 This method takes an iterable of coroutine functions. The first one is
18 started immediately. From then on, whenever the immediately preceding one
19 fails (raises an exception), or when *delay* seconds has passed, the next
20 coroutine is started. This continues until one of the coroutines complete
21 successfully, in which case all others are cancelled, or until all
22 coroutines fail.
24 The coroutines provided should be well-behaved in the following way:
26 * They should only ``return`` if completed successfully.
28 * They should always raise an exception if they did not complete
29 successfully. In particular, if they handle cancellation, they should
30 probably reraise, like this::
32 try:
33 # do work
34 except asyncio.CancelledError:
35 # undo partially completed work
36 raise
38 Args:
39 coro_fns: an iterable of coroutine functions, i.e. callables that
40 return a coroutine object when called. Use ``functools.partial`` or
41 lambdas to pass arguments.
43 delay: amount of time, in seconds, between starting coroutines. If
44 ``None``, the coroutines will run sequentially.
46 loop: the event loop to use.
48 Returns:
49 tuple *(winner_result, winner_index, exceptions)* where
51 - *winner_result*: the result of the winning coroutine, or ``None``
52 if no coroutines won.
54 - *winner_index*: the index of the winning coroutine in
55 ``coro_fns``, or ``None`` if no coroutines won. If the winning
56 coroutine may return None on success, *winner_index* can be used
57 to definitively determine whether any coroutine won.
59 - *exceptions*: list of exceptions returned by the coroutines.
60 ``len(exceptions)`` is equal to the number of coroutines actually
61 started, and the order is the same as in ``coro_fns``. The winning
62 coroutine's entry is ``None``.
64 """
65 loop = loop or events.get_running_loop()
66 parent_task = tasks.current_task(loop)
67 enum_coro_fns = enumerate(coro_fns)
68 winner_result = None
69 winner_index = None
70 unhandled_exceptions = []
71 exceptions = []
72 running_tasks = set()
73 on_completed_fut = None
75 def task_done(task):
76 running_tasks.discard(task)
77 futures.future_discard_from_awaited_by(task, parent_task)
78 if (
79 on_completed_fut is not None
80 and not on_completed_fut.done()
81 and not running_tasks
82 ):
83 on_completed_fut.set_result(None)
85 if task.cancelled():
86 return
88 exc = task.exception()
89 if exc is None: 89 ↛ 91line 89 didn't jump to line 91 because the condition on line 89 was always true
90 return
91 unhandled_exceptions.append(exc)
93 async def run_one_coro(ok_to_start, previous_failed) -> None:
94 # in eager tasks this waits for the calling task to append this task
95 # to running_tasks, in regular tasks this wait is a no-op that does
96 # not yield a future. See gh-124309.
97 await ok_to_start.wait()
98 # Wait for the previous task to finish, or for delay seconds
99 if previous_failed is not None:
100 with contextlib.suppress(exceptions_mod.TimeoutError):
101 # Use asyncio.wait_for() instead of asyncio.wait() here, so
102 # that if we get cancelled at this point, Event.wait() is also
103 # cancelled, otherwise there will be a "Task destroyed but it is
104 # pending" later.
105 await tasks.wait_for(previous_failed.wait(), delay)
106 # Get the next coroutine to run
107 try:
108 this_index, coro_fn = next(enum_coro_fns)
109 except StopIteration:
110 return
111 # Start task that will run the next coroutine
112 this_failed = locks.Event()
113 next_ok_to_start = locks.Event()
114 next_task = loop.create_task(run_one_coro(next_ok_to_start, this_failed))
115 futures.future_add_to_awaited_by(next_task, parent_task)
116 running_tasks.add(next_task)
117 next_task.add_done_callback(task_done)
118 # next_task has been appended to running_tasks so next_task is ok to
119 # start.
120 next_ok_to_start.set()
121 # Prepare place to put this coroutine's exceptions if not won
122 exceptions.append(None)
123 assert len(exceptions) == this_index + 1
125 try:
126 result = await coro_fn()
127 except (SystemExit, KeyboardInterrupt):
128 raise
129 except BaseException as e:
130 exceptions[this_index] = e
131 this_failed.set() # Kickstart the next coroutine
132 else:
133 # Store winner's results
134 nonlocal winner_index, winner_result
135 assert winner_index is None
136 winner_index = this_index
137 winner_result = result
138 # Cancel all other tasks. We take care to not cancel the current
139 # task as well. If we do so, then since there is no `await` after
140 # here and CancelledError are usually thrown at one, we will
141 # encounter a curious corner case where the current task will end
142 # up as done() == True, cancelled() == False, exception() ==
143 # asyncio.CancelledError. This behavior is specified in
144 # https://bugs.python.org/issue30048
145 current_task = tasks.current_task(loop)
146 for t in running_tasks:
147 if t is not current_task:
148 t.cancel()
150 propagate_cancellation_error = None
151 try:
152 ok_to_start = locks.Event()
153 first_task = loop.create_task(run_one_coro(ok_to_start, None))
154 futures.future_add_to_awaited_by(first_task, parent_task)
155 running_tasks.add(first_task)
156 first_task.add_done_callback(task_done)
157 # first_task has been appended to running_tasks so first_task is ok to start.
158 ok_to_start.set()
159 propagate_cancellation_error = None
160 # Make sure no tasks are left running if we leave this function
161 while running_tasks:
162 on_completed_fut = loop.create_future()
163 try:
164 await on_completed_fut
165 except exceptions_mod.CancelledError as ex:
166 propagate_cancellation_error = ex
167 for task in running_tasks:
168 task.cancel(*ex.args)
169 on_completed_fut = None
170 if __debug__ and unhandled_exceptions: 170 ↛ 173line 170 didn't jump to line 173 because the condition on line 170 was never true
171 # If run_one_coro raises an unhandled exception, it's probably a
172 # programming error, and I want to see it.
173 raise ExceptionGroup("staggered race failed", unhandled_exceptions)
174 if propagate_cancellation_error is not None:
175 raise propagate_cancellation_error
176 return winner_result, winner_index, exceptions
177 finally:
178 del exceptions, propagate_cancellation_error, unhandled_exceptions, parent_task