Coverage for Lib/asyncio/graph.py: 78%
115 statements
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-23 01:21 +0000
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-23 01:21 +0000
1"""Introspection utils for tasks call graphs."""
3import dataclasses
4import io
5import sys
6import types
8from . import events
9from . import futures
10from . import tasks
12__all__ = (
13 'capture_call_graph',
14 'format_call_graph',
15 'print_call_graph',
16 'FrameCallGraphEntry',
17 'FutureCallGraph',
18)
# Sadly, we can't re-use the traceback module's data structures as those
# are tailored for error reporting, whereas we need to represent an
# async call graph.
#
# Going with pretty verbose names as we'd like to export them to the
# top-level asyncio namespace, and want to avoid future name clashes.
28@dataclasses.dataclass(frozen=True, slots=True)
29class FrameCallGraphEntry:
30 frame: types.FrameType
33@dataclasses.dataclass(frozen=True, slots=True)
34class FutureCallGraph:
35 future: futures.Future
36 call_stack: tuple["FrameCallGraphEntry", ...]
37 awaited_by: tuple["FutureCallGraph", ...]
def _build_graph_for_future(
    future: futures.Future,
    *,
    limit: int | None = None,
) -> FutureCallGraph:
    """Build the call graph node for 'future' from its await chain.

    If 'future' exposes 'get_coro()', the chain of awaited objects is
    followed through 'cr_await' / 'ag_await' and one FrameCallGraphEntry
    is recorded per link.  The futures listed in
    'future._asyncio_awaited_by' are processed recursively with the same
    'limit' and become the node's 'awaited_by' tuple.

    'limit' truncates the recorded call stack: 0 skips capturing it
    altogether, None keeps every entry, and a non-zero value keeps at
    most abs(limit) entries.

    Raises TypeError if 'future' is not an asyncio.Future instance.
    """
    if not isinstance(future, futures.Future):
        raise TypeError(
            f"{future!r} object does not appear to be compatible "
            f"with asyncio.Future"
        )

    coro = None
    if get_coro := getattr(future, 'get_coro', None):
        # limit == 0 means "no call stack at all", so don't even fetch
        # the coroutine in that case.
        coro = get_coro() if limit != 0 else None

    st: list[FrameCallGraphEntry] = []

    while coro is not None:
        if hasattr(coro, 'cr_await'):
            # A native coroutine or duck-type compatible iterator
            st.append(FrameCallGraphEntry(coro.cr_frame))
            coro = coro.cr_await
        elif hasattr(coro, 'ag_await'):
            # A native async generator or duck-type compatible iterator.
            # Async generators expose their frame as 'ag_frame'; they
            # have no 'cr_frame' attribute.
            st.append(FrameCallGraphEntry(coro.ag_frame))
            coro = coro.ag_await
        else:
            break

    awaited_by: list[FutureCallGraph] = []
    if future._asyncio_awaited_by:
        awaited_by = [
            _build_graph_for_future(parent, limit=limit)
            for parent in future._asyncio_awaited_by
        ]

    # At this point 'st' starts with the future's own coroutine and ends
    # with the last object reached through the await chain.  A positive
    # limit keeps the leading entries, a negative one the trailing ones.
    if limit is not None:
        if limit > 0:
            st = st[:limit]
        elif limit < 0:
            st = st[limit:]
    # Flip the order so the last object reached comes first.
    st.reverse()
    return FutureCallGraph(future, tuple(st), tuple(awaited_by))
def capture_call_graph(
    future: futures.Future | None = None,
    /,
    *,
    depth: int = 1,
    limit: int | None = None,
) -> FutureCallGraph | None:
    """Capture the async call graph for the current task or the provided Future.

    The result is made of two kinds of objects:

    * FutureCallGraph(future, call_stack, awaited_by)

      'future' is an instance of asyncio.Future or asyncio.Task,
      'call_stack' is a tuple of FrameCallGraphEntry objects and
      'awaited_by' is a tuple of FutureCallGraph objects.

    * FrameCallGraphEntry(frame)

      'frame' is a frame object of a regular Python function
      in the call stack.

    'future' is optional.  When it is not passed, the current task is
    introspected; if there is no current task, None is returned.
    Calling without 'future' outside of a running event loop raises
    RuntimeError.  An object that is not an asyncio.Future raises
    TypeError.

    When the function is introspecting *the current task*, the optional
    keyword-only 'depth' argument can be used to skip the specified
    number of frames from top of the stack.

    If the optional keyword-only 'limit' argument is provided, each call
    stack in the resulting graph is truncated to include at most
    ``abs(limit)`` entries.  If 'limit' is positive, the entries left are
    the closest to the invocation point.  If 'limit' is negative, the
    topmost entries are left.  If 'limit' is omitted or None, all entries
    are present.  If 'limit' is 0, the call stack is not captured at all,
    only "awaited by" information is present.
    """
    running_loop = events._get_running_loop()

    if future is None:
        if running_loop is None:
            raise RuntimeError(
                'capture_call_graph() is called outside of a running '
                'event loop and no *future* to introspect was provided')
        future = tasks.current_task(loop=running_loop)
    elif (
        running_loop is None
        or future is not tasks.current_task(loop=running_loop)
    ):
        # Not the task that is executing right now, so there are no live
        # frames to walk; rebuild the stack from the await chain instead.
        return _build_graph_for_future(future, limit=limit)
    # Otherwise 'future' is the current task: fall through.

    if future is None:
        # This isn't a generic call stack introspection utility.  With no
        # current task and no future supplied there is nothing to report.
        return None

    if not isinstance(future, futures.Future):
        raise TypeError(
            f"{future!r} object does not appear to be compatible "
            f"with asyncio.Future"
        )

    frames: list[FrameCallGraphEntry] = []

    # sys._getframe() must be called right here, in this function's own
    # body, because 'depth' is counted relative to this frame.
    frame = None if limit == 0 else sys._getframe(depth)
    try:
        while frame is not None:
            frames.append(FrameCallGraphEntry(frame))
            if (
                frame.f_generator is not None
                and frame.f_back is not None
                and frame.f_back.f_generator is None
            ):
                # We've reached the bottom of the coroutine stack, which
                # must be the Task that runs it.
                break
            frame = frame.f_back
    finally:
        # Drop the local frame reference as soon as the walk is over.
        del frame

    parents = future._asyncio_awaited_by
    awaited_by = [
        _build_graph_for_future(parent, limit=limit) for parent in parents
    ] if parents else []

    # 'frames' starts at the frame selected by 'depth' and moves outwards
    # through f_back.  A positive 'limit' keeps the LAST abs(limit)
    # entries of that list, a negative one keeps the FIRST abs(limit).
    # NOTE(review): confirm this direction matches the wording used for
    # 'limit' in the docstring.
    if limit is not None:
        if limit < 0:
            frames = frames[:-limit]
        elif limit > 0:
            frames = frames[-limit:]

    return FutureCallGraph(future, tuple(frames), tuple(awaited_by))
def format_call_graph(
    future: futures.Future | None = None,
    /,
    *,
    depth: int = 1,
    limit: int | None = None,
) -> str:
    """Return the async call graph as a string for `future`.

    If `future` is not provided, format the call graph for the current task.
    An empty string is returned when no graph could be captured.
    """
    # NOTE(review): the whitespace inside the string literals below is
    # reproduced exactly as found; confirm it was not collapsed by the
    # tool this listing was extracted with.

    def describe_frame(entry: FrameCallGraphEntry) -> str:
        # Render one call stack entry as a single (unindented) line.
        frame = entry.frame
        gen = frame.f_generator
        if gen is None:
            # A plain (non-generator) function frame.
            return (
                f' | File {frame.f_code.co_filename!r},'
                f' line {frame.f_lineno}, in'
                f' {frame.f_code.co_qualname}()'
            )

        # Probe the coroutine attributes first, then the async generator
        # ones, and finally fall back to the plain generator ones.
        for prefix, tag in (('cr', 'async'), ('ag', 'async generator')):
            try:
                frame = getattr(gen, prefix + '_frame')
                code = getattr(gen, prefix + '_code')
            except AttributeError:
                continue
            break
        else:
            frame = gen.gi_frame
            code = gen.gi_code
            tag = 'generator'

        return (
            f' | File {frame.f_code.co_filename!r},'
            f' line {frame.f_lineno}, in'
            f' {tag} {code.co_qualname}()'
        )

    def render(node: FutureCallGraph, out: list[str], level: int) -> None:
        # Append the lines for 'node', then recurse into its awaiters
        # one indentation level deeper.
        pad = level * ' '
        fut = node.future

        if isinstance(fut, tasks.Task):
            header = f'* Task(name={fut.get_name()!r}, id={id(fut):#x})'
        else:
            header = f'* Future(id={id(fut):#x})'
        out.append(pad + header)

        if node.call_stack:
            out.append(pad + ' + Call stack:')
            for entry in node.call_stack:
                out.append(pad + describe_frame(entry))

        if node.awaited_by:
            out.append(pad + ' + Awaited by:')
            for awaiter in node.awaited_by:
                render(awaiter, out, level + 1)

    # depth + 1 skips this function's own frame; the call has to stay in
    # this body (not in a helper) for that offset to be right.
    graph = capture_call_graph(future, depth=depth + 1, limit=limit)
    if graph is None:
        return ""

    lines: list[str] = []
    try:
        render(graph, lines, 0)
    finally:
        # 'graph' has references to frames so we should
        # make sure it's GC'ed as soon as we don't need it.
        del graph
    return '\n'.join(lines)
def print_call_graph(
    future: futures.Future | None = None,
    /,
    *,
    file: io.Writer[str] | None = None,
    depth: int = 1,
    limit: int | None = None,
) -> None:
    """Print the async call graph for the current task or the provided Future.

    The text produced by format_call_graph() is written with print() to
    'file' (print()'s default destination when 'file' is None).  'depth'
    and 'limit' have the same meaning as for format_call_graph().
    """
    # format_call_graph() adds 1 to 'depth' to hide its own frame; do the
    # same here so that this wrapper's frame is skipped as well and
    # 'depth' stays relative to the caller of print_call_graph().
    print(format_call_graph(future, depth=depth + 1, limit=limit), file=file)