Coverage for Lib / asyncio / taskgroups.py: 96%

151 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 02:39 +0000

1# Adapted with permission from the EdgeDB project; 

2# license: PSFL. 

3 

4 

5__all__ = ("TaskGroup",) 

6 

7from . import events 

8from . import exceptions 

9from . import futures 

10from . import tasks 

11 

12 

13class TaskGroup: 

14 """Asynchronous context manager for managing groups of tasks. 

15 

16 Example use: 

17 

18 async with asyncio.TaskGroup() as group: 

19 task1 = group.create_task(some_coroutine(...)) 

20 task2 = group.create_task(other_coroutine(...)) 

21 print("Both tasks have completed now.") 

22 

23 All tasks are awaited when the context manager exits. 

24 

25 Any exceptions other than `asyncio.CancelledError` raised within 

26 a task will cancel all remaining tasks and wait for them to exit. 

27 The exceptions are then combined and raised as an `ExceptionGroup`. 

28 """ 

29 def __init__(self): 

30 self._entered = False 

31 self._exiting = False 

32 self._aborting = False 

33 self._loop = None 

34 self._parent_task = None 

35 self._parent_cancel_requested = False 

36 self._tasks = set() 

37 self._errors = [] 

38 self._base_error = None 

39 self._on_completed_fut = None 

40 self._cancel_on_enter = False 

41 

42 def __repr__(self): 

43 info = [''] 

44 if self._tasks: 

45 info.append(f'tasks={len(self._tasks)}') 

46 if self._errors: 

47 info.append(f'errors={len(self._errors)}') 

48 if self._aborting: 

49 info.append('cancelling') 

50 elif self._entered: 

51 info.append('entered') 

52 

53 info_str = ' '.join(info) 

54 return f'<TaskGroup{info_str}>' 

55 

56 async def __aenter__(self): 

57 if self._entered: 

58 raise RuntimeError( 

59 f"TaskGroup {self!r} has already been entered") 

60 if self._loop is None: 60 ↛ 62line 60 didn't jump to line 62 because the condition on line 60 was always true

61 self._loop = events.get_running_loop() 

62 self._parent_task = tasks.current_task(self._loop) 

63 if self._parent_task is None: 

64 raise RuntimeError( 

65 f'TaskGroup {self!r} cannot determine the parent task') 

66 self._entered = True 

67 if self._cancel_on_enter: 

68 self.cancel() 

69 

70 return self 

71 

72 async def __aexit__(self, et, exc, tb): 

73 tb = None 

74 try: 

75 return await self._aexit(et, exc) 

76 finally: 

77 # Exceptions are heavy objects that can have object 

78 # cycles (bad for GC); let's not keep a reference to 

79 # a bunch of them. It would be nicer to use a try/finally 

80 # in __aexit__ directly but that introduced some diff noise 

81 self._parent_task = None 

82 self._errors = None 

83 self._base_error = None 

84 exc = None 

85 

86 async def _aexit(self, et, exc): 

87 self._exiting = True 

88 

89 if (exc is not None and 

90 self._is_base_error(exc) and 

91 self._base_error is None): 

92 self._base_error = exc 

93 

94 if et is not None and issubclass(et, exceptions.CancelledError): 

95 propagate_cancellation_error = exc 

96 else: 

97 propagate_cancellation_error = None 

98 

99 if et is not None: 

100 if not self._aborting: 

101 # Our parent task is being cancelled: 

102 # 

103 # async with TaskGroup() as g: 

104 # g.create_task(...) 

105 # await ... # <- CancelledError 

106 # 

107 # or there's an exception in "async with": 

108 # 

109 # async with TaskGroup() as g: 

110 # g.create_task(...) 

111 # 1 / 0 

112 # 

113 self._abort() 

114 

115 # We use while-loop here because "self._on_completed_fut" 

116 # can be cancelled multiple times if our parent task 

117 # is being cancelled repeatedly (or even once, when 

118 # our own cancellation is already in progress) 

119 while self._tasks: 

120 if self._on_completed_fut is None: 120 ↛ 123line 120 didn't jump to line 123 because the condition on line 120 was always true

121 self._on_completed_fut = self._loop.create_future() 

122 

123 try: 

124 await self._on_completed_fut 

125 except exceptions.CancelledError as ex: 

126 if not self._aborting: 

127 # Our parent task is being cancelled: 

128 # 

129 # async def wrapper(): 

130 # async with TaskGroup() as g: 

131 # g.create_task(foo) 

132 # 

133 # "wrapper" is being cancelled while "foo" is 

134 # still running. 

135 propagate_cancellation_error = ex 

136 self._abort() 

137 

138 self._on_completed_fut = None 

139 

140 assert not self._tasks 

141 

142 if self._base_error is not None: 

143 try: 

144 raise self._base_error 

145 finally: 

146 exc = None 

147 

148 if self._parent_cancel_requested: 

149 # If this flag is set we *must* call uncancel(). 

150 if self._parent_task.uncancel() == 0: 

151 # If there are no pending cancellations left, 

152 # don't propagate CancelledError. 

153 propagate_cancellation_error = None 

154 

155 # Propagate CancelledError if there is one, except if there 

156 # are other errors -- those have priority. 

157 try: 

158 if propagate_cancellation_error is not None and not self._errors: 

159 try: 

160 raise propagate_cancellation_error 

161 finally: 

162 exc = None 

163 finally: 

164 propagate_cancellation_error = None 

165 

166 if et is not None and not issubclass(et, exceptions.CancelledError): 

167 self._errors.append(exc) 

168 

169 if self._errors: 

170 # If the parent task is being cancelled from the outside 

171 # of the taskgroup, un-cancel and re-cancel the parent task, 

172 # which will keep the cancel count stable. 

173 if self._parent_task.cancelling(): 

174 self._parent_task.uncancel() 

175 self._parent_task.cancel() 

176 try: 

177 raise BaseExceptionGroup( 

178 'unhandled errors in a TaskGroup', 

179 self._errors, 

180 ) from None 

181 finally: 

182 exc = None 

183 

184 # Suppress any remaining exception (exceptions deserving to be raised 

185 # were raised above). 

186 return True 

187 

188 def create_task(self, coro, **kwargs): 

189 """Create a new task in this group and return it. 

190 

191 Similar to `asyncio.create_task`. 

192 """ 

193 if not self._entered: 

194 coro.close() 

195 raise RuntimeError(f"TaskGroup {self!r} has not been entered") 

196 if self._exiting and not self._tasks: 

197 coro.close() 

198 raise RuntimeError(f"TaskGroup {self!r} is finished") 

199 if self._aborting: 

200 coro.close() 

201 raise RuntimeError(f"TaskGroup {self!r} is shutting down") 

202 task = self._loop.create_task(coro, **kwargs) 

203 

204 futures.future_add_to_awaited_by(task, self._parent_task) 

205 

206 # Always schedule the done callback even if the task is 

207 # already done (e.g. if the coro was able to complete eagerly), 

208 # otherwise if the task completes with an exception then it will cancel 

209 # the current task too early. gh-128550, gh-128588 

210 self._tasks.add(task) 

211 task.add_done_callback(self._on_task_done) 

212 try: 

213 return task 

214 finally: 

215 # gh-128552: prevent a refcycle of 

216 # task.exception().__traceback__->TaskGroup.create_task->task 

217 del task 

218 

219 # Since Python 3.8 Tasks propagate all exceptions correctly, 

220 # except for KeyboardInterrupt and SystemExit which are 

221 # still considered special. 

222 

223 def _is_base_error(self, exc: BaseException) -> bool: 

224 assert isinstance(exc, BaseException) 

225 return isinstance(exc, (SystemExit, KeyboardInterrupt)) 

226 

227 def _abort(self): 

228 self._aborting = True 

229 

230 for t in self._tasks: 

231 if not t.done(): 

232 t.cancel() 

233 

234 def _on_task_done(self, task): 

235 self._tasks.discard(task) 

236 

237 futures.future_discard_from_awaited_by(task, self._parent_task) 

238 

239 if self._on_completed_fut is not None and not self._tasks: 

240 if not self._on_completed_fut.done(): 240 ↛ 243line 240 didn't jump to line 243 because the condition on line 240 was always true

241 self._on_completed_fut.set_result(True) 

242 

243 if task.cancelled(): 

244 return 

245 

246 exc = task.exception() 

247 if exc is None: 

248 return 

249 

250 self._errors.append(exc) 

251 if self._is_base_error(exc) and self._base_error is None: 251 ↛ 252line 251 didn't jump to line 252 because the condition on line 251 was never true

252 self._base_error = exc 

253 

254 if self._parent_task.done(): 254 ↛ 257line 254 didn't jump to line 257 because the condition on line 254 was never true

255 # Not sure if this case is possible, but we want to handle 

256 # it anyways. 

257 self._loop.call_exception_handler({ 

258 'message': f'Task {task!r} has errored out but its parent ' 

259 f'task {self._parent_task} is already completed', 

260 'exception': exc, 

261 'task': task, 

262 }) 

263 return 

264 

265 if not self._aborting and not self._parent_cancel_requested: 

266 # If parent task *is not* being cancelled, it means that we want 

267 # to manually cancel it to abort whatever is being run right now 

268 # in the TaskGroup. But we want to mark parent task as 

269 # "not cancelled" later in __aexit__. Example situation that 

270 # we need to handle: 

271 # 

272 # async def foo(): 

273 # try: 

274 # async with TaskGroup() as g: 

275 # g.create_task(crash_soon()) 

276 # await something # <- this needs to be canceled 

277 # # by the TaskGroup, e.g. 

278 # # foo() needs to be cancelled 

279 # except Exception: 

280 # # Ignore any exceptions raised in the TaskGroup 

281 # pass 

282 # await something_else # this line has to be called 

283 # # after TaskGroup is finished. 

284 self._abort() 

285 self._parent_cancel_requested = True 

286 self._parent_task.cancel() 

287 

288 def cancel(self): 

289 """Cancel the task group 

290 

291 `cancel()` will be called on any tasks in the group that aren't yet 

292 done, as well as the parent (body) of the group. This will cause the 

293 task group context manager to exit *without* `asyncio.CancelledError` 

294 being raised. 

295 

296 If `cancel()` is called before entering the task group, the group will be 

297 cancelled upon entry. This is useful for patterns where one piece of 

298 code passes an unused TaskGroup instance to another in order to have 

299 the ability to cancel anything run within the group. 

300 

301 `cancel()` is idempotent and may be called after the task group has 

302 already exited. 

303 """ 

304 if not self._entered: 

305 self._cancel_on_enter = True 

306 return 

307 if self._exiting and not self._tasks: 

308 return 

309 if not self._aborting: 

310 self._abort() 

311 if self._parent_task and not self._parent_cancel_requested: 311 ↛ exitline 311 didn't return from function 'cancel' because the condition on line 311 was always true

312 self._parent_cancel_requested = True 

313 self._parent_task.cancel()