Coverage for Lib/asyncio/staggered.py: 95%

84 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-15 02:02 +0000

1"""Support for running coroutines in parallel with staggered start times.""" 

2 

3__all__ = 'staggered_race', 

4 

5import contextlib 

6 

7from . import events 

8from . import exceptions as exceptions_mod 

9from . import locks 

10from . import tasks 

11from . import futures 

12 

13 

async def staggered_race(coro_fns, delay, *, loop=None):
    """Run coroutines with staggered start times and take the first to finish.

    This method takes an iterable of coroutine functions. The first one is
    started immediately. From then on, whenever the immediately preceding one
    fails (raises an exception), or when *delay* seconds have passed, the next
    coroutine is started. This continues until one of the coroutines completes
    successfully, in which case all others are cancelled, or until all
    coroutines fail.

    The coroutines provided should be well-behaved in the following way:

    * They should only ``return`` if completed successfully.

    * They should always raise an exception if they did not complete
      successfully. In particular, if they handle cancellation, they should
      probably reraise, like this::

        try:
            # do work
        except asyncio.CancelledError:
            # undo partially completed work
            raise

    Args:
        coro_fns: an iterable of coroutine functions, i.e. callables that
            return a coroutine object when called. Use ``functools.partial`` or
            lambdas to pass arguments.

        delay: amount of time, in seconds, between starting coroutines. If
            ``None``, the coroutines will run sequentially.

        loop: the event loop to use. If ``None``, the running event loop is
            used.

    Returns:
        tuple *(winner_result, winner_index, exceptions)* where

        - *winner_result*: the result of the winning coroutine, or ``None``
          if no coroutines won.

        - *winner_index*: the index of the winning coroutine in
          ``coro_fns``, or ``None`` if no coroutines won. If the winning
          coroutine may return None on success, *winner_index* can be used
          to definitively determine whether any coroutine won.

        - *exceptions*: list of exceptions raised by the coroutines.
          ``len(exceptions)`` is equal to the number of coroutines actually
          started, and the order is the same as in ``coro_fns``. The winning
          coroutine's entry is ``None``. A coroutine that was cancelled
          contributes the ``CancelledError`` it raised.

    Raises:
        asyncio.CancelledError: if the calling task is cancelled while
            waiting. The error is re-raised only after every started task
            has been cancelled and has finished.
        ExceptionGroup: (only when ``__debug__`` is true) if one of the
            internal runner tasks ended with an exception that was not
            captured in *exceptions*.

    """
    # TODO: when we have aiter() and anext(), allow async iterables in coro_fns.
    # Compare against None explicitly so that an explicitly supplied loop is
    # never replaced merely because it evaluates as falsy.
    if loop is None:
        loop = events.get_running_loop()
    parent_task = tasks.current_task(loop)
    enum_coro_fns = enumerate(coro_fns)
    winner_result = None
    winner_index = None
    unhandled_exceptions = []
    exceptions = []
    running_tasks = set()
    on_completed_fut = None

    def task_done(task):
        # Done-callback attached to every runner task: unregister the task,
        # wake up the parent once nothing is left running, and remember any
        # exception that escaped run_one_coro().
        running_tasks.discard(task)
        futures.future_discard_from_awaited_by(task, parent_task)
        if (
            on_completed_fut is not None
            and not on_completed_fut.done()
            and not running_tasks
        ):
            on_completed_fut.set_result(None)

        if task.cancelled():
            return

        exc = task.exception()
        if exc is None:
            return
        unhandled_exceptions.append(exc)

    async def run_one_coro(ok_to_start, previous_failed) -> None:
        # in eager tasks this waits for the calling task to append this task
        # to running_tasks, in regular tasks this wait is a no-op that does
        # not yield a future. See gh-124309.
        await ok_to_start.wait()
        # Wait for the previous task to finish, or for delay seconds
        if previous_failed is not None:
            with contextlib.suppress(exceptions_mod.TimeoutError):
                # Use asyncio.wait_for() instead of asyncio.wait() here, so
                # that if we get cancelled at this point, Event.wait() is also
                # cancelled, otherwise there will be a "Task destroyed but it is
                # pending" later.
                await tasks.wait_for(previous_failed.wait(), delay)
        # Get the next coroutine to run
        try:
            this_index, coro_fn = next(enum_coro_fns)
        except StopIteration:
            return
        # Start task that will run the next coroutine
        this_failed = locks.Event()
        next_ok_to_start = locks.Event()
        next_task = loop.create_task(run_one_coro(next_ok_to_start, this_failed))
        futures.future_add_to_awaited_by(next_task, parent_task)
        running_tasks.add(next_task)
        next_task.add_done_callback(task_done)
        # next_task has been appended to running_tasks so next_task is ok to
        # start.
        next_ok_to_start.set()
        # Prepare place to put this coroutine's exceptions if not won
        exceptions.append(None)
        assert len(exceptions) == this_index + 1

        try:
            result = await coro_fn()
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as e:
            # Everything else, including CancelledError, is recorded as this
            # coroutine's failure rather than propagated.
            exceptions[this_index] = e
            this_failed.set()  # Kickstart the next coroutine
        else:
            # Store winner's results
            nonlocal winner_index, winner_result
            assert winner_index is None
            winner_index = this_index
            winner_result = result
            # Cancel all other tasks. We take care to not cancel the current
            # task as well. If we do so, then since there is no `await` after
            # here and CancelledError are usually thrown at one, we will
            # encounter a curious corner case where the current task will end
            # up as done() == True, cancelled() == False, exception() ==
            # asyncio.CancelledError. This behavior is specified in
            # https://bugs.python.org/issue30048
            current_task = tasks.current_task(loop)
            for t in running_tasks:
                if t is not current_task:
                    t.cancel()

    # Bound before entering the ``try`` so that the ``del`` in ``finally``
    # always finds the name, even if starting the first task fails.
    propagate_cancellation_error = None
    try:
        ok_to_start = locks.Event()
        first_task = loop.create_task(run_one_coro(ok_to_start, None))
        futures.future_add_to_awaited_by(first_task, parent_task)
        running_tasks.add(first_task)
        first_task.add_done_callback(task_done)
        # first_task has been appended to running_tasks so first_task is ok to start.
        ok_to_start.set()
        # Make sure no tasks are left running if we leave this function
        while running_tasks:
            on_completed_fut = loop.create_future()
            try:
                await on_completed_fut
            except exceptions_mod.CancelledError as ex:
                # Remember the cancellation, forward it to every runner task,
                # then keep looping until they have all actually finished.
                propagate_cancellation_error = ex
                for task in running_tasks:
                    task.cancel(*ex.args)
            on_completed_fut = None
        if __debug__ and unhandled_exceptions:
            # If run_one_coro raises an unhandled exception, it's probably a
            # programming error, and I want to see it.
            raise ExceptionGroup("staggered race failed", unhandled_exceptions)
        if propagate_cancellation_error is not None:
            raise propagate_cancellation_error
        return winner_result, winner_index, exceptions
    finally:
        # Drop the local references to the collected exceptions and to the
        # parent task when leaving the function.
        del exceptions, propagate_cancellation_error, unhandled_exceptions, parent_task

178 finally: 

179 del exceptions, propagate_cancellation_error, unhandled_exceptions, parent_task