Coverage for Lib/asyncio/graph.py: 98%
115 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-24 03:28 +0000
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-24 03:28 +0000
1"""Introspection utils for tasks call graphs."""
3import dataclasses
4import io
5import sys
6import types
8from . import events
9from . import futures
10from . import tasks
# Public API of this module: the three call-graph entry points plus the
# two dataclasses used to represent a captured graph.
__all__ = (
    'capture_call_graph',
    'format_call_graph',
    'print_call_graph',
    'FrameCallGraphEntry',
    'FutureCallGraph',
)
# Sadly, we can't re-use the traceback module's data structures as those
# are tailored for error reporting, whereas we need to represent an
# async call graph.
#
# Going with pretty verbose names as we'd like to export them to the
# top level asyncio namespace, and want to avoid future name clashes.
28@dataclasses.dataclass(frozen=True, slots=True)
29class FrameCallGraphEntry:
30 frame: types.FrameType
33@dataclasses.dataclass(frozen=True, slots=True)
34class FutureCallGraph:
35 future: futures.Future
36 call_stack: tuple["FrameCallGraphEntry", ...]
37 awaited_by: tuple["FutureCallGraph", ...]
def _build_graph_for_future(
    future: futures.Future,
    *,
    limit: int | None = None,
) -> FutureCallGraph:
    """Build the call graph for a future that is not currently executing.

    The call stack is reconstructed by following the chain of awaited
    objects starting from the task's coroutine (``get_coro()``), rather
    than by walking live interpreter frames. Futures without a
    ``get_coro`` attribute get an empty call stack.

    Parameters:
        future: the future or task to introspect.
        limit: if None, keep every entry. If positive, keep the first
            ``limit`` entries of the await chain (counted from the
            task's own coroutine). If negative, keep the last
            ``abs(limit)`` entries of the chain. If 0, the call stack is
            not collected at all.

    Returns:
        A FutureCallGraph whose ``call_stack`` lists the most deeply
        awaited entry first, and whose ``awaited_by`` holds one graph per
        future recorded in ``future._asyncio_awaited_by``.

    Raises:
        TypeError: if *future* is not an ``asyncio.Future`` instance.
    """
    if not isinstance(future, futures.Future):
        raise TypeError(
            f"{future!r} object does not appear to be compatible "
            f"with asyncio.Future"
        )

    coro = None
    if get_coro := getattr(future, 'get_coro', None):
        # With limit == 0 the stack is intentionally skipped.
        coro = get_coro() if limit != 0 else None

    st: list[FrameCallGraphEntry] = []
    awaited_by: list[FutureCallGraph] = []

    while coro is not None:
        if hasattr(coro, 'cr_await'):
            # A native coroutine or duck-type compatible iterator
            st.append(FrameCallGraphEntry(coro.cr_frame))
            coro = coro.cr_await
        elif hasattr(coro, 'ag_await'):
            # A native async generator or duck-type compatible iterator.
            # Async generators expose their frame as 'ag_frame'; they
            # have no 'cr_frame' attribute.
            st.append(FrameCallGraphEntry(coro.ag_frame))
            coro = coro.ag_await
        else:
            break

    # The attribute may be empty or None when nothing awaits this future.
    if future._asyncio_awaited_by:
        for parent in future._asyncio_awaited_by:
            awaited_by.append(_build_graph_for_future(parent, limit=limit))

    if limit is not None:
        if limit > 0:
            st = st[:limit]
        elif limit < 0:
            st = st[limit:]
    # The chain was collected outermost-first; present it innermost-first.
    st.reverse()
    return FutureCallGraph(future, tuple(st), tuple(awaited_by))
def capture_call_graph(
    future: futures.Future | None = None,
    /,
    *,
    depth: int = 1,
    limit: int | None = None,
) -> FutureCallGraph | None:
    """Capture the async call graph for the current task or the provided Future.

    The result is built from two data structures:

    * FutureCallGraph(future, call_stack, awaited_by)

      'future' is an instance of asyncio.Future or asyncio.Task,
      'call_stack' is a tuple of FrameCallGraphEntry objects and
      'awaited_by' is a tuple of FutureCallGraph objects.

    * FrameCallGraphEntry(frame)

      'frame' is a frame object of a regular Python function in the
      call stack.

    The optional positional-only 'future' argument selects what to
    introspect. When it is omitted, the current task is used; if there is
    no current task, None is returned.

    When the *current task* is being introspected, the optional
    keyword-only 'depth' argument is the number of frames to skip from
    the top of the stack.

    The optional keyword-only 'limit' argument truncates every call stack
    in the resulting graph to at most ``abs(limit)`` entries. If 'limit'
    is positive, the entries left are the closest to the invocation
    point. If 'limit' is negative, the topmost entries are left. If
    'limit' is omitted or None, all entries are present. If 'limit' is 0,
    the call stack is not captured at all, only "awaited by" information
    is present.

    Raises RuntimeError when called without a 'future' outside of a
    running event loop, and TypeError when the object to introspect is
    not an asyncio.Future instance.
    """
    running_loop = events._get_running_loop()

    if future is None:
        if running_loop is None:
            raise RuntimeError(
                'capture_call_graph() is called outside of a running '
                'event loop and no *future* to introspect was provided')
        future = tasks.current_task(loop=running_loop)
        if future is None:
            # Not a generic stack introspection utility: with no task
            # given and no current task there is nothing to report.
            return None
    elif (
        running_loop is None
        or tasks.current_task(loop=running_loop) is not future
    ):
        # The future is not the task executing right now, so its stack
        # has to be rebuilt from its coroutine chain instead.
        return _build_graph_for_future(future, limit=limit)

    if not isinstance(future, futures.Future):
        raise TypeError(
            f"{future!r} object does not appear to be compatible "
            f"with asyncio.Future"
        )

    entries: list[FrameCallGraphEntry] = []

    # Must be evaluated in this function's own frame: 'depth' is counted
    # relative to it.
    if limit == 0:
        frame = None
    else:
        frame = sys._getframe(depth)
    try:
        while frame is not None:
            entries.append(FrameCallGraphEntry(frame))
            if (
                frame.f_generator is not None
                and frame.f_back is not None
                and frame.f_back.f_generator is None
            ):
                # Bottom of the coroutine stack: the caller is the
                # regular frame of the Task that runs it.
                break
            frame = frame.f_back
    finally:
        # Drop the local frame reference once the walk is done.
        del frame

    waiters = future._asyncio_awaited_by
    if waiters:
        awaited_by = [
            _build_graph_for_future(waiter, limit=limit)
            for waiter in waiters
        ]
    else:
        awaited_by = []

    if limit is not None:
        # 'entries' is ordered the opposite way from the stack built by
        # _build_graph_for_future(), so the slicing direction is mirrored.
        if limit < 0:
            entries = entries[:-limit]
        elif limit > 0:
            entries = entries[-limit:]

    return FutureCallGraph(future, tuple(entries), tuple(awaited_by))
def format_call_graph(
    future: futures.Future | None = None,
    /,
    *,
    depth: int = 1,
    limit: int | None = None,
) -> str:
    """Return the async call graph for `future` rendered as a string.

    When `future` is not provided, the graph of the current task is
    formatted. An empty string is returned if no graph could be captured.
    'depth' and 'limit' are forwarded to capture_call_graph().
    """

    def frame_line(frame) -> str:
        # Describe one stack entry; generator-backed frames are reported
        # through their owning coroutine / async generator / generator.
        owner = frame.f_generator
        if owner is None:
            code = frame.f_code
            return (
                f' | File {code.co_filename!r},'
                f' line {frame.f_lineno}, in'
                f' {code.co_qualname}()'
            )

        try:
            frame, code, tag = owner.cr_frame, owner.cr_code, 'async'
        except AttributeError:
            try:
                frame, code, tag = (
                    owner.ag_frame, owner.ag_code, 'async generator')
            except AttributeError:
                frame, code, tag = (
                    owner.gi_frame, owner.gi_code, 'generator')

        return (
            f' | File {frame.f_code.co_filename!r},'
            f' line {frame.f_lineno}, in'
            f' {tag} {code.co_qualname}()'
        )

    def render(node: FutureCallGraph, out: list[str], level: int) -> None:
        # Emit 'node' and, recursively, everything awaiting it; each
        # nesting level is indented one step further.
        pad = level * ' '
        fut = node.future

        if isinstance(fut, tasks.Task):
            header = f'* Task(name={fut.get_name()!r}, id={id(fut):#x})'
        else:
            header = f'* Future(id={id(fut):#x})'
        out.append(pad + header)

        if node.call_stack:
            out.append(pad + ' + Call stack:')
            for entry in node.call_stack:
                out.append(pad + frame_line(entry.frame))

        if node.awaited_by:
            out.append(pad + ' + Awaited by:')
            for waiter in node.awaited_by:
                render(waiter, out, level + 1)

    # One extra level of depth skips this function's own frame.
    graph = capture_call_graph(future, depth=depth + 1, limit=limit)
    if graph is None:
        return ""

    lines: list[str] = []
    try:
        render(graph, lines, 0)
    finally:
        # 'graph' has references to frames so we should
        # make sure it's GC'ed as soon as we don't need it.
        del graph
    return '\n'.join(lines)
def print_call_graph(
    future: futures.Future | None = None,
    /,
    *,
    file: io.Writer[str] | None = None,
    depth: int = 1,
    limit: int | None = None,
) -> None:
    """Print the async call graph for the current task or the provided Future.

    The graph is rendered with format_call_graph() and written with
    print() to 'file' (print()'s default destination when 'file' is
    None). 'depth' and 'limit' have the same meaning as for
    capture_call_graph().
    """
    # This function adds one frame between the caller and
    # format_call_graph(); skip it so that the reported stack starts at
    # the caller, the same way format_call_graph() skips its own frame.
    print(format_call_graph(future, depth=depth + 1, limit=limit), file=file)