Coverage for Lib/asyncio/graph.py: 78%
115 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-15 02:02 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-15 02:02 +0000
1"""Introspection utils for tasks call graphs."""
3import dataclasses
4import sys
5import types
7from . import events
8from . import futures
9from . import tasks
11__all__ = (
12 'capture_call_graph',
13 'format_call_graph',
14 'print_call_graph',
15 'FrameCallGraphEntry',
16 'FutureCallGraph',
17)
19if False: # for type checkers
20 from typing import TextIO
22# Sadly, we can't re-use the traceback module's datastructures as those
23# are tailored for error reporting, whereas we need to represent an
24# async call graph.
25#
26# Going with pretty verbose names as we'd like to export them to the
27# top level asyncio namespace, and want to avoid future name clashes.
30@dataclasses.dataclass(frozen=True, slots=True)
31class FrameCallGraphEntry:
32 frame: types.FrameType
35@dataclasses.dataclass(frozen=True, slots=True)
36class FutureCallGraph:
37 future: futures.Future
38 call_stack: tuple["FrameCallGraphEntry", ...]
39 awaited_by: tuple["FutureCallGraph", ...]
42def _build_graph_for_future(
43 future: futures.Future,
44 *,
45 limit: int | None = None,
46) -> FutureCallGraph:
47 if not isinstance(future, futures.Future): 47 ↛ 48line 47 didn't jump to line 48 because the condition on line 47 was never true
48 raise TypeError(
49 f"{future!r} object does not appear to be compatible "
50 f"with asyncio.Future"
51 )
53 coro = None
54 if get_coro := getattr(future, 'get_coro', None):
55 coro = get_coro() if limit != 0 else None
57 st: list[FrameCallGraphEntry] = []
58 awaited_by: list[FutureCallGraph] = []
60 while coro is not None:
61 if hasattr(coro, 'cr_await'):
62 # A native coroutine or duck-type compatible iterator
63 st.append(FrameCallGraphEntry(coro.cr_frame))
64 coro = coro.cr_await
65 elif hasattr(coro, 'ag_await'): 65 ↛ 67line 65 didn't jump to line 67 because the condition on line 65 was never true
66 # A native async generator or duck-type compatible iterator
67 st.append(FrameCallGraphEntry(coro.cr_frame))
68 coro = coro.ag_await
69 else:
70 break
72 if future._asyncio_awaited_by:
73 for parent in future._asyncio_awaited_by:
74 awaited_by.append(_build_graph_for_future(parent, limit=limit))
76 if limit is not None: 76 ↛ 77line 76 didn't jump to line 77 because the condition on line 76 was never true
77 if limit > 0:
78 st = st[:limit]
79 elif limit < 0:
80 st = st[limit:]
81 st.reverse()
82 return FutureCallGraph(future, tuple(st), tuple(awaited_by))
85def capture_call_graph(
86 future: futures.Future | None = None,
87 /,
88 *,
89 depth: int = 1,
90 limit: int | None = None,
91) -> FutureCallGraph | None:
92 """Capture the async call graph for the current task or the provided Future.
94 The graph is represented with three data structures:
96 * FutureCallGraph(future, call_stack, awaited_by)
98 Where 'future' is an instance of asyncio.Future or asyncio.Task.
100 'call_stack' is a tuple of FrameGraphEntry objects.
102 'awaited_by' is a tuple of FutureCallGraph objects.
104 * FrameCallGraphEntry(frame)
106 Where 'frame' is a frame object of a regular Python function
107 in the call stack.
109 Receives an optional 'future' argument. If not passed,
110 the current task will be used. If there's no current task, the function
111 returns None.
113 If "capture_call_graph()" is introspecting *the current task*, the
114 optional keyword-only 'depth' argument can be used to skip the specified
115 number of frames from top of the stack.
117 If the optional keyword-only 'limit' argument is provided, each call stack
118 in the resulting graph is truncated to include at most ``abs(limit)``
119 entries. If 'limit' is positive, the entries left are the closest to
120 the invocation point. If 'limit' is negative, the topmost entries are
121 left. If 'limit' is omitted or None, all entries are present.
122 If 'limit' is 0, the call stack is not captured at all, only
123 "awaited by" information is present.
124 """
126 loop = events._get_running_loop()
128 if future is not None:
129 # Check if we're in a context of a running event loop;
130 # if yes - check if the passed future is the currently
131 # running task or not.
132 if loop is None or future is not tasks.current_task(loop=loop): 132 ↛ 142line 132 didn't jump to line 142 because the condition on line 132 was always true
133 return _build_graph_for_future(future, limit=limit)
134 # else: future is the current task, move on.
135 else:
136 if loop is None: 136 ↛ 137line 136 didn't jump to line 137 because the condition on line 136 was never true
137 raise RuntimeError(
138 'capture_call_graph() is called outside of a running '
139 'event loop and no *future* to introspect was provided')
140 future = tasks.current_task(loop=loop)
142 if future is None: 142 ↛ 146line 142 didn't jump to line 146 because the condition on line 142 was never true
143 # This isn't a generic call stack introspection utility. If we
144 # can't determine the current task and none was provided, we
145 # just return.
146 return None
148 if not isinstance(future, futures.Future): 148 ↛ 149line 148 didn't jump to line 149 because the condition on line 148 was never true
149 raise TypeError(
150 f"{future!r} object does not appear to be compatible "
151 f"with asyncio.Future"
152 )
154 call_stack: list[FrameCallGraphEntry] = []
156 f = sys._getframe(depth) if limit != 0 else None
157 try:
158 while f is not None: 158 ↛ 170line 158 didn't jump to line 170 because the condition on line 158 was always true
159 is_async = f.f_generator is not None
160 call_stack.append(FrameCallGraphEntry(f))
162 if is_async:
163 if f.f_back is not None and f.f_back.f_generator is None:
164 # We've reached the bottom of the coroutine stack, which
165 # must be the Task that runs it.
166 break
168 f = f.f_back
169 finally:
170 del f
172 awaited_by = []
173 if future._asyncio_awaited_by:
174 for parent in future._asyncio_awaited_by:
175 awaited_by.append(_build_graph_for_future(parent, limit=limit))
177 if limit is not None: 177 ↛ 178line 177 didn't jump to line 178 because the condition on line 177 was never true
178 limit *= -1
179 if limit > 0:
180 call_stack = call_stack[:limit]
181 elif limit < 0:
182 call_stack = call_stack[limit:]
184 return FutureCallGraph(future, tuple(call_stack), tuple(awaited_by))
187def format_call_graph(
188 future: futures.Future | None = None,
189 /,
190 *,
191 depth: int = 1,
192 limit: int | None = None,
193) -> str:
194 """Return the async call graph as a string for `future`.
196 If `future` is not provided, format the call graph for the current task.
197 """
199 def render_level(st: FutureCallGraph, buf: list[str], level: int) -> None:
200 def add_line(line: str) -> None:
201 buf.append(level * ' ' + line)
203 if isinstance(st.future, tasks.Task):
204 add_line(
205 f'* Task(name={st.future.get_name()!r}, id={id(st.future):#x})'
206 )
207 else:
208 add_line(
209 f'* Future(id={id(st.future):#x})'
210 )
212 if st.call_stack:
213 add_line(
214 f' + Call stack:'
215 )
216 for ste in st.call_stack:
217 f = ste.frame
219 if f.f_generator is None:
220 f = ste.frame
221 add_line(
222 f' | File {f.f_code.co_filename!r},'
223 f' line {f.f_lineno}, in'
224 f' {f.f_code.co_qualname}()'
225 )
226 else:
227 c = f.f_generator
229 try:
230 f = c.cr_frame
231 code = c.cr_code
232 tag = 'async'
233 except AttributeError:
234 try:
235 f = c.ag_frame
236 code = c.ag_code
237 tag = 'async generator'
238 except AttributeError:
239 f = c.gi_frame
240 code = c.gi_code
241 tag = 'generator'
243 add_line(
244 f' | File {f.f_code.co_filename!r},'
245 f' line {f.f_lineno}, in'
246 f' {tag} {code.co_qualname}()'
247 )
249 if st.awaited_by:
250 add_line(
251 f' + Awaited by:'
252 )
253 for fut in st.awaited_by:
254 render_level(fut, buf, level + 1)
256 graph = capture_call_graph(future, depth=depth + 1, limit=limit)
257 if graph is None: 257 ↛ 258line 257 didn't jump to line 258 because the condition on line 257 was never true
258 return ""
260 buf: list[str] = []
261 try:
262 render_level(graph, buf, 0)
263 finally:
264 # 'graph' has references to frames so we should
265 # make sure it's GC'ed as soon as we don't need it.
266 del graph
267 return '\n'.join(buf)
269def print_call_graph(
270 future: futures.Future | None = None,
271 /,
272 *,
273 file: TextIO | None = None,
274 depth: int = 1,
275 limit: int | None = None,
276) -> None:
277 """Print the async call graph for the current task or the provided Future."""
278 print(format_call_graph(future, depth=depth, limit=limit), file=file)