Coverage for Lib/asyncio/graph.py: 78%

115 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-15 02:02 +0000

1"""Introspection utils for tasks call graphs.""" 

2 

3import dataclasses 

4import sys 

5import types 

6 

7from . import events 

8from . import futures 

9from . import tasks 

10 

11__all__ = ( 

12 'capture_call_graph', 

13 'format_call_graph', 

14 'print_call_graph', 

15 'FrameCallGraphEntry', 

16 'FutureCallGraph', 

17) 

18 

19if False: # for type checkers 

20 from typing import TextIO 

21 

22# Sadly, we can't re-use the traceback module's data structures as those 

23# are tailored for error reporting, whereas we need to represent an 

24# async call graph. 

25# 

26# Going with pretty verbose names as we'd like to export them to the 

27# top level asyncio namespace, and want to avoid future name clashes. 

28 

29 

30@dataclasses.dataclass(frozen=True, slots=True) 

31class FrameCallGraphEntry: 

32 frame: types.FrameType 

33 

34 

35@dataclasses.dataclass(frozen=True, slots=True) 

36class FutureCallGraph: 

37 future: futures.Future 

38 call_stack: tuple["FrameCallGraphEntry", ...] 

39 awaited_by: tuple["FutureCallGraph", ...] 

40 

41 

def _build_graph_for_future(
    future: futures.Future,
    *,
    limit: int | None = None,
) -> FutureCallGraph:
    """Build the call graph node for 'future' by walking its await chain.

    Starting from ``future.get_coro()`` (when the future has such a
    method), follow the ``cr_await`` / ``ag_await`` links and record one
    FrameCallGraphEntry per coroutine or async generator on the chain.
    Objects on the chain whose frame is None are not recorded.  Then
    recurse into every future listed in ``future._asyncio_awaited_by``,
    passing the same 'limit'.

    'limit' trims the recorded chain: a positive value keeps the first
    'limit' entries (counted from the future's own coroutine), a negative
    value keeps the last ``abs(limit)`` entries, 0 skips the walk
    entirely and None keeps everything.

    The call stack of the returned node is in reverse walk order: the
    entry found last on the await chain comes first.

    Raises TypeError if 'future' is not an instance of asyncio.Future.
    """
    if not isinstance(future, futures.Future):
        raise TypeError(
            f"{future!r} object does not appear to be compatible "
            f"with asyncio.Future"
        )

    coro = None
    if get_coro := getattr(future, 'get_coro', None):
        coro = get_coro() if limit != 0 else None

    st: list[FrameCallGraphEntry] = []

    while coro is not None:
        if hasattr(coro, 'cr_await'):
            # A native coroutine or duck-type compatible iterator
            frame, coro = coro.cr_frame, coro.cr_await
        elif hasattr(coro, 'ag_await'):
            # A native async generator or duck-type compatible iterator;
            # its frame lives in 'ag_frame' (it has no 'cr_frame').
            frame, coro = coro.ag_frame, coro.ag_await
        else:
            break
        # A finished coroutine / async generator no longer has a frame;
        # FrameCallGraphEntry.frame is expected to be a real frame object.
        if frame is not None:
            st.append(FrameCallGraphEntry(frame))

    awaited_by: list[FutureCallGraph] = [
        _build_graph_for_future(parent, limit=limit)
        for parent in (future._asyncio_awaited_by or ())
    ]

    if limit is not None:
        if limit > 0:
            st = st[:limit]
        elif limit < 0:
            st = st[limit:]
    st.reverse()
    return FutureCallGraph(future, tuple(st), tuple(awaited_by))

83 

84 

def capture_call_graph(
    future: futures.Future | None = None,
    /,
    *,
    depth: int = 1,
    limit: int | None = None,
) -> FutureCallGraph | None:
    """Capture the async call graph for the current task or the provided Future.

    The graph is represented with two data structures:

    * FutureCallGraph(future, call_stack, awaited_by)

      Where 'future' is an instance of asyncio.Future or asyncio.Task.

      'call_stack' is a tuple of FrameCallGraphEntry objects.

      'awaited_by' is a tuple of FutureCallGraph objects.

    * FrameCallGraphEntry(frame)

      Where 'frame' is a frame object of a regular Python function
      in the call stack.

    Receives an optional 'future' argument. If not passed,
    the current task will be used. If there's no current task, the function
    returns None.

    If "capture_call_graph()" is introspecting *the current task*, the
    optional keyword-only 'depth' argument can be used to skip the specified
    number of frames from top of the stack.

    If the optional keyword-only 'limit' argument is provided, each call stack
    in the resulting graph is truncated to include at most ``abs(limit)``
    entries. If 'limit' is positive, the entries left are the closest to
    the invocation point. If 'limit' is negative, the topmost entries are
    left. If 'limit' is omitted or None, all entries are present.
    If 'limit' is 0, the call stack is not captured at all, only
    "awaited by" information is present.

    Raises RuntimeError if no 'future' is passed and there is no running
    event loop, and TypeError if the future to introspect is not an
    instance of asyncio.Future.
    """

    loop = events._get_running_loop()

    if future is None:
        if loop is None:
            raise RuntimeError(
                'capture_call_graph() is called outside of a running '
                'event loop and no *future* to introspect was provided')
        future = tasks.current_task(loop=loop)
        if future is None:
            # This isn't a generic call stack introspection utility. If we
            # can't determine the current task and none was provided, we
            # just return.
            return None
    elif loop is None or future is not tasks.current_task(loop=loop):
        # The passed future is not the currently running task, so there
        # are no live frames to walk: follow its await chain instead.
        return _build_graph_for_future(future, limit=limit)
    # else: future is the current task, move on.

    if not isinstance(future, futures.Future):
        raise TypeError(
            f"{future!r} object does not appear to be compatible "
            f"with asyncio.Future"
        )

    call_stack: list[FrameCallGraphEntry] = []

    # sys._getframe() counts from this function's own frame, so it has
    # to be called right here and not from a helper.
    f = sys._getframe(depth) if limit != 0 else None
    try:
        while f is not None:
            call_stack.append(FrameCallGraphEntry(f))

            if (
                f.f_generator is not None
                and f.f_back is not None
                and f.f_back.f_generator is None
            ):
                # We've reached the bottom of the coroutine stack, which
                # must be the Task that runs it.
                break

            f = f.f_back
    finally:
        # Drop the local reference to the frame as soon as the walk ends.
        del f

    awaited_by = [
        _build_graph_for_future(parent, limit=limit)
        for parent in (future._asyncio_awaited_by or ())
    ]

    # 'call_stack' was collected starting at the frame selected by
    # 'depth', so a positive 'limit' keeps its tail and a negative
    # 'limit' keeps its head.
    if limit is not None:
        if limit > 0:
            call_stack = call_stack[-limit:]
        elif limit < 0:
            call_stack = call_stack[:-limit]

    return FutureCallGraph(future, tuple(call_stack), tuple(awaited_by))

185 

186 

def format_call_graph(
    future: futures.Future | None = None,
    /,
    *,
    depth: int = 1,
    limit: int | None = None,
) -> str:
    """Return the async call graph as a string for `future`.

    If `future` is not provided, format the call graph for the current task.
    An empty string is returned when no graph could be captured.

    'depth' and 'limit' are forwarded to capture_call_graph(); 'depth' is
    increased by one to account for this function's own frame.
    """
    # NOTE(review): the whitespace inside the string literals below is
    # reproduced as it appears in the listing this code was taken from;
    # runs of spaces may have been collapsed there -- confirm against
    # the original file.

    def describe(frame) -> str:
        # Text of the call stack line for one captured frame.
        gen = frame.f_generator
        if gen is None:
            # A regular (non-generator) function frame.
            code = frame.f_code
            return (
                f' | File {code.co_filename!r},'
                f' line {frame.f_lineno}, in'
                f' {code.co_qualname}()'
            )

        # The frame belongs to a coroutine, an async generator or a
        # generator: probe the attribute families in that order.
        try:
            gen_frame = gen.cr_frame
            gen_code = gen.cr_code
            kind = 'async'
        except AttributeError:
            try:
                gen_frame = gen.ag_frame
                gen_code = gen.ag_code
                kind = 'async generator'
            except AttributeError:
                gen_frame = gen.gi_frame
                gen_code = gen.gi_code
                kind = 'generator'

        return (
            f' | File {gen_frame.f_code.co_filename!r},'
            f' line {gen_frame.f_lineno}, in'
            f' {kind} {gen_code.co_qualname}()'
        )

    def render(node: FutureCallGraph, out: list[str], level: int) -> None:
        # Append the lines for 'node' and, recursively, for its awaiters.
        pad = level * ' '
        fut = node.future

        if isinstance(fut, tasks.Task):
            header = f'* Task(name={fut.get_name()!r}, id={id(fut):#x})'
        else:
            header = f'* Future(id={id(fut):#x})'
        out.append(pad + header)

        if node.call_stack:
            out.append(pad + ' + Call stack:')
            for entry in node.call_stack:
                out.append(pad + describe(entry.frame))

        if node.awaited_by:
            out.append(pad + ' + Awaited by:')
            for awaiter in node.awaited_by:
                render(awaiter, out, level + 1)

    graph = capture_call_graph(future, depth=depth + 1, limit=limit)
    if graph is None:
        return ""

    lines: list[str] = []
    try:
        render(graph, lines, 0)
    finally:
        # 'graph' has references to frames so we should
        # make sure it's GC'ed as soon as we don't need it.
        del graph
    return '\n'.join(lines)

268 

def print_call_graph(
    future: futures.Future | None = None,
    /,
    *,
    file: TextIO | None = None,
    depth: int = 1,
    limit: int | None = None,
) -> None:
    """Print the async call graph for the current task or the provided Future.

    The text produced by format_call_graph() is written with print() to
    'file'; 'depth' and 'limit' are forwarded to format_call_graph().
    """
    # NOTE(review): 'depth' is forwarded unchanged, whereas
    # format_call_graph() adds one for its own frame; with the default
    # 'depth' this function's frame presumably shows up in the call stack
    # of the current task -- confirm whether that is intended.
    rendered = format_call_graph(future, depth=depth, limit=limit)
    print(rendered, file=file)