Coverage for Lib/asyncio/staggered.py: 95%
84 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-15 02:02 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-15 02:02 +0000
1"""Support for running coroutines in parallel with staggered start times."""
3__all__ = 'staggered_race',
5import contextlib
7from . import events
8from . import exceptions as exceptions_mod
9from . import locks
10from . import tasks
11from . import futures
14async def staggered_race(coro_fns, delay, *, loop=None):
15 """Run coroutines with staggered start times and take the first to finish.
17 This method takes an iterable of coroutine functions. The first one is
18 started immediately. From then on, whenever the immediately preceding one
19 fails (raises an exception), or when *delay* seconds has passed, the next
20 coroutine is started. This continues until one of the coroutines complete
21 successfully, in which case all others are cancelled, or until all
22 coroutines fail.
24 The coroutines provided should be well-behaved in the following way:
26 * They should only ``return`` if completed successfully.
28 * They should always raise an exception if they did not complete
29 successfully. In particular, if they handle cancellation, they should
30 probably reraise, like this::
32 try:
33 # do work
34 except asyncio.CancelledError:
35 # undo partially completed work
36 raise
38 Args:
39 coro_fns: an iterable of coroutine functions, i.e. callables that
40 return a coroutine object when called. Use ``functools.partial`` or
41 lambdas to pass arguments.
43 delay: amount of time, in seconds, between starting coroutines. If
44 ``None``, the coroutines will run sequentially.
46 loop: the event loop to use.
48 Returns:
49 tuple *(winner_result, winner_index, exceptions)* where
51 - *winner_result*: the result of the winning coroutine, or ``None``
52 if no coroutines won.
54 - *winner_index*: the index of the winning coroutine in
55 ``coro_fns``, or ``None`` if no coroutines won. If the winning
56 coroutine may return None on success, *winner_index* can be used
57 to definitively determine whether any coroutine won.
59 - *exceptions*: list of exceptions returned by the coroutines.
60 ``len(exceptions)`` is equal to the number of coroutines actually
61 started, and the order is the same as in ``coro_fns``. The winning
62 coroutine's entry is ``None``.
64 """
65 # TODO: when we have aiter() and anext(), allow async iterables in coro_fns.
66 loop = loop or events.get_running_loop()
67 parent_task = tasks.current_task(loop)
68 enum_coro_fns = enumerate(coro_fns)
69 winner_result = None
70 winner_index = None
71 unhandled_exceptions = []
72 exceptions = []
73 running_tasks = set()
74 on_completed_fut = None
76 def task_done(task):
77 running_tasks.discard(task)
78 futures.future_discard_from_awaited_by(task, parent_task)
79 if (
80 on_completed_fut is not None
81 and not on_completed_fut.done()
82 and not running_tasks
83 ):
84 on_completed_fut.set_result(None)
86 if task.cancelled():
87 return
89 exc = task.exception()
90 if exc is None: 90 ↛ 92line 90 didn't jump to line 92 because the condition on line 90 was always true
91 return
92 unhandled_exceptions.append(exc)
94 async def run_one_coro(ok_to_start, previous_failed) -> None:
95 # in eager tasks this waits for the calling task to append this task
96 # to running_tasks, in regular tasks this wait is a no-op that does
97 # not yield a future. See gh-124309.
98 await ok_to_start.wait()
99 # Wait for the previous task to finish, or for delay seconds
100 if previous_failed is not None:
101 with contextlib.suppress(exceptions_mod.TimeoutError):
102 # Use asyncio.wait_for() instead of asyncio.wait() here, so
103 # that if we get cancelled at this point, Event.wait() is also
104 # cancelled, otherwise there will be a "Task destroyed but it is
105 # pending" later.
106 await tasks.wait_for(previous_failed.wait(), delay)
107 # Get the next coroutine to run
108 try:
109 this_index, coro_fn = next(enum_coro_fns)
110 except StopIteration:
111 return
112 # Start task that will run the next coroutine
113 this_failed = locks.Event()
114 next_ok_to_start = locks.Event()
115 next_task = loop.create_task(run_one_coro(next_ok_to_start, this_failed))
116 futures.future_add_to_awaited_by(next_task, parent_task)
117 running_tasks.add(next_task)
118 next_task.add_done_callback(task_done)
119 # next_task has been appended to running_tasks so next_task is ok to
120 # start.
121 next_ok_to_start.set()
122 # Prepare place to put this coroutine's exceptions if not won
123 exceptions.append(None)
124 assert len(exceptions) == this_index + 1
126 try:
127 result = await coro_fn()
128 except (SystemExit, KeyboardInterrupt):
129 raise
130 except BaseException as e:
131 exceptions[this_index] = e
132 this_failed.set() # Kickstart the next coroutine
133 else:
134 # Store winner's results
135 nonlocal winner_index, winner_result
136 assert winner_index is None
137 winner_index = this_index
138 winner_result = result
139 # Cancel all other tasks. We take care to not cancel the current
140 # task as well. If we do so, then since there is no `await` after
141 # here and CancelledError are usually thrown at one, we will
142 # encounter a curious corner case where the current task will end
143 # up as done() == True, cancelled() == False, exception() ==
144 # asyncio.CancelledError. This behavior is specified in
145 # https://bugs.python.org/issue30048
146 current_task = tasks.current_task(loop)
147 for t in running_tasks:
148 if t is not current_task:
149 t.cancel()
151 propagate_cancellation_error = None
152 try:
153 ok_to_start = locks.Event()
154 first_task = loop.create_task(run_one_coro(ok_to_start, None))
155 futures.future_add_to_awaited_by(first_task, parent_task)
156 running_tasks.add(first_task)
157 first_task.add_done_callback(task_done)
158 # first_task has been appended to running_tasks so first_task is ok to start.
159 ok_to_start.set()
160 propagate_cancellation_error = None
161 # Make sure no tasks are left running if we leave this function
162 while running_tasks:
163 on_completed_fut = loop.create_future()
164 try:
165 await on_completed_fut
166 except exceptions_mod.CancelledError as ex:
167 propagate_cancellation_error = ex
168 for task in running_tasks:
169 task.cancel(*ex.args)
170 on_completed_fut = None
171 if __debug__ and unhandled_exceptions: 171 ↛ 174line 171 didn't jump to line 174 because the condition on line 171 was never true
172 # If run_one_coro raises an unhandled exception, it's probably a
173 # programming error, and I want to see it.
174 raise ExceptionGroup("staggered race failed", unhandled_exceptions)
175 if propagate_cancellation_error is not None:
176 raise propagate_cancellation_error
177 return winner_result, winner_index, exceptions
178 finally:
179 del exceptions, propagate_cancellation_error, unhandled_exceptions, parent_task