Coverage for Lib/asyncio/staggered.py: 95%

84 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-23 01:21 +0000

1"""Support for running coroutines in parallel with staggered start times.""" 

2 

3__all__ = 'staggered_race', 

4 

5import contextlib 

6 

7from . import events 

8from . import exceptions as exceptions_mod 

9from . import locks 

10from . import tasks 

11from . import futures 

12 

13 

14async def staggered_race(coro_fns, delay, *, loop=None): 

15 """Run coroutines with staggered start times and take the first to finish. 

16 

17 This method takes an iterable of coroutine functions. The first one is 

18 started immediately. From then on, whenever the immediately preceding one 

19 fails (raises an exception), or when *delay* seconds has passed, the next 

20 coroutine is started. This continues until one of the coroutines complete 

21 successfully, in which case all others are cancelled, or until all 

22 coroutines fail. 

23 

24 The coroutines provided should be well-behaved in the following way: 

25 

26 * They should only ``return`` if completed successfully. 

27 

28 * They should always raise an exception if they did not complete 

29 successfully. In particular, if they handle cancellation, they should 

30 probably reraise, like this:: 

31 

32 try: 

33 # do work 

34 except asyncio.CancelledError: 

35 # undo partially completed work 

36 raise 

37 

38 Args: 

39 coro_fns: an iterable of coroutine functions, i.e. callables that 

40 return a coroutine object when called. Use ``functools.partial`` or 

41 lambdas to pass arguments. 

42 

43 delay: amount of time, in seconds, between starting coroutines. If 

44 ``None``, the coroutines will run sequentially. 

45 

46 loop: the event loop to use. 

47 

48 Returns: 

49 tuple *(winner_result, winner_index, exceptions)* where 

50 

51 - *winner_result*: the result of the winning coroutine, or ``None`` 

52 if no coroutines won. 

53 

54 - *winner_index*: the index of the winning coroutine in 

55 ``coro_fns``, or ``None`` if no coroutines won. If the winning 

56 coroutine may return None on success, *winner_index* can be used 

57 to definitively determine whether any coroutine won. 

58 

59 - *exceptions*: list of exceptions returned by the coroutines. 

60 ``len(exceptions)`` is equal to the number of coroutines actually 

61 started, and the order is the same as in ``coro_fns``. The winning 

62 coroutine's entry is ``None``. 

63 

64 """ 

65 loop = loop or events.get_running_loop() 

66 parent_task = tasks.current_task(loop) 

67 enum_coro_fns = enumerate(coro_fns) 

68 winner_result = None 

69 winner_index = None 

70 unhandled_exceptions = [] 

71 exceptions = [] 

72 running_tasks = set() 

73 on_completed_fut = None 

74 

75 def task_done(task): 

76 running_tasks.discard(task) 

77 futures.future_discard_from_awaited_by(task, parent_task) 

78 if ( 

79 on_completed_fut is not None 

80 and not on_completed_fut.done() 

81 and not running_tasks 

82 ): 

83 on_completed_fut.set_result(None) 

84 

85 if task.cancelled(): 

86 return 

87 

88 exc = task.exception() 

89 if exc is None: 89 ↛ 91line 89 didn't jump to line 91 because the condition on line 89 was always true

90 return 

91 unhandled_exceptions.append(exc) 

92 

93 async def run_one_coro(ok_to_start, previous_failed) -> None: 

94 # in eager tasks this waits for the calling task to append this task 

95 # to running_tasks, in regular tasks this wait is a no-op that does 

96 # not yield a future. See gh-124309. 

97 await ok_to_start.wait() 

98 # Wait for the previous task to finish, or for delay seconds 

99 if previous_failed is not None: 

100 with contextlib.suppress(exceptions_mod.TimeoutError): 

101 # Use asyncio.wait_for() instead of asyncio.wait() here, so 

102 # that if we get cancelled at this point, Event.wait() is also 

103 # cancelled, otherwise there will be a "Task destroyed but it is 

104 # pending" later. 

105 await tasks.wait_for(previous_failed.wait(), delay) 

106 # Get the next coroutine to run 

107 try: 

108 this_index, coro_fn = next(enum_coro_fns) 

109 except StopIteration: 

110 return 

111 # Start task that will run the next coroutine 

112 this_failed = locks.Event() 

113 next_ok_to_start = locks.Event() 

114 next_task = loop.create_task(run_one_coro(next_ok_to_start, this_failed)) 

115 futures.future_add_to_awaited_by(next_task, parent_task) 

116 running_tasks.add(next_task) 

117 next_task.add_done_callback(task_done) 

118 # next_task has been appended to running_tasks so next_task is ok to 

119 # start. 

120 next_ok_to_start.set() 

121 # Prepare place to put this coroutine's exceptions if not won 

122 exceptions.append(None) 

123 assert len(exceptions) == this_index + 1 

124 

125 try: 

126 result = await coro_fn() 

127 except (SystemExit, KeyboardInterrupt): 

128 raise 

129 except BaseException as e: 

130 exceptions[this_index] = e 

131 this_failed.set() # Kickstart the next coroutine 

132 else: 

133 # Store winner's results 

134 nonlocal winner_index, winner_result 

135 assert winner_index is None 

136 winner_index = this_index 

137 winner_result = result 

138 # Cancel all other tasks. We take care to not cancel the current 

139 # task as well. If we do so, then since there is no `await` after 

140 # here and CancelledError are usually thrown at one, we will 

141 # encounter a curious corner case where the current task will end 

142 # up as done() == True, cancelled() == False, exception() == 

143 # asyncio.CancelledError. This behavior is specified in 

144 # https://bugs.python.org/issue30048 

145 current_task = tasks.current_task(loop) 

146 for t in running_tasks: 

147 if t is not current_task: 

148 t.cancel() 

149 

150 propagate_cancellation_error = None 

151 try: 

152 ok_to_start = locks.Event() 

153 first_task = loop.create_task(run_one_coro(ok_to_start, None)) 

154 futures.future_add_to_awaited_by(first_task, parent_task) 

155 running_tasks.add(first_task) 

156 first_task.add_done_callback(task_done) 

157 # first_task has been appended to running_tasks so first_task is ok to start. 

158 ok_to_start.set() 

159 propagate_cancellation_error = None 

160 # Make sure no tasks are left running if we leave this function 

161 while running_tasks: 

162 on_completed_fut = loop.create_future() 

163 try: 

164 await on_completed_fut 

165 except exceptions_mod.CancelledError as ex: 

166 propagate_cancellation_error = ex 

167 for task in running_tasks: 

168 task.cancel(*ex.args) 

169 on_completed_fut = None 

170 if __debug__ and unhandled_exceptions: 170 ↛ 173line 170 didn't jump to line 173 because the condition on line 170 was never true

171 # If run_one_coro raises an unhandled exception, it's probably a 

172 # programming error, and I want to see it. 

173 raise ExceptionGroup("staggered race failed", unhandled_exceptions) 

174 if propagate_cancellation_error is not None: 

175 raise propagate_cancellation_error 

176 return winner_result, winner_index, exceptions 

177 finally: 

178 del exceptions, propagate_cancellation_error, unhandled_exceptions, parent_task