Coverage for Lib/asyncio/graph.py: 78%

115 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-23 01:21 +0000

1"""Introspection utils for tasks call graphs.""" 

2 

3import dataclasses 

4import io 

5import sys 

6import types 

7 

8from . import events 

9from . import futures 

10from . import tasks 

11 

12__all__ = ( 

13 'capture_call_graph', 

14 'format_call_graph', 

15 'print_call_graph', 

16 'FrameCallGraphEntry', 

17 'FutureCallGraph', 

18) 

19 

20# Sadly, we can't re-use the traceback module's data structures as those 

21# are tailored for error reporting, whereas we need to represent an 

22# async call graph. 

23# 

24# Going with pretty verbose names as we'd like to export them to the 

25# top level asyncio namespace, and want to avoid future name clashes. 

26 

27 

28@dataclasses.dataclass(frozen=True, slots=True) 

29class FrameCallGraphEntry: 

30 frame: types.FrameType 

31 

32 

33@dataclasses.dataclass(frozen=True, slots=True) 

34class FutureCallGraph: 

35 future: futures.Future 

36 call_stack: tuple["FrameCallGraphEntry", ...] 

37 awaited_by: tuple["FutureCallGraph", ...] 

38 

39 

def _build_graph_for_future(
    future: futures.Future,
    *,
    limit: int | None = None,
) -> FutureCallGraph:
    """Build the call graph node for *future* without inspecting live frames.

    The call stack is reconstructed from the coroutine returned by
    ``future.get_coro()`` (when the future has such a method) by following
    the chain of ``cr_await`` / ``ag_await`` links.  The same procedure is
    applied recursively to every future listed in
    ``future._asyncio_awaited_by``.

    *limit* controls how many stack entries are kept:

    * ``None`` -- keep everything;
    * ``0`` -- do not even fetch the coroutine, the call stack is empty;
    * positive -- keep the first ``limit`` entries of the chain as it was
      walked (starting at the future's own coroutine);
    * negative -- keep the last ``abs(limit)`` entries of that chain.

    The retained entries are reversed before being stored in the result.

    Raises TypeError if *future* is not an instance of ``futures.Future``.
    """
    if not isinstance(future, futures.Future):
        raise TypeError(
            f"{future!r} object does not appear to be compatible "
            f"with asyncio.Future"
        )

    coro = None
    if get_coro := getattr(future, 'get_coro', None):
        # With limit == 0 no stack is wanted, so skip get_coro() entirely.
        coro = get_coro() if limit != 0 else None

    st: list[FrameCallGraphEntry] = []
    awaited_by: list[FutureCallGraph] = []

    while coro is not None:
        if hasattr(coro, 'cr_await'):
            # A native coroutine or duck-type compatible iterator
            st.append(FrameCallGraphEntry(coro.cr_frame))
            coro = coro.cr_await
        elif hasattr(coro, 'ag_await'):
            # A native async generator or duck-type compatible iterator.
            # Async generators expose their frame as 'ag_frame'; they have
            # no 'cr_frame' attribute.
            st.append(FrameCallGraphEntry(coro.ag_frame))
            coro = coro.ag_await
        else:
            # Something we do not know how to follow any further.
            break

    if future._asyncio_awaited_by:
        for parent in future._asyncio_awaited_by:
            awaited_by.append(_build_graph_for_future(parent, limit=limit))

    if limit is not None:
        if limit > 0:
            st = st[:limit]
        elif limit < 0:
            st = st[limit:]
    st.reverse()
    return FutureCallGraph(future, tuple(st), tuple(awaited_by))

81 

82 

def capture_call_graph(
    future: futures.Future | None = None,
    /,
    *,
    depth: int = 1,
    limit: int | None = None,
) -> FutureCallGraph | None:
    """Capture the async call graph for the current task or the provided Future.

    The graph is represented with two data structures:

    * FutureCallGraph(future, call_stack, awaited_by)

      Where 'future' is an instance of asyncio.Future or asyncio.Task.

      'call_stack' is a tuple of FrameCallGraphEntry objects.

      'awaited_by' is a tuple of FutureCallGraph objects.

    * FrameCallGraphEntry(frame)

      Where 'frame' is a frame object of a regular Python function
      in the call stack.

    Receives an optional 'future' argument. If not passed,
    the current task will be used. If there's no current task, the function
    returns None.

    If "capture_call_graph()" is introspecting *the current task*, the
    optional keyword-only 'depth' argument can be used to skip the specified
    number of frames from top of the stack.  'depth' is not used when
    'future' is some other future.

    If the optional keyword-only 'limit' argument is provided, each call stack
    in the resulting graph is truncated to include at most ``abs(limit)``
    entries. If 'limit' is positive, the entries left are the closest to
    the invocation point. If 'limit' is negative, the topmost entries are
    left. If 'limit' is omitted or None, all entries are present.
    If 'limit' is 0, the call stack is not captured at all, only
    "awaited by" information is present.

    Raises RuntimeError when called outside of a running event loop
    without a 'future', and TypeError when the object to introspect is not
    an instance of asyncio.Future.
    """

    loop = events._get_running_loop()

    if future is not None:
        # Check if we're in a context of a running event loop;
        # if yes - check if the passed future is the currently
        # running task or not.
        if loop is None or future is not tasks.current_task(loop=loop):
            return _build_graph_for_future(future, limit=limit)
        # else: future is the current task, move on.
    else:
        if loop is None:
            raise RuntimeError(
                'capture_call_graph() is called outside of a running '
                'event loop and no *future* to introspect was provided')
        future = tasks.current_task(loop=loop)

    if future is None:
        # This isn't a generic call stack introspection utility. If we
        # can't determine the current task and none was provided, we
        # just return.
        return None

    if not isinstance(future, futures.Future):
        raise TypeError(
            f"{future!r} object does not appear to be compatible "
            f"with asyncio.Future"
        )

    call_stack: list[FrameCallGraphEntry] = []

    # NOTE: sys._getframe() must be called from this very function,
    # otherwise 'depth' would count from the wrong frame.
    f = sys._getframe(depth) if limit != 0 else None
    try:
        while f is not None:
            is_async = f.f_generator is not None
            call_stack.append(FrameCallGraphEntry(f))

            if is_async:
                if f.f_back is not None and f.f_back.f_generator is None:
                    # We've reached the bottom of the coroutine stack, which
                    # must be the Task that runs it.
                    break

            f = f.f_back
    finally:
        # Do not keep a frame alive through this local.
        del f

    awaited_by = []
    if future._asyncio_awaited_by:
        for parent in future._asyncio_awaited_by:
            awaited_by.append(_build_graph_for_future(parent, limit=limit))

    if limit is not None:
        # The slicing below uses the negated limit; the caller's 'limit'
        # itself is left untouched.
        flipped = -limit
        if flipped > 0:
            call_stack = call_stack[:flipped]
        elif flipped < 0:
            call_stack = call_stack[flipped:]

    return FutureCallGraph(future, tuple(call_stack), tuple(awaited_by))

183 

184 

def format_call_graph(
    future: futures.Future | None = None,
    /,
    *,
    depth: int = 1,
    limit: int | None = None,
) -> str:
    """Render the async call graph of `future` as a multi-line string.

    When `future` is omitted the graph of the current task is rendered.
    `depth` and `limit` are handed to capture_call_graph(); `depth` is
    increased by one to account for this function's own frame.  An empty
    string is returned when no graph could be captured.
    """

    def describe_frame(frame):
        # Build the "File ..., line ..., in ..." text for one stack entry.
        gen = frame.f_generator
        if gen is None:
            # A plain (non-generator) function frame.
            return (
                f' | File {frame.f_code.co_filename!r},'
                f' line {frame.f_lineno}, in'
                f' {frame.f_code.co_qualname}()'
            )

        # The frame belongs to a coroutine, an async generator or a plain
        # generator; probe the attribute families in that order.
        try:
            owner_frame, code, tag = gen.cr_frame, gen.cr_code, 'async'
        except AttributeError:
            try:
                owner_frame, code, tag = (
                    gen.ag_frame, gen.ag_code, 'async generator')
            except AttributeError:
                owner_frame, code, tag = (
                    gen.gi_frame, gen.gi_code, 'generator')

        return (
            f' | File {owner_frame.f_code.co_filename!r},'
            f' line {owner_frame.f_lineno}, in'
            f' {tag} {code.co_qualname}()'
        )

    def render_level(node: FutureCallGraph, out: list[str], level: int) -> None:
        # Append the lines for 'node' and, recursively, for its awaiters.
        prefix = level * ' '

        if isinstance(node.future, tasks.Task):
            header = (
                f'* Task(name={node.future.get_name()!r}, '
                f'id={id(node.future):#x})'
            )
        else:
            header = f'* Future(id={id(node.future):#x})'
        out.append(prefix + header)

        if node.call_stack:
            out.append(prefix + ' + Call stack:')
            for entry in node.call_stack:
                out.append(prefix + describe_frame(entry.frame))

        if node.awaited_by:
            out.append(prefix + ' + Awaited by:')
            for awaiter in node.awaited_by:
                render_level(awaiter, out, level + 1)

    graph = capture_call_graph(future, depth=depth + 1, limit=limit)
    if graph is None:
        return ""

    lines: list[str] = []
    try:
        render_level(graph, lines, 0)
    finally:
        # 'graph' has references to frames so we should
        # make sure it's GC'ed as soon as we don't need it.
        del graph
    return '\n'.join(lines)

266 

def print_call_graph(
    future: futures.Future | None = None,
    /,
    *,
    file: io.Writer[str] | None = None,
    depth: int = 1,
    limit: int | None = None,
) -> None:
    """Print the async call graph for the current task or the provided Future.

    The text produced by format_call_graph() is written with print() to
    'file' (print()'s default destination when 'file' is None).  'depth'
    and 'limit' have the same meaning as in format_call_graph().
    """
    # Skip this function's own frame, the same way format_call_graph()
    # does for itself, so that 'depth' is counted from our caller.
    text = format_call_graph(future, depth=depth + 1, limit=limit)
    print(text, file=file)