Coverage for Lib / asyncio / taskgroups.py: 96%
151 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 02:39 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 02:39 +0000
# Adapted with permission from the EdgeDB project;
# license: PSFL.
# Public API of this module.
__all__ = ("TaskGroup",)

# Sibling modules of the enclosing package that TaskGroup relies on.
from . import events
from . import exceptions
from . import futures
from . import tasks
13class TaskGroup:
14 """Asynchronous context manager for managing groups of tasks.
16 Example use:
18 async with asyncio.TaskGroup() as group:
19 task1 = group.create_task(some_coroutine(...))
20 task2 = group.create_task(other_coroutine(...))
21 print("Both tasks have completed now.")
23 All tasks are awaited when the context manager exits.
25 Any exceptions other than `asyncio.CancelledError` raised within
26 a task will cancel all remaining tasks and wait for them to exit.
27 The exceptions are then combined and raised as an `ExceptionGroup`.
28 """
29 def __init__(self):
30 self._entered = False
31 self._exiting = False
32 self._aborting = False
33 self._loop = None
34 self._parent_task = None
35 self._parent_cancel_requested = False
36 self._tasks = set()
37 self._errors = []
38 self._base_error = None
39 self._on_completed_fut = None
40 self._cancel_on_enter = False
42 def __repr__(self):
43 info = ['']
44 if self._tasks:
45 info.append(f'tasks={len(self._tasks)}')
46 if self._errors:
47 info.append(f'errors={len(self._errors)}')
48 if self._aborting:
49 info.append('cancelling')
50 elif self._entered:
51 info.append('entered')
53 info_str = ' '.join(info)
54 return f'<TaskGroup{info_str}>'
56 async def __aenter__(self):
57 if self._entered:
58 raise RuntimeError(
59 f"TaskGroup {self!r} has already been entered")
60 if self._loop is None: 60 ↛ 62line 60 didn't jump to line 62 because the condition on line 60 was always true
61 self._loop = events.get_running_loop()
62 self._parent_task = tasks.current_task(self._loop)
63 if self._parent_task is None:
64 raise RuntimeError(
65 f'TaskGroup {self!r} cannot determine the parent task')
66 self._entered = True
67 if self._cancel_on_enter:
68 self.cancel()
70 return self
72 async def __aexit__(self, et, exc, tb):
73 tb = None
74 try:
75 return await self._aexit(et, exc)
76 finally:
77 # Exceptions are heavy objects that can have object
78 # cycles (bad for GC); let's not keep a reference to
79 # a bunch of them. It would be nicer to use a try/finally
80 # in __aexit__ directly but that introduced some diff noise
81 self._parent_task = None
82 self._errors = None
83 self._base_error = None
84 exc = None
86 async def _aexit(self, et, exc):
87 self._exiting = True
89 if (exc is not None and
90 self._is_base_error(exc) and
91 self._base_error is None):
92 self._base_error = exc
94 if et is not None and issubclass(et, exceptions.CancelledError):
95 propagate_cancellation_error = exc
96 else:
97 propagate_cancellation_error = None
99 if et is not None:
100 if not self._aborting:
101 # Our parent task is being cancelled:
102 #
103 # async with TaskGroup() as g:
104 # g.create_task(...)
105 # await ... # <- CancelledError
106 #
107 # or there's an exception in "async with":
108 #
109 # async with TaskGroup() as g:
110 # g.create_task(...)
111 # 1 / 0
112 #
113 self._abort()
115 # We use while-loop here because "self._on_completed_fut"
116 # can be cancelled multiple times if our parent task
117 # is being cancelled repeatedly (or even once, when
118 # our own cancellation is already in progress)
119 while self._tasks:
120 if self._on_completed_fut is None: 120 ↛ 123line 120 didn't jump to line 123 because the condition on line 120 was always true
121 self._on_completed_fut = self._loop.create_future()
123 try:
124 await self._on_completed_fut
125 except exceptions.CancelledError as ex:
126 if not self._aborting:
127 # Our parent task is being cancelled:
128 #
129 # async def wrapper():
130 # async with TaskGroup() as g:
131 # g.create_task(foo)
132 #
133 # "wrapper" is being cancelled while "foo" is
134 # still running.
135 propagate_cancellation_error = ex
136 self._abort()
138 self._on_completed_fut = None
140 assert not self._tasks
142 if self._base_error is not None:
143 try:
144 raise self._base_error
145 finally:
146 exc = None
148 if self._parent_cancel_requested:
149 # If this flag is set we *must* call uncancel().
150 if self._parent_task.uncancel() == 0:
151 # If there are no pending cancellations left,
152 # don't propagate CancelledError.
153 propagate_cancellation_error = None
155 # Propagate CancelledError if there is one, except if there
156 # are other errors -- those have priority.
157 try:
158 if propagate_cancellation_error is not None and not self._errors:
159 try:
160 raise propagate_cancellation_error
161 finally:
162 exc = None
163 finally:
164 propagate_cancellation_error = None
166 if et is not None and not issubclass(et, exceptions.CancelledError):
167 self._errors.append(exc)
169 if self._errors:
170 # If the parent task is being cancelled from the outside
171 # of the taskgroup, un-cancel and re-cancel the parent task,
172 # which will keep the cancel count stable.
173 if self._parent_task.cancelling():
174 self._parent_task.uncancel()
175 self._parent_task.cancel()
176 try:
177 raise BaseExceptionGroup(
178 'unhandled errors in a TaskGroup',
179 self._errors,
180 ) from None
181 finally:
182 exc = None
184 # Suppress any remaining exception (exceptions deserving to be raised
185 # were raised above).
186 return True
188 def create_task(self, coro, **kwargs):
189 """Create a new task in this group and return it.
191 Similar to `asyncio.create_task`.
192 """
193 if not self._entered:
194 coro.close()
195 raise RuntimeError(f"TaskGroup {self!r} has not been entered")
196 if self._exiting and not self._tasks:
197 coro.close()
198 raise RuntimeError(f"TaskGroup {self!r} is finished")
199 if self._aborting:
200 coro.close()
201 raise RuntimeError(f"TaskGroup {self!r} is shutting down")
202 task = self._loop.create_task(coro, **kwargs)
204 futures.future_add_to_awaited_by(task, self._parent_task)
206 # Always schedule the done callback even if the task is
207 # already done (e.g. if the coro was able to complete eagerly),
208 # otherwise if the task completes with an exception then it will cancel
209 # the current task too early. gh-128550, gh-128588
210 self._tasks.add(task)
211 task.add_done_callback(self._on_task_done)
212 try:
213 return task
214 finally:
215 # gh-128552: prevent a refcycle of
216 # task.exception().__traceback__->TaskGroup.create_task->task
217 del task
219 # Since Python 3.8 Tasks propagate all exceptions correctly,
220 # except for KeyboardInterrupt and SystemExit which are
221 # still considered special.
223 def _is_base_error(self, exc: BaseException) -> bool:
224 assert isinstance(exc, BaseException)
225 return isinstance(exc, (SystemExit, KeyboardInterrupt))
227 def _abort(self):
228 self._aborting = True
230 for t in self._tasks:
231 if not t.done():
232 t.cancel()
234 def _on_task_done(self, task):
235 self._tasks.discard(task)
237 futures.future_discard_from_awaited_by(task, self._parent_task)
239 if self._on_completed_fut is not None and not self._tasks:
240 if not self._on_completed_fut.done(): 240 ↛ 243line 240 didn't jump to line 243 because the condition on line 240 was always true
241 self._on_completed_fut.set_result(True)
243 if task.cancelled():
244 return
246 exc = task.exception()
247 if exc is None:
248 return
250 self._errors.append(exc)
251 if self._is_base_error(exc) and self._base_error is None: 251 ↛ 252line 251 didn't jump to line 252 because the condition on line 251 was never true
252 self._base_error = exc
254 if self._parent_task.done(): 254 ↛ 257line 254 didn't jump to line 257 because the condition on line 254 was never true
255 # Not sure if this case is possible, but we want to handle
256 # it anyways.
257 self._loop.call_exception_handler({
258 'message': f'Task {task!r} has errored out but its parent '
259 f'task {self._parent_task} is already completed',
260 'exception': exc,
261 'task': task,
262 })
263 return
265 if not self._aborting and not self._parent_cancel_requested:
266 # If parent task *is not* being cancelled, it means that we want
267 # to manually cancel it to abort whatever is being run right now
268 # in the TaskGroup. But we want to mark parent task as
269 # "not cancelled" later in __aexit__. Example situation that
270 # we need to handle:
271 #
272 # async def foo():
273 # try:
274 # async with TaskGroup() as g:
275 # g.create_task(crash_soon())
276 # await something # <- this needs to be canceled
277 # # by the TaskGroup, e.g.
278 # # foo() needs to be cancelled
279 # except Exception:
280 # # Ignore any exceptions raised in the TaskGroup
281 # pass
282 # await something_else # this line has to be called
283 # # after TaskGroup is finished.
284 self._abort()
285 self._parent_cancel_requested = True
286 self._parent_task.cancel()
288 def cancel(self):
289 """Cancel the task group
291 `cancel()` will be called on any tasks in the group that aren't yet
292 done, as well as the parent (body) of the group. This will cause the
293 task group context manager to exit *without* `asyncio.CancelledError`
294 being raised.
296 If `cancel()` is called before entering the task group, the group will be
297 cancelled upon entry. This is useful for patterns where one piece of
298 code passes an unused TaskGroup instance to another in order to have
299 the ability to cancel anything run within the group.
301 `cancel()` is idempotent and may be called after the task group has
302 already exited.
303 """
304 if not self._entered:
305 self._cancel_on_enter = True
306 return
307 if self._exiting and not self._tasks:
308 return
309 if not self._aborting:
310 self._abort()
311 if self._parent_task and not self._parent_cancel_requested: 311 ↛ exitline 311 didn't return from function 'cancel' because the condition on line 311 was always true
312 self._parent_cancel_requested = True
313 self._parent_task.cancel()