Coverage for Lib/asyncio/taskgroups.py: 96%

138 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-15 02:02 +0000

1# Adapted with permission from the EdgeDB project; 

2# license: PSFL. 

3 

4 

5__all__ = ("TaskGroup",) 

6 

7from . import events 

8from . import exceptions 

9from . import futures 

10from . import tasks 

11 

12 

13class TaskGroup: 

14 """Asynchronous context manager for managing groups of tasks. 

15 

16 Example use: 

17 

18 async with asyncio.TaskGroup() as group: 

19 task1 = group.create_task(some_coroutine(...)) 

20 task2 = group.create_task(other_coroutine(...)) 

21 print("Both tasks have completed now.") 

22 

23 All tasks are awaited when the context manager exits. 

24 

25 Any exceptions other than `asyncio.CancelledError` raised within 

26 a task will cancel all remaining tasks and wait for them to exit. 

27 The exceptions are then combined and raised as an `ExceptionGroup`. 

28 """ 

29 def __init__(self): 

30 self._entered = False 

31 self._exiting = False 

32 self._aborting = False 

33 self._loop = None 

34 self._parent_task = None 

35 self._parent_cancel_requested = False 

36 self._tasks = set() 

37 self._errors = [] 

38 self._base_error = None 

39 self._on_completed_fut = None 

40 

41 def __repr__(self): 

42 info = [''] 

43 if self._tasks: 

44 info.append(f'tasks={len(self._tasks)}') 

45 if self._errors: 

46 info.append(f'errors={len(self._errors)}') 

47 if self._aborting: 

48 info.append('cancelling') 

49 elif self._entered: 

50 info.append('entered') 

51 

52 info_str = ' '.join(info) 

53 return f'<TaskGroup{info_str}>' 

54 

55 async def __aenter__(self): 

56 if self._entered: 

57 raise RuntimeError( 

58 f"TaskGroup {self!r} has already been entered") 

59 if self._loop is None: 59 ↛ 61line 59 didn't jump to line 61 because the condition on line 59 was always true

60 self._loop = events.get_running_loop() 

61 self._parent_task = tasks.current_task(self._loop) 

62 if self._parent_task is None: 

63 raise RuntimeError( 

64 f'TaskGroup {self!r} cannot determine the parent task') 

65 self._entered = True 

66 

67 return self 

68 

69 async def __aexit__(self, et, exc, tb): 

70 tb = None 

71 try: 

72 return await self._aexit(et, exc) 

73 finally: 

74 # Exceptions are heavy objects that can have object 

75 # cycles (bad for GC); let's not keep a reference to 

76 # a bunch of them. It would be nicer to use a try/finally 

77 # in __aexit__ directly but that introduced some diff noise 

78 self._parent_task = None 

79 self._errors = None 

80 self._base_error = None 

81 exc = None 

82 

83 async def _aexit(self, et, exc): 

84 self._exiting = True 

85 

86 if (exc is not None and 

87 self._is_base_error(exc) and 

88 self._base_error is None): 

89 self._base_error = exc 

90 

91 if et is not None and issubclass(et, exceptions.CancelledError): 

92 propagate_cancellation_error = exc 

93 else: 

94 propagate_cancellation_error = None 

95 

96 if et is not None: 

97 if not self._aborting: 

98 # Our parent task is being cancelled: 

99 # 

100 # async with TaskGroup() as g: 

101 # g.create_task(...) 

102 # await ... # <- CancelledError 

103 # 

104 # or there's an exception in "async with": 

105 # 

106 # async with TaskGroup() as g: 

107 # g.create_task(...) 

108 # 1 / 0 

109 # 

110 self._abort() 

111 

112 # We use while-loop here because "self._on_completed_fut" 

113 # can be cancelled multiple times if our parent task 

114 # is being cancelled repeatedly (or even once, when 

115 # our own cancellation is already in progress) 

116 while self._tasks: 

117 if self._on_completed_fut is None: 117 ↛ 120line 117 didn't jump to line 120 because the condition on line 117 was always true

118 self._on_completed_fut = self._loop.create_future() 

119 

120 try: 

121 await self._on_completed_fut 

122 except exceptions.CancelledError as ex: 

123 if not self._aborting: 

124 # Our parent task is being cancelled: 

125 # 

126 # async def wrapper(): 

127 # async with TaskGroup() as g: 

128 # g.create_task(foo) 

129 # 

130 # "wrapper" is being cancelled while "foo" is 

131 # still running. 

132 propagate_cancellation_error = ex 

133 self._abort() 

134 

135 self._on_completed_fut = None 

136 

137 assert not self._tasks 

138 

139 if self._base_error is not None: 

140 try: 

141 raise self._base_error 

142 finally: 

143 exc = None 

144 

145 if self._parent_cancel_requested: 

146 # If this flag is set we *must* call uncancel(). 

147 if self._parent_task.uncancel() == 0: 

148 # If there are no pending cancellations left, 

149 # don't propagate CancelledError. 

150 propagate_cancellation_error = None 

151 

152 # Propagate CancelledError if there is one, except if there 

153 # are other errors -- those have priority. 

154 try: 

155 if propagate_cancellation_error is not None and not self._errors: 

156 try: 

157 raise propagate_cancellation_error 

158 finally: 

159 exc = None 

160 finally: 

161 propagate_cancellation_error = None 

162 

163 if et is not None and not issubclass(et, exceptions.CancelledError): 

164 self._errors.append(exc) 

165 

166 if self._errors: 

167 # If the parent task is being cancelled from the outside 

168 # of the taskgroup, un-cancel and re-cancel the parent task, 

169 # which will keep the cancel count stable. 

170 if self._parent_task.cancelling(): 

171 self._parent_task.uncancel() 

172 self._parent_task.cancel() 

173 try: 

174 raise BaseExceptionGroup( 

175 'unhandled errors in a TaskGroup', 

176 self._errors, 

177 ) from None 

178 finally: 

179 exc = None 

180 

181 

182 def create_task(self, coro, *, name=None, context=None): 

183 """Create a new task in this group and return it. 

184 

185 Similar to `asyncio.create_task`. 

186 """ 

187 if not self._entered: 

188 coro.close() 

189 raise RuntimeError(f"TaskGroup {self!r} has not been entered") 

190 if self._exiting and not self._tasks: 

191 coro.close() 

192 raise RuntimeError(f"TaskGroup {self!r} is finished") 

193 if self._aborting: 

194 coro.close() 

195 raise RuntimeError(f"TaskGroup {self!r} is shutting down") 

196 if context is None: 

197 task = self._loop.create_task(coro, name=name) 

198 else: 

199 task = self._loop.create_task(coro, name=name, context=context) 

200 

201 futures.future_add_to_awaited_by(task, self._parent_task) 

202 

203 # Always schedule the done callback even if the task is 

204 # already done (e.g. if the coro was able to complete eagerly), 

205 # otherwise if the task completes with an exception then it will cancel 

206 # the current task too early. gh-128550, gh-128588 

207 self._tasks.add(task) 

208 task.add_done_callback(self._on_task_done) 

209 try: 

210 return task 

211 finally: 

212 # gh-128552: prevent a refcycle of 

213 # task.exception().__traceback__->TaskGroup.create_task->task 

214 del task 

215 

216 # Since Python 3.8 Tasks propagate all exceptions correctly, 

217 # except for KeyboardInterrupt and SystemExit which are 

218 # still considered special. 

219 

220 def _is_base_error(self, exc: BaseException) -> bool: 

221 assert isinstance(exc, BaseException) 

222 return isinstance(exc, (SystemExit, KeyboardInterrupt)) 

223 

224 def _abort(self): 

225 self._aborting = True 

226 

227 for t in self._tasks: 

228 if not t.done(): 

229 t.cancel() 

230 

231 def _on_task_done(self, task): 

232 self._tasks.discard(task) 

233 

234 futures.future_discard_from_awaited_by(task, self._parent_task) 

235 

236 if self._on_completed_fut is not None and not self._tasks: 

237 if not self._on_completed_fut.done(): 237 ↛ 240line 237 didn't jump to line 240 because the condition on line 237 was always true

238 self._on_completed_fut.set_result(True) 

239 

240 if task.cancelled(): 

241 return 

242 

243 exc = task.exception() 

244 if exc is None: 

245 return 

246 

247 self._errors.append(exc) 

248 if self._is_base_error(exc) and self._base_error is None: 248 ↛ 249line 248 didn't jump to line 249 because the condition on line 248 was never true

249 self._base_error = exc 

250 

251 if self._parent_task.done(): 251 ↛ 254line 251 didn't jump to line 254 because the condition on line 251 was never true

252 # Not sure if this case is possible, but we want to handle 

253 # it anyways. 

254 self._loop.call_exception_handler({ 

255 'message': f'Task {task!r} has errored out but its parent ' 

256 f'task {self._parent_task} is already completed', 

257 'exception': exc, 

258 'task': task, 

259 }) 

260 return 

261 

262 if not self._aborting and not self._parent_cancel_requested: 

263 # If parent task *is not* being cancelled, it means that we want 

264 # to manually cancel it to abort whatever is being run right now 

265 # in the TaskGroup. But we want to mark parent task as 

266 # "not cancelled" later in __aexit__. Example situation that 

267 # we need to handle: 

268 # 

269 # async def foo(): 

270 # try: 

271 # async with TaskGroup() as g: 

272 # g.create_task(crash_soon()) 

273 # await something # <- this needs to be canceled 

274 # # by the TaskGroup, e.g. 

275 # # foo() needs to be cancelled 

276 # except Exception: 

277 # # Ignore any exceptions raised in the TaskGroup 

278 # pass 

279 # await something_else # this line has to be called 

280 # # after TaskGroup is finished. 

281 self._abort() 

282 self._parent_cancel_requested = True 

283 self._parent_task.cancel()