Coverage for Lib/asyncio/taskgroups.py: 96%
136 statements
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-23 01:21 +0000
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-23 01:21 +0000
# Adapted with permission from the EdgeDB project;
# license: PSFL.
# Public API of this module: only the TaskGroup class is exported.
__all__ = ("TaskGroup",)

# Sibling modules of the enclosing package that TaskGroup relies on.
from . import events
from . import exceptions
from . import futures
from . import tasks
13class TaskGroup:
14 """Asynchronous context manager for managing groups of tasks.
16 Example use:
18 async with asyncio.TaskGroup() as group:
19 task1 = group.create_task(some_coroutine(...))
20 task2 = group.create_task(other_coroutine(...))
21 print("Both tasks have completed now.")
23 All tasks are awaited when the context manager exits.
25 Any exceptions other than `asyncio.CancelledError` raised within
26 a task will cancel all remaining tasks and wait for them to exit.
27 The exceptions are then combined and raised as an `ExceptionGroup`.
28 """
29 def __init__(self):
30 self._entered = False
31 self._exiting = False
32 self._aborting = False
33 self._loop = None
34 self._parent_task = None
35 self._parent_cancel_requested = False
36 self._tasks = set()
37 self._errors = []
38 self._base_error = None
39 self._on_completed_fut = None
41 def __repr__(self):
42 info = ['']
43 if self._tasks:
44 info.append(f'tasks={len(self._tasks)}')
45 if self._errors:
46 info.append(f'errors={len(self._errors)}')
47 if self._aborting:
48 info.append('cancelling')
49 elif self._entered:
50 info.append('entered')
52 info_str = ' '.join(info)
53 return f'<TaskGroup{info_str}>'
55 async def __aenter__(self):
56 if self._entered:
57 raise RuntimeError(
58 f"TaskGroup {self!r} has already been entered")
59 if self._loop is None: 59 ↛ 61line 59 didn't jump to line 61 because the condition on line 59 was always true
60 self._loop = events.get_running_loop()
61 self._parent_task = tasks.current_task(self._loop)
62 if self._parent_task is None:
63 raise RuntimeError(
64 f'TaskGroup {self!r} cannot determine the parent task')
65 self._entered = True
67 return self
69 async def __aexit__(self, et, exc, tb):
70 tb = None
71 try:
72 return await self._aexit(et, exc)
73 finally:
74 # Exceptions are heavy objects that can have object
75 # cycles (bad for GC); let's not keep a reference to
76 # a bunch of them. It would be nicer to use a try/finally
77 # in __aexit__ directly but that introduced some diff noise
78 self._parent_task = None
79 self._errors = None
80 self._base_error = None
81 exc = None
83 async def _aexit(self, et, exc):
84 self._exiting = True
86 if (exc is not None and
87 self._is_base_error(exc) and
88 self._base_error is None):
89 self._base_error = exc
91 if et is not None and issubclass(et, exceptions.CancelledError):
92 propagate_cancellation_error = exc
93 else:
94 propagate_cancellation_error = None
96 if et is not None:
97 if not self._aborting:
98 # Our parent task is being cancelled:
99 #
100 # async with TaskGroup() as g:
101 # g.create_task(...)
102 # await ... # <- CancelledError
103 #
104 # or there's an exception in "async with":
105 #
106 # async with TaskGroup() as g:
107 # g.create_task(...)
108 # 1 / 0
109 #
110 self._abort()
112 # We use while-loop here because "self._on_completed_fut"
113 # can be cancelled multiple times if our parent task
114 # is being cancelled repeatedly (or even once, when
115 # our own cancellation is already in progress)
116 while self._tasks:
117 if self._on_completed_fut is None: 117 ↛ 120line 117 didn't jump to line 120 because the condition on line 117 was always true
118 self._on_completed_fut = self._loop.create_future()
120 try:
121 await self._on_completed_fut
122 except exceptions.CancelledError as ex:
123 if not self._aborting:
124 # Our parent task is being cancelled:
125 #
126 # async def wrapper():
127 # async with TaskGroup() as g:
128 # g.create_task(foo)
129 #
130 # "wrapper" is being cancelled while "foo" is
131 # still running.
132 propagate_cancellation_error = ex
133 self._abort()
135 self._on_completed_fut = None
137 assert not self._tasks
139 if self._base_error is not None:
140 try:
141 raise self._base_error
142 finally:
143 exc = None
145 if self._parent_cancel_requested:
146 # If this flag is set we *must* call uncancel().
147 if self._parent_task.uncancel() == 0:
148 # If there are no pending cancellations left,
149 # don't propagate CancelledError.
150 propagate_cancellation_error = None
152 # Propagate CancelledError if there is one, except if there
153 # are other errors -- those have priority.
154 try:
155 if propagate_cancellation_error is not None and not self._errors:
156 try:
157 raise propagate_cancellation_error
158 finally:
159 exc = None
160 finally:
161 propagate_cancellation_error = None
163 if et is not None and not issubclass(et, exceptions.CancelledError):
164 self._errors.append(exc)
166 if self._errors:
167 # If the parent task is being cancelled from the outside
168 # of the taskgroup, un-cancel and re-cancel the parent task,
169 # which will keep the cancel count stable.
170 if self._parent_task.cancelling():
171 self._parent_task.uncancel()
172 self._parent_task.cancel()
173 try:
174 raise BaseExceptionGroup(
175 'unhandled errors in a TaskGroup',
176 self._errors,
177 ) from None
178 finally:
179 exc = None
182 def create_task(self, coro, **kwargs):
183 """Create a new task in this group and return it.
185 Similar to `asyncio.create_task`.
186 """
187 if not self._entered:
188 coro.close()
189 raise RuntimeError(f"TaskGroup {self!r} has not been entered")
190 if self._exiting and not self._tasks:
191 coro.close()
192 raise RuntimeError(f"TaskGroup {self!r} is finished")
193 if self._aborting:
194 coro.close()
195 raise RuntimeError(f"TaskGroup {self!r} is shutting down")
196 task = self._loop.create_task(coro, **kwargs)
198 futures.future_add_to_awaited_by(task, self._parent_task)
200 # Always schedule the done callback even if the task is
201 # already done (e.g. if the coro was able to complete eagerly),
202 # otherwise if the task completes with an exception then it will cancel
203 # the current task too early. gh-128550, gh-128588
204 self._tasks.add(task)
205 task.add_done_callback(self._on_task_done)
206 try:
207 return task
208 finally:
209 # gh-128552: prevent a refcycle of
210 # task.exception().__traceback__->TaskGroup.create_task->task
211 del task
213 # Since Python 3.8 Tasks propagate all exceptions correctly,
214 # except for KeyboardInterrupt and SystemExit which are
215 # still considered special.
217 def _is_base_error(self, exc: BaseException) -> bool:
218 assert isinstance(exc, BaseException)
219 return isinstance(exc, (SystemExit, KeyboardInterrupt))
221 def _abort(self):
222 self._aborting = True
224 for t in self._tasks:
225 if not t.done():
226 t.cancel()
228 def _on_task_done(self, task):
229 self._tasks.discard(task)
231 futures.future_discard_from_awaited_by(task, self._parent_task)
233 if self._on_completed_fut is not None and not self._tasks:
234 if not self._on_completed_fut.done(): 234 ↛ 237line 234 didn't jump to line 237 because the condition on line 234 was always true
235 self._on_completed_fut.set_result(True)
237 if task.cancelled():
238 return
240 exc = task.exception()
241 if exc is None:
242 return
244 self._errors.append(exc)
245 if self._is_base_error(exc) and self._base_error is None: 245 ↛ 246line 245 didn't jump to line 246 because the condition on line 245 was never true
246 self._base_error = exc
248 if self._parent_task.done(): 248 ↛ 251line 248 didn't jump to line 251 because the condition on line 248 was never true
249 # Not sure if this case is possible, but we want to handle
250 # it anyways.
251 self._loop.call_exception_handler({
252 'message': f'Task {task!r} has errored out but its parent '
253 f'task {self._parent_task} is already completed',
254 'exception': exc,
255 'task': task,
256 })
257 return
259 if not self._aborting and not self._parent_cancel_requested:
260 # If parent task *is not* being cancelled, it means that we want
261 # to manually cancel it to abort whatever is being run right now
262 # in the TaskGroup. But we want to mark parent task as
263 # "not cancelled" later in __aexit__. Example situation that
264 # we need to handle:
265 #
266 # async def foo():
267 # try:
268 # async with TaskGroup() as g:
269 # g.create_task(crash_soon())
270 # await something # <- this needs to be canceled
271 # # by the TaskGroup, e.g.
272 # # foo() needs to be cancelled
273 # except Exception:
274 # # Ignore any exceptions raised in the TaskGroup
275 # pass
276 # await something_else # this line has to be called
277 # # after TaskGroup is finished.
278 self._abort()
279 self._parent_cancel_requested = True
280 self._parent_task.cancel()