Coverage for Lib/asyncio/tools.py: 76%

171 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-19 01:30 +0000

1"""Tools to analyze tasks running in asyncio programs.""" 

2 

3from collections import defaultdict, namedtuple 

4from itertools import count 

5from enum import Enum 

6import sys 

7from _remote_debugging import RemoteUnwinder, FrameInfo 

8 

class NodeType(Enum):
    """Kind of a node key used when building the await tree."""

    # Node created for a single formatted stack frame.
    COROUTINE = 1
    # Node created for a task id.
    TASK = 2

12 

13 

class CycleFoundException(Exception):
    """Raised when there is a cycle when drawing the call tree.

    The offending cycles and the id-to-name mapping are kept on the
    instance as ``cycles`` and ``id2name`` so a handler can report them.
    """

    def __init__(
            self,
            cycles: list[list[int]],
            id2name: dict[int, str],
        ) -> None:
        self.cycles = cycles
        self.id2name = id2name
        # Forward both values so they are also available through ``args``.
        super().__init__(cycles, id2name)

24 

25 

26 

27# ─── indexing helpers ─────────────────────────────────────────── 

def _format_stack_entry(elem: str|FrameInfo) -> str:
    """Return the display text for one stack entry.

    A string is returned unchanged.  Anything else is read through its
    ``funcname``, ``filename`` and ``lineno`` attributes.
    """
    if isinstance(elem, str):
        return elem

    # An entry with line 0 and an empty filename carries no location, so
    # only the function name is shown.
    lacks_location = elem.lineno == 0 and elem.filename == ""
    if lacks_location:
        return f"{elem.funcname}"
    return f"{elem.funcname} {elem.filename}:{elem.lineno}"

35 

36 

37def _index(result): 

38 id2name, awaits, task_stacks = {}, [], {} 

39 for awaited_info in result: 

40 for task_info in awaited_info.awaited_by: 

41 task_id = task_info.task_id 

42 task_name = task_info.task_name 

43 id2name[task_id] = task_name 

44 

45 # Store the internal coroutine stack for this task 

46 if task_info.coroutine_stack: 46 ↛ 47line 46 didn't jump to line 47 because the condition on line 46 was never true

47 for coro_info in task_info.coroutine_stack: 

48 call_stack = coro_info.call_stack 

49 internal_stack = [_format_stack_entry(frame) for frame in call_stack] 

50 task_stacks[task_id] = internal_stack 

51 

52 # Add the awaited_by relationships (external dependencies) 

53 if task_info.awaited_by: 

54 for coro_info in task_info.awaited_by: 

55 call_stack = coro_info.call_stack 

56 parent_task_id = coro_info.task_name 

57 stack = [_format_stack_entry(frame) for frame in call_stack] 

58 awaits.append((parent_task_id, stack, task_id)) 

59 return id2name, awaits, task_stacks 

60 

61 

def _build_tree(id2name, awaits, task_stacks):
    """Build the labelled parent -> children structure of the await tree.

    Node keys are ``(NodeType, id)`` tuples.  Task nodes use the task id;
    coroutine nodes get a generated ``"c<N>"`` id, numbered from 1 in
    creation order.

    Returns ``(id2label, children)`` where ``id2label`` maps a node key to
    its display label and ``children`` is a ``defaultdict(list)`` mapping a
    node key to the ordered list of its child keys.
    """
    labels = {(NodeType.TASK, tid): name for tid, name in id2name.items()}
    kids_of = defaultdict(list)
    # parent key -> {frame text: coroutine node key already made for it}
    frame_nodes = defaultdict(dict)
    serial = count(1)

    def descend(node, frames):
        """Follow *frames* in reverse order below *node*; return the last node.

        A frame already seen under the same parent reuses its node,
        otherwise a new coroutine node is created and attached.
        """
        for frame in reversed(frames):
            known = frame_nodes[node]
            child = known.get(frame)
            if child is None:
                child = (NodeType.COROUTINE, f"c{next(serial)}")
                labels[child] = frame
                kids_of[node].append(child)
                known[frame] = child
            node = child
        return node

    # Hang every awaited task below the frames of the task awaiting it.
    for parent_id, stack, child_id in awaits:
        anchor = descend((NodeType.TASK, parent_id), stack)
        task_key = (NodeType.TASK, child_id)
        siblings = kids_of[anchor]
        if task_key not in siblings:
            siblings.append(task_key)

    # Tasks that never appear as a parent get their own stack attached.
    parents = {parent_id for parent_id, _, _ in awaits}
    for tid in id2name:
        if tid in task_stacks and tid not in parents:
            descend((NodeType.TASK, tid), task_stacks[tid])

    return labels, kids_of

98 

99 

100def _roots(id2label, children): 

101 all_children = {c for kids in children.values() for c in kids} 

102 return [n for n in id2label if n not in all_children] 

103 

104# ─── detect cycles in the task-to-task graph ─────────────────────── 

105def _task_graph(awaits): 

106 """Return {parent_task_id: {child_task_id, …}, …}.""" 

107 g = defaultdict(set) 

108 for parent_id, _stack, child_id in awaits: 

109 g[parent_id].add(child_id) 

110 return g 

111 

112 

113def _find_cycles(graph): 

114 """ 

115 Depth-first search for back-edges. 

116 

117 Returns a list of cycles (each cycle is a list of task-ids) or an 

118 empty list if the graph is acyclic. 

119 """ 

120 WHITE, GREY, BLACK = 0, 1, 2 

121 color = defaultdict(lambda: WHITE) 

122 path, cycles = [], [] 

123 

124 def dfs(v): 

125 color[v] = GREY 

126 path.append(v) 

127 for w in graph.get(v, ()): 

128 if color[w] == WHITE: 

129 dfs(w) 

130 elif color[w] == GREY: # back-edge → cycle! 

131 i = path.index(w) 

132 cycles.append(path[i:] + [w]) # make a copy 

133 color[v] = BLACK 

134 path.pop() 

135 

136 for v in list(graph): 

137 if color[v] == WHITE: 

138 dfs(v) 

139 return cycles 

140 

141 

142# ─── PRINT TREE FUNCTION ─────────────────────────────────────── 

def get_all_awaited_by(pid):
    """Return ``RemoteUnwinder(pid).get_all_awaited_by()`` for process *pid*."""
    return RemoteUnwinder(pid).get_all_awaited_by()

146 

147 

def build_async_tree(result, task_emoji="(T)", cor_emoji=""):
    """
    Build the text lines for pretty-printing an async call tree.

    *result* is the awaited-by data in the form consumed by `_index()`
    (what `get_all_awaited_by()` returns).  Task lines are prefixed with
    `task_emoji` and coroutine frame lines with `cor_emoji`.

    Returns a list with one entry per root node; each entry is the list
    of lines for that root's subtree.

    Raises CycleFoundException if the task-to-task graph has a cycle.
    """
    id2name, awaits, task_stacks = _index(result)
    cycles = _find_cycles(_task_graph(awaits))
    if cycles:
        raise CycleFoundException(cycles, id2name)
    labels, children = _build_tree(id2name, awaits, task_stacks)

    def pretty(node):
        flag = task_emoji if node[0] == NodeType.TASK else cor_emoji
        return f"{flag} {labels[node]}"

    def render(node, prefix="", last=True, buf=None):
        if buf is None:
            buf = []
        buf.append(f"{prefix}{'└── ' if last else '├── '}{pretty(node)}")
        # Both continuation prefixes are four columns wide, the same width
        # as the connectors above, so every nesting level lines up and
        # sibling subtrees start in the same column.
        new_pref = prefix + ("    " if last else "│   ")
        kids = children.get(node, [])
        for i, kid in enumerate(kids):
            render(kid, new_pref, i == len(kids) - 1, buf)
        return buf

    return [render(root) for root in _roots(labels, children)]

177 

178 

def build_task_table(result):
    """Return one table row per (task, awaiter) pair found in *result*.

    Each row is a list of seven cells: thread id, task id in hex, task
    name, the task's coroutine stack, the awaiter's call chain, the
    awaiter's name and the awaiter's id.  A task without awaiters
    contributes a single row with empty awaiter cells and ``"0x0"`` as the
    awaiter id.
    """
    id2name, _, _ = _index(result)

    def short_names(frames):
        # Keep only the text before the first space of each formatted entry.
        return [_format_stack_entry(frame).split(" ")[0] for frame in frames]

    rows = []
    for awaited_info in result:
        thread_id = awaited_info.thread_id
        for task in awaited_info.awaited_by:
            task_id = task.task_id
            task_name = task.task_name

            # Concatenate the call stacks of every coroutine_stack entry.
            own_frames = []
            for coro in task.coroutine_stack:
                own_frames.extend(coro.call_stack)
            own_chain = " -> ".join(short_names(own_frames))

            awaiters = task.awaited_by
            if not awaiters:
                rows.append([thread_id, hex(task_id), task_name, own_chain,
                             "", "", "0x0"])
                continue

            for awaiter in awaiters:
                # The awaiter's id is carried in its ``task_name`` attribute.
                parent_id = awaiter.task_name
                awaiter_chain = " -> ".join(short_names(awaiter.call_stack))
                awaiter_name = id2name.get(parent_id, "Unknown")
                if isinstance(parent_id, int):
                    parent_cell = hex(parent_id)
                else:
                    parent_cell = str(parent_id)

                rows.append([thread_id, hex(task_id), task_name, own_chain,
                             awaiter_chain, awaiter_name, parent_cell])

    return rows

216 

def _print_cycle_exception(exception: CycleFoundException):
    """Report the cycles carried by *exception* on standard error.

    Prints a fixed error header, a blank line, then one ``cycle: …`` line
    per cycle.  Each id is shown by its name from ``exception.id2name``;
    an id without a name is shown in hex when it is an int and via
    ``str()`` otherwise, so a non-integer id cannot make the report fail.
    """
    id2name = exception.id2name

    def label(tid):
        if tid in id2name:
            return id2name[tid]
        # Same fallback rule as the awaiter-id column of the task table.
        return hex(tid) if isinstance(tid, int) else str(tid)

    print("ERROR: await-graph contains cycles - cannot print a tree!", file=sys.stderr)
    print("", file=sys.stderr)
    for c in exception.cycles:
        inames = " → ".join(label(tid) for tid in c)
        print(f"cycle: {inames}", file=sys.stderr)

223 

224 

def exit_with_permission_help_text():
    """
    Print a pointer to the platform-specific permission help and exit.

    The message is written with ``print()`` and the process then exits
    with status 1.  Called when a PermissionError is encountered while
    trying to attach to a process.
    """
    message = (
        "Error: The specified process cannot be attached to due to insufficient permissions.\n"
        "See the Python documentation for details on required privileges and troubleshooting:\n"
        "https://docs.python.org/3.14/howto/remote_debugging.html#permission-requirements\n"
    )
    print(message)
    sys.exit(1)

237 

238 

def _get_awaited_by_tasks(pid: int) -> list:
    """Return ``get_all_awaited_by(pid)``, exiting with status 1 on failure.

    A PermissionError is handed to ``exit_with_permission_help_text()``.
    For a RuntimeError the innermost exception of its ``__context__``
    chain is printed before exiting.
    """
    try:
        return get_all_awaited_by(pid)
    except PermissionError:
        exit_with_permission_help_text()
    except RuntimeError as err:
        # Walk down to the exception that started the chain.
        root = err
        while root.__context__ is not None:
            root = root.__context__
        print(f"Error retrieving tasks: {root}")
        sys.exit(1)

249 

250 

def display_awaited_by_tasks_table(pid: int) -> None:
    """Build and print a table of all pending tasks under `pid`."""
    tasks = _get_awaited_by_tasks(pid)
    table = build_task_table(tasks)

    # One left-aligned, fixed-width column per cell; the header and every
    # row go through the same template.
    row_format = "{:<10} {:<20} {:<20} {:<50} {:<50} {:<15} {:<15}"
    print(
        row_format.format(
            "tid",
            "task id",
            "task name",
            "coroutine stack",
            "awaiter chain",
            "awaiter name",
            "awaiter id",
        )
    )
    print("-" * 180)
    for row in table:
        print(row_format.format(*row))

263 

264 

def display_awaited_by_tasks_tree(pid: int) -> None:
    """Build and print a tree of all pending tasks under `pid`.

    If the await graph contains cycles, they are reported through
    ``_print_cycle_exception()`` and the process exits with status 1.
    """
    tasks = _get_awaited_by_tasks(pid)
    try:
        trees = build_async_tree(tasks)
    except CycleFoundException as exc:
        _print_cycle_exception(exc)
        sys.exit(1)

    # Each tree is a list of lines for one root.
    for lines in trees:
        print("\n".join(lines))