Coverage for Lib/asyncio/tools.py: 72%

188 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-23 01:21 +0000

1"""Tools to analyze tasks running in asyncio programs.""" 

2 

3from collections import defaultdict 

4import csv 

5from itertools import count 

6from enum import Enum, StrEnum, auto 

7import sys 

8from _remote_debugging import RemoteUnwinder, FrameInfo 

9 

class NodeType(Enum):
    """Kind of node in the rendered async call tree."""

    # auto() numbers Enum members from 1 in definition order, so these are
    # COROUTINE == 1 and TASK == 2.
    COROUTINE = auto()  # a coroutine frame hanging under a task node
    TASK = auto()  # an asyncio task

13 

14 

class CycleFoundException(Exception):
    """Raised when there is a cycle when drawing the call tree.

    ``cycles`` holds each detected cycle as a list of task ids (as produced
    by the cycle search, the first id is repeated at the end), and
    ``id2name`` maps task ids to task names so the cycle can be reported
    by name.  Both values are also passed to Exception and end up in
    ``args``.
    """

    def __init__(self, cycles: list[list[int]], id2name: dict[int, str]) -> None:
        self.cycles = cycles
        self.id2name = id2name
        super().__init__(cycles, id2name)

25 

26 

27 

28# ─── indexing helpers ─────────────────────────────────────────── 

def _format_stack_entry(elem: str | FrameInfo) -> str:
    """Render one call-stack entry as text.

    A plain string is returned unchanged.  Any other entry (a FrameInfo) is
    rendered as "<funcname> <filename>:<lineno>", or as just "<funcname>"
    when it carries no location (lineno 0 and an empty filename).
    """
    if isinstance(elem, str):
        return elem
    has_location = not (elem.lineno == 0 and elem.filename == "")
    if has_location:
        return f"{elem.funcname} {elem.filename}:{elem.lineno}"
    return f"{elem.funcname}"

36 

37 

def _index(result):
    """Flatten the unwinder output into three lookup tables.

    *result* is iterated as entries whose ``awaited_by`` lists tasks; each
    task exposes ``task_id``, ``task_name``, ``coroutine_stack`` and
    ``awaited_by``.

    Returns ``(id2name, awaits, task_stacks)``:
      * id2name     -- task id -> task name
      * awaits      -- list of ``(awaiter id, formatted awaiter stack,
                       awaited task id)`` tuples (external dependencies)
      * task_stacks -- task id -> formatted frames of the task's own
                       coroutine stack; when several coroutine stacks are
                       reported, the last one wins
    """
    id2name = {}
    awaits = []
    task_stacks = {}
    for thread_info in result:
        for task in thread_info.awaited_by:
            tid = task.task_id
            id2name[tid] = task.task_name

            # Internal coroutine stack of this task (later entries overwrite
            # earlier ones, so only the last is kept).
            for coro in task.coroutine_stack or ():
                task_stacks[tid] = [_format_stack_entry(f) for f in coro.call_stack]

            # Who awaits this task.  The awaiter's id is read from the
            # ``task_name`` field of each awaited_by entry.
            for awaiter in task.awaited_by or ():
                frames = [_format_stack_entry(f) for f in awaiter.call_stack]
                awaits.append((awaiter.task_name, frames, tid))
    return id2name, awaits, task_stacks

61 

62 

def _build_tree(id2name, awaits, task_stacks):
    """Build the labelled node graph that build_async_tree() renders.

    Nodes are ``(NodeType.TASK, task id)`` or ``(NodeType.COROUTINE, "c<n>")``
    tuples.  Returns ``(id2label, children)``: id2label maps every node to
    its display label, and children maps a node to its ordered child list.

    For every ``(awaiter id, stack, awaited id)`` in *awaits*, the stack's
    frames are nested under the awaiter task, walking the stack from its last
    entry to its first, and the awaited task is attached under the final
    frame.  A frame label that already exists under the same parent reuses
    that node.  Tasks that await nothing but have an entry in *task_stacks*
    get that stack nested under them in the same way.
    """
    id2label = {(NodeType.TASK, tid): name for tid, name in id2name.items()}
    children = defaultdict(list)
    frame_children = defaultdict(dict)  # parent node -> {frame label: node}
    serial = count(1)

    def frame_node(parent, frame):
        # Reuse this parent's node for `frame` if it exists; otherwise
        # mint a fresh coroutine node and link it under the parent.
        known = frame_children[parent]
        if frame in known:
            return known[frame]
        node = (NodeType.COROUTINE, f"c{next(serial)}")
        id2label[node] = frame
        children[parent].append(node)
        known[frame] = node
        return node

    def hang(node, frames):
        # Walk the frames back to front, nesting each under the previous one,
        # and return the innermost node reached.
        for frame in reversed(frames):
            node = frame_node(node, frame)
        return node

    for awaiter_id, stack, awaited_id in awaits:
        tail = hang((NodeType.TASK, awaiter_id), stack)
        awaited = (NodeType.TASK, awaited_id)
        if awaited not in children[tail]:
            children[tail].append(awaited)

    # Tasks that are not awaiting another task show their own coroutine stack.
    awaiting = {awaiter_id for awaiter_id, _, _ in awaits}
    for tid in id2name:
        if tid not in awaiting and tid in task_stacks:
            hang((NodeType.TASK, tid), task_stacks[tid])

    return id2label, children

99 

100 

def _roots(id2label, children):
    """Return the nodes of *id2label* that are nobody's child.

    The result keeps the iteration order of *id2label*.
    """
    has_parent = set()
    for kids in children.values():
        has_parent.update(kids)
    return [node for node in id2label if node not in has_parent]

104 

105# ─── detect cycles in the task-to-task graph ─────────────────────── 

def _task_graph(awaits):
    """Collapse *awaits* into a task-to-task adjacency map.

    Returns a ``defaultdict(set)`` of ``{awaiter id: {awaited id, ...}}``.
    Coroutine stacks are ignored, and duplicate edges collapse into one.
    """
    edges = defaultdict(set)
    for awaiter, _frames, awaited in awaits:
        edges[awaiter].add(awaited)
    return edges

112 

113 

def _find_cycles(graph):
    """
    Depth-first search for back-edges.

    Returns a list of cycles, or an empty list if the graph is acyclic.
    Each cycle is a list of task ids that starts and ends with the same id.

    The search keeps an explicit stack of neighbour iterators instead of
    recursing, so a long chain of awaiting tasks cannot exceed Python's
    recursion limit.  Cycles are reported in the same order a recursive
    DFS would report them.
    """
    WHITE, GREY, BLACK = 0, 1, 2
    color = defaultdict(lambda: WHITE)
    path, cycles = [], []

    for root in list(graph):
        if color[root] != WHITE:
            continue
        color[root] = GREY
        path.append(root)
        # One neighbour iterator per node on `path`.  Each iterator resumes
        # where it stopped once the subtree it descended into is finished.
        pending = [iter(graph.get(root, ()))]
        while pending:
            for w in pending[-1]:
                if color[w] == WHITE:
                    # Descend: finish w's subtree before continuing here.
                    color[w] = GREY
                    path.append(w)
                    pending.append(iter(graph.get(w, ())))
                    break
                elif color[w] == GREY:  # back-edge → cycle!
                    i = path.index(w)
                    cycles.append(path[i:] + [w])  # slicing makes a copy
            else:
                # Every neighbour of path[-1] has been explored.
                color[path.pop()] = BLACK
                pending.pop()
    return cycles

141 

142 

143# ─── PRINT TREE FUNCTION ─────────────────────────────────────── 

def get_all_awaited_by(pid):
    """Return the awaited-by information a RemoteUnwinder reports for *pid*.

    This is the structure consumed by build_task_table() and
    build_async_tree(): entries carrying a ``thread_id`` and an
    ``awaited_by`` list of tasks.
    """
    return RemoteUnwinder(pid).get_all_awaited_by()

147 

148 

def build_async_tree(result, task_emoji="(T)", cor_emoji=""):
    """
    Build a list of strings for pretty-printing an async call tree.

    *result* is the awaited-by information returned by `get_all_awaited_by()`.
    The return value holds one list of lines per root of the tree.  Task
    nodes are prefixed with `task_emoji` and coroutine frames with
    `cor_emoji`.

    Raises CycleFoundException if the task-to-task await graph contains a
    cycle, because such a graph cannot be drawn as a tree.
    """
    id2name, awaits, task_stacks = _index(result)
    cycles = _find_cycles(_task_graph(awaits))
    if cycles:
        raise CycleFoundException(cycles, id2name)
    labels, children = _build_tree(id2name, awaits, task_stacks)

    def pretty(node):
        flag = task_emoji if node[0] == NodeType.TASK else cor_emoji
        return f"{flag} {labels[node]}"

    def render(root):
        # Iterative pre-order walk.  It uses an explicit stack rather than
        # recursion so that very deep await chains cannot hit the
        # interpreter's recursion limit.
        lines = []
        pending = [(root, "", True)]
        while pending:
            node, prefix, last = pending.pop()
            lines.append(f"{prefix}{'└── ' if last else '├── '}{pretty(node)}")
            # The continuation prefix is as wide as the 4-character connector,
            # so each nested level lines up under its parent.
            child_prefix = prefix + ("    " if last else "│   ")
            kids = children.get(node, [])
            # Push the children in reverse so the first child is rendered first.
            for i in reversed(range(len(kids))):
                pending.append((kids[i], child_prefix, i == len(kids) - 1))
        return lines

    return [render(root) for root in _roots(labels, children)]

178 

179 

def build_task_table(result):
    """Return one table row per (task, awaiter) pair found in *result*.

    Each row is ``[thread id, hex task id, task name, coroutine stack,
    awaiter chain, awaiter name, awaiter id]``.  Stacks are reduced to the
    text before the first space of each formatted frame (the function name
    for FrameInfo entries) and joined with " -> ".  A task that nobody awaits
    still gets one row, with empty awaiter columns and awaiter id "0x0".
    Awaiter names are looked up among the task names in *result*; "Unknown"
    is used when the name is missing.
    """
    id2name = _index(result)[0]

    def short_chain(frames):
        # "func file:line" -> "func", then link the names with arrows.
        return " -> ".join(_format_stack_entry(frame).split(" ")[0]
                           for frame in frames)

    rows = []
    for thread_info in result:
        thread_id = thread_info.thread_id
        for task in thread_info.awaited_by:
            task_id = task.task_id
            task_name = task.task_name
            own_frames = [frame
                          for coro in task.coroutine_stack
                          for frame in coro.call_stack]
            coro_stack = short_chain(own_frames)

            awaiters = task.awaited_by
            if not awaiters:
                rows.append([thread_id, hex(task_id), task_name, coro_stack,
                             "", "", "0x0"])
                continue

            for awaiter in awaiters:
                # The awaiter's id lives in its task_name field.  It is looked
                # up in the id -> name map and may not be an int, hence the
                # str() fallback.
                parent_id = awaiter.task_name
                awaiter_chain = short_chain(awaiter.call_stack)
                awaiter_name = id2name.get(parent_id, "Unknown")
                if isinstance(parent_id, int):
                    awaiter_id = hex(parent_id)
                else:
                    awaiter_id = str(parent_id)
                rows.append([thread_id, hex(task_id), task_name, coro_stack,
                             awaiter_chain, awaiter_name, awaiter_id])

    return rows

217 

def _print_cycle_exception(exception: CycleFoundException):
    """Report the await cycles carried by *exception* on stderr.

    Each cycle is printed as its task names joined by arrows.  An id with no
    entry in ``exception.id2name`` is shown as hex when it is an int and via
    str() otherwise, the same fallback build_task_table() uses for awaiter
    ids.  Previously hex() was evaluated eagerly for every id, so a non-int
    id raised TypeError while the error report was being printed.
    """
    names = exception.id2name

    def label(tid):
        if tid in names:
            return names[tid]
        return hex(tid) if isinstance(tid, int) else str(tid)

    print("ERROR: await-graph contains cycles - cannot print a tree!", file=sys.stderr)
    print("", file=sys.stderr)
    for cycle in exception.cycles:
        inames = " → ".join(label(tid) for tid in cycle)
        print(f"cycle: {inames}", file=sys.stderr)

224 

225 

def _get_awaited_by_tasks(pid: int) -> list:
    """Fetch the awaited-by information for *pid*, exiting on failure.

    A RuntimeError from the unwinder is followed along its ``__context__``
    chain to the innermost exception.  That exception is reported on stderr
    before the process exits with status 1.  Writing it to stderr, as the
    cycle report in this module already does, keeps the error out of table
    or CSV output that is redirected from stdout.
    """
    try:
        return get_all_awaited_by(pid)
    except RuntimeError as e:
        root_cause = e
        while root_cause.__context__ is not None:
            root_cause = root_cause.__context__
        print(f"Error retrieving tasks: {root_cause}", file=sys.stderr)
        sys.exit(1)

234 

235 

236class TaskTableOutputFormat(StrEnum): 

237 table = auto() 

238 csv = auto() 

239 bsv = auto() 

240 # 🍌SV is not just a format. It's a lifestyle. A philosophy. 

241 # https://www.youtube.com/watch?v=RrsVi1P6n0w 

242 

243 

def display_awaited_by_tasks_table(pid, *, format=TaskTableOutputFormat.table):
    """Print every pending task of process *pid* as a table.

    *format* is a TaskTableOutputFormat member or its string value.
    ``table`` prints fixed-width columns, and ``csv``/``bsv`` print
    delimiter-separated values.  A value that is not a valid format raises
    ValueError, which happens only after the tasks have been fetched.
    """
    table = build_task_table(_get_awaited_by_tasks(pid))
    fmt = TaskTableOutputFormat(format)
    if fmt != TaskTableOutputFormat.table:
        _display_awaited_by_tasks_csv(table, format=fmt)
    else:
        _display_awaited_by_tasks_table(table)

254 

255 

# Column titles shared by the fixed-width table and the CSV/BSV output.
# The order matches the rows produced by build_task_table().
_row_header = (
    'tid',
    'task id',
    'task name',
    'coroutine stack',
    'awaiter chain',
    'awaiter name',
    'awaiter id',
)

258 

259 

def _display_awaited_by_tasks_table(table):
    """Write *table* to stdout as fixed-width text.

    The output is the header line, a 180-character rule, and then one line
    per row, each printed as soon as it is formatted.
    """
    print(_fmt_table_row(*_row_header))
    print('-' * 180)
    rendered = (_fmt_table_row(*row) for row in table)
    for line in rendered:
        print(line)

266 

267 

def _fmt_table_row(tid, task_id, task_name, coro_stack,
                   awaiter_chain, awaiter_name, awaiter_id):
    """Format one row of the fixed-width task table.

    Each cell is left-aligned and padded to its column width (10, 20, 20,
    50, 50, 15, 15), and cells are joined by single spaces.  Longer values
    are not truncated.
    """
    cells = (
        (tid, 10),
        (task_id, 20),
        (task_name, 20),
        (coro_stack, 50),
        (awaiter_chain, 50),
        (awaiter_name, 15),
        (awaiter_id, 15),
    )
    return " ".join(format(value, f"<{width}") for value, width in cells)

273 

274 

275def _display_awaited_by_tasks_csv(table, *, format): 

276 """Print the table in CSV format""" 

277 if format == TaskTableOutputFormat.csv: 

278 delimiter = ',' 

279 elif format == TaskTableOutputFormat.bsv: 

280 delimiter = '\N{BANANA}' 

281 else: 

282 raise ValueError(f"Unknown output format: {format}") 

283 csv_writer = csv.writer(sys.stdout, delimiter=delimiter) 

284 csv_writer.writerow(_row_header) 

285 csv_writer.writerows(table) 

286 

287 

def display_awaited_by_tasks_tree(pid: int) -> None:
    """Print the await tree of every pending task in process *pid*.

    If the await graph has a cycle, the cycle is reported through
    _print_cycle_exception() and the process exits with status 1.
    """
    tasks = _get_awaited_by_tasks(pid)
    try:
        forest = build_async_tree(tasks)
    except CycleFoundException as exc:
        _print_cycle_exception(exc)
        sys.exit(1)
    else:
        for lines in forest:
            print("\n".join(lines))

298 for tree in result: 

299 print("\n".join(tree))