Coverage for Lib/asyncio/taskgroups.py: 96%
138 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-15 02:02 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-15 02:02 +0000
1# Adapted with permission from the EdgeDB project;
2# license: PSFL.
5__all__ = ("TaskGroup",)
7from . import events
8from . import exceptions
9from . import futures
10from . import tasks
13class TaskGroup:
14 """Asynchronous context manager for managing groups of tasks.
16 Example use:
18 async with asyncio.TaskGroup() as group:
19 task1 = group.create_task(some_coroutine(...))
20 task2 = group.create_task(other_coroutine(...))
21 print("Both tasks have completed now.")
23 All tasks are awaited when the context manager exits.
25 Any exceptions other than `asyncio.CancelledError` raised within
26 a task will cancel all remaining tasks and wait for them to exit.
27 The exceptions are then combined and raised as an `ExceptionGroup`.
28 """
29 def __init__(self):
30 self._entered = False
31 self._exiting = False
32 self._aborting = False
33 self._loop = None
34 self._parent_task = None
35 self._parent_cancel_requested = False
36 self._tasks = set()
37 self._errors = []
38 self._base_error = None
39 self._on_completed_fut = None
41 def __repr__(self):
42 info = ['']
43 if self._tasks:
44 info.append(f'tasks={len(self._tasks)}')
45 if self._errors:
46 info.append(f'errors={len(self._errors)}')
47 if self._aborting:
48 info.append('cancelling')
49 elif self._entered:
50 info.append('entered')
52 info_str = ' '.join(info)
53 return f'<TaskGroup{info_str}>'
55 async def __aenter__(self):
56 if self._entered:
57 raise RuntimeError(
58 f"TaskGroup {self!r} has already been entered")
59 if self._loop is None: 59 ↛ 61line 59 didn't jump to line 61 because the condition on line 59 was always true
60 self._loop = events.get_running_loop()
61 self._parent_task = tasks.current_task(self._loop)
62 if self._parent_task is None:
63 raise RuntimeError(
64 f'TaskGroup {self!r} cannot determine the parent task')
65 self._entered = True
67 return self
69 async def __aexit__(self, et, exc, tb):
70 tb = None
71 try:
72 return await self._aexit(et, exc)
73 finally:
74 # Exceptions are heavy objects that can have object
75 # cycles (bad for GC); let's not keep a reference to
76 # a bunch of them. It would be nicer to use a try/finally
77 # in __aexit__ directly but that introduced some diff noise
78 self._parent_task = None
79 self._errors = None
80 self._base_error = None
81 exc = None
83 async def _aexit(self, et, exc):
84 self._exiting = True
86 if (exc is not None and
87 self._is_base_error(exc) and
88 self._base_error is None):
89 self._base_error = exc
91 if et is not None and issubclass(et, exceptions.CancelledError):
92 propagate_cancellation_error = exc
93 else:
94 propagate_cancellation_error = None
96 if et is not None:
97 if not self._aborting:
98 # Our parent task is being cancelled:
99 #
100 # async with TaskGroup() as g:
101 # g.create_task(...)
102 # await ... # <- CancelledError
103 #
104 # or there's an exception in "async with":
105 #
106 # async with TaskGroup() as g:
107 # g.create_task(...)
108 # 1 / 0
109 #
110 self._abort()
112 # We use while-loop here because "self._on_completed_fut"
113 # can be cancelled multiple times if our parent task
114 # is being cancelled repeatedly (or even once, when
115 # our own cancellation is already in progress)
116 while self._tasks:
117 if self._on_completed_fut is None: 117 ↛ 120line 117 didn't jump to line 120 because the condition on line 117 was always true
118 self._on_completed_fut = self._loop.create_future()
120 try:
121 await self._on_completed_fut
122 except exceptions.CancelledError as ex:
123 if not self._aborting:
124 # Our parent task is being cancelled:
125 #
126 # async def wrapper():
127 # async with TaskGroup() as g:
128 # g.create_task(foo)
129 #
130 # "wrapper" is being cancelled while "foo" is
131 # still running.
132 propagate_cancellation_error = ex
133 self._abort()
135 self._on_completed_fut = None
137 assert not self._tasks
139 if self._base_error is not None:
140 try:
141 raise self._base_error
142 finally:
143 exc = None
145 if self._parent_cancel_requested:
146 # If this flag is set we *must* call uncancel().
147 if self._parent_task.uncancel() == 0:
148 # If there are no pending cancellations left,
149 # don't propagate CancelledError.
150 propagate_cancellation_error = None
152 # Propagate CancelledError if there is one, except if there
153 # are other errors -- those have priority.
154 try:
155 if propagate_cancellation_error is not None and not self._errors:
156 try:
157 raise propagate_cancellation_error
158 finally:
159 exc = None
160 finally:
161 propagate_cancellation_error = None
163 if et is not None and not issubclass(et, exceptions.CancelledError):
164 self._errors.append(exc)
166 if self._errors:
167 # If the parent task is being cancelled from the outside
168 # of the taskgroup, un-cancel and re-cancel the parent task,
169 # which will keep the cancel count stable.
170 if self._parent_task.cancelling():
171 self._parent_task.uncancel()
172 self._parent_task.cancel()
173 try:
174 raise BaseExceptionGroup(
175 'unhandled errors in a TaskGroup',
176 self._errors,
177 ) from None
178 finally:
179 exc = None
182 def create_task(self, coro, *, name=None, context=None):
183 """Create a new task in this group and return it.
185 Similar to `asyncio.create_task`.
186 """
187 if not self._entered:
188 coro.close()
189 raise RuntimeError(f"TaskGroup {self!r} has not been entered")
190 if self._exiting and not self._tasks:
191 coro.close()
192 raise RuntimeError(f"TaskGroup {self!r} is finished")
193 if self._aborting:
194 coro.close()
195 raise RuntimeError(f"TaskGroup {self!r} is shutting down")
196 if context is None:
197 task = self._loop.create_task(coro, name=name)
198 else:
199 task = self._loop.create_task(coro, name=name, context=context)
201 futures.future_add_to_awaited_by(task, self._parent_task)
203 # Always schedule the done callback even if the task is
204 # already done (e.g. if the coro was able to complete eagerly),
205 # otherwise if the task completes with an exception then it will cancel
206 # the current task too early. gh-128550, gh-128588
207 self._tasks.add(task)
208 task.add_done_callback(self._on_task_done)
209 try:
210 return task
211 finally:
212 # gh-128552: prevent a refcycle of
213 # task.exception().__traceback__->TaskGroup.create_task->task
214 del task
216 # Since Python 3.8 Tasks propagate all exceptions correctly,
217 # except for KeyboardInterrupt and SystemExit which are
218 # still considered special.
220 def _is_base_error(self, exc: BaseException) -> bool:
221 assert isinstance(exc, BaseException)
222 return isinstance(exc, (SystemExit, KeyboardInterrupt))
224 def _abort(self):
225 self._aborting = True
227 for t in self._tasks:
228 if not t.done():
229 t.cancel()
231 def _on_task_done(self, task):
232 self._tasks.discard(task)
234 futures.future_discard_from_awaited_by(task, self._parent_task)
236 if self._on_completed_fut is not None and not self._tasks:
237 if not self._on_completed_fut.done(): 237 ↛ 240line 237 didn't jump to line 240 because the condition on line 237 was always true
238 self._on_completed_fut.set_result(True)
240 if task.cancelled():
241 return
243 exc = task.exception()
244 if exc is None:
245 return
247 self._errors.append(exc)
248 if self._is_base_error(exc) and self._base_error is None: 248 ↛ 249line 248 didn't jump to line 249 because the condition on line 248 was never true
249 self._base_error = exc
251 if self._parent_task.done(): 251 ↛ 254line 251 didn't jump to line 254 because the condition on line 251 was never true
252 # Not sure if this case is possible, but we want to handle
253 # it anyways.
254 self._loop.call_exception_handler({
255 'message': f'Task {task!r} has errored out but its parent '
256 f'task {self._parent_task} is already completed',
257 'exception': exc,
258 'task': task,
259 })
260 return
262 if not self._aborting and not self._parent_cancel_requested:
263 # If parent task *is not* being cancelled, it means that we want
264 # to manually cancel it to abort whatever is being run right now
265 # in the TaskGroup. But we want to mark parent task as
266 # "not cancelled" later in __aexit__. Example situation that
267 # we need to handle:
268 #
269 # async def foo():
270 # try:
271 # async with TaskGroup() as g:
272 # g.create_task(crash_soon())
273 # await something # <- this needs to be canceled
274 # # by the TaskGroup, e.g.
275 # # foo() needs to be cancelled
276 # except Exception:
277 # # Ignore any exceptions raised in the TaskGroup
278 # pass
279 # await something_else # this line has to be called
280 # # after TaskGroup is finished.
281 self._abort()
282 self._parent_cancel_requested = True
283 self._parent_task.cancel()