Coverage for Lib/asyncio/tools.py: 76%
171 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-19 01:30 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-19 01:30 +0000
1"""Tools to analyze tasks running in asyncio programs."""
3from collections import defaultdict, namedtuple
4from itertools import count
5from enum import Enum
6import sys
7from _remote_debugging import RemoteUnwinder, FrameInfo
class NodeType(Enum):
    """Kind of node in the rendered await tree; used as the first item of a node key."""

    # A coroutine frame sitting between two tasks (or below a leaf task).
    COROUTINE = 1
    # An asyncio task.
    TASK = 2
class CycleFoundException(Exception):
    """Signal that the await graph is cyclic, so no call tree can be drawn.

    Both constructor arguments are forwarded to ``Exception`` (so they show
    up in ``args``) and are also exposed as attributes:

    * ``cycles``  -- the detected cycles, each one a list of task ids.
    * ``id2name`` -- mapping from task id to task name, for reporting.
    """

    def __init__(self, cycles: list[list[int]], id2name: dict[int, str]) -> None:
        self.cycles = cycles
        self.id2name = id2name
        super().__init__(cycles, id2name)
27# ─── indexing helpers ───────────────────────────────────────────
def _format_stack_entry(elem: str|FrameInfo) -> str:
    """Return the display text for one stack entry.

    Strings are passed through untouched.  Any other entry is read through
    its ``funcname``, ``filename`` and ``lineno`` attributes: when the line
    number is 0 and the filename is empty only the function name is shown,
    otherwise the result is ``"funcname filename:lineno"``.
    """
    if isinstance(elem, str):
        return elem
    has_no_location = elem.lineno == 0 and elem.filename == ""
    if has_no_location:
        return f"{elem.funcname}"
    return f"{elem.funcname} {elem.filename}:{elem.lineno}"
def _index(result):
    """Flatten the per-thread task records into three lookup structures.

    Returns a ``(id2name, awaits, task_stacks)`` tuple:

    * ``id2name``     -- ``{task_id: task_name}`` for every task seen.
    * ``awaits``      -- list of ``(parent_id, formatted_stack, task_id)``
      tuples, one per entry in a task's ``awaited_by``.
    * ``task_stacks`` -- ``{task_id: formatted_stack}`` taken from the task's
      own ``coroutine_stack``; when that holds several entries, the last
      one is the one that remains stored.
    """
    names, edges, stacks = {}, [], {}
    for thread_entry in result:
        for task in thread_entry.awaited_by:
            tid = task.task_id
            names[tid] = task.task_name

            # Internal coroutine stack of the task itself.
            for coro in task.coroutine_stack or ():
                stacks[tid] = [_format_stack_entry(frame) for frame in coro.call_stack]

            # External dependencies: who is awaiting this task.
            for awaiter in task.awaited_by or ():
                frames = awaiter.call_stack
                # The awaiter record keeps the parent's id in `task_name`.
                parent = awaiter.task_name
                edges.append((parent, [_format_stack_entry(frame) for frame in frames], tid))
    return names, edges, stacks
def _build_tree(id2name, awaits, task_stacks):
    """Turn the indexed await data into a labelled parent -> children forest.

    Node keys are ``(NodeType, ident)`` tuples.  Returns ``(id2label,
    children)`` where ``id2label`` maps a node key to its display text and
    ``children`` maps a node key to the ordered list of its child keys.
    """
    labels = {(NodeType.TASK, task_id): task_name for task_id, task_name in id2name.items()}
    kids_of = defaultdict(list)
    frame_nodes = defaultdict(dict)  # parent key -> {frame text: coroutine node key}
    cor_ids = count(1)

    def descend(anchor, frames):
        """Walk `frames` outermost-first below `anchor`, reusing or creating nodes."""
        node = anchor
        for frame in reversed(frames):
            known = frame_nodes[node]
            try:
                node = known[frame]
            except KeyError:
                fresh = (NodeType.COROUTINE, f"c{next(cor_ids)}")
                labels[fresh] = frame
                kids_of[node].append(fresh)
                known[frame] = fresh
                node = fresh
        return node

    # Hang every awaited task below the coroutine chain of its awaiter.
    for parent_id, stack, child_id in awaits:
        tail = descend((NodeType.TASK, parent_id), stack)
        child = (NodeType.TASK, child_id)
        siblings = kids_of[tail]
        if child not in siblings:
            siblings.append(child)

    # Tasks that await nothing still get their own coroutine stack drawn.
    awaiting = {parent_id for parent_id, _, _ in awaits}
    for task_id in id2name:
        if task_id in awaiting or task_id not in task_stacks:
            continue
        descend((NodeType.TASK, task_id), task_stacks[task_id])

    return labels, kids_of
def _roots(id2label, children):
    """Return the labelled nodes that never appear as anybody's child, in label order."""
    referenced = set()
    for kids in children.values():
        referenced.update(kids)
    return [node for node in id2label if node not in referenced]
104# ─── detect cycles in the task-to-task graph ───────────────────────
def _task_graph(awaits):
    """Collapse the await edges into ``{parent_task_id: {child_task_id, ...}}``.

    The coroutine stack carried by each edge is ignored; only the two task
    ids matter here.
    """
    graph = defaultdict(set)
    for awaiter, _frames, awaited in awaits:
        graph[awaiter].add(awaited)
    return graph
def _find_cycles(graph):
    """
    Depth-first search for back-edges.

    `graph` maps a task id to an iterable of the task ids it points to.

    Returns a list of cycles (each cycle is a list of task-ids that starts
    and ends with the same id) or an empty list if the graph is acyclic.

    The traversal keeps its own stack instead of recursing, so an await
    chain deeper than the interpreter recursion limit does not raise
    RecursionError.  Visiting order, and therefore the order and content of
    the reported cycles, is that of a plain recursive DFS.
    """
    WHITE, GREY, BLACK = 0, 1, 2
    color = defaultdict(lambda: WHITE)
    path, cycles = [], []

    for root in list(graph):
        if color[root] != WHITE:
            continue
        color[root] = GREY
        path.append(root)
        # One partially consumed successor iterator per node on `path`.
        pending = [iter(graph.get(root, ()))]
        while pending:
            for w in pending[-1]:
                if color[w] == WHITE:
                    # Descend; the parent's iterator resumes once `w` is done.
                    color[w] = GREY
                    path.append(w)
                    pending.append(iter(graph.get(w, ())))
                    break
                if color[w] == GREY:    # back-edge → cycle!
                    i = path.index(w)
                    cycles.append(path[i:] + [w])   # make a copy
            else:
                # All successors handled: the node on top of `path` is finished.
                color[path.pop()] = BLACK
                pending.pop()
    return cycles
142# ─── PRINT TREE FUNCTION ───────────────────────────────────────
def get_all_awaited_by(pid):
    """Attach a ``RemoteUnwinder`` to process `pid` and return its ``get_all_awaited_by()`` result."""
    return RemoteUnwinder(pid).get_all_awaited_by()
def build_async_tree(result, task_emoji="(T)", cor_emoji=""):
    """
    Build the text lines of a pretty-printed async call tree.

    `result` is the data produced by `get_all_awaited_by()`.  Task nodes are
    prefixed with `task_emoji` and coroutine frames with `cor_emoji`.

    Returns a list with one entry per root node; each entry is the list of
    lines that draw that root's tree.

    Raises CycleFoundException when the task-to-task await graph contains a
    cycle, since a cyclic graph cannot be drawn as a tree.
    """
    id2name, awaits, task_stacks = _index(result)
    g = _task_graph(awaits)
    cycles = _find_cycles(g)
    if cycles:
        raise CycleFoundException(cycles, id2name)
    labels, children = _build_tree(id2name, awaits, task_stacks)

    def pretty(node):
        flag = task_emoji if node[0] == NodeType.TASK else cor_emoji
        return f"{flag} {labels[node]}"

    def render(node, prefix="", last=True, buf=None):
        if buf is None:
            buf = []
        buf.append(f"{prefix}{'└── ' if last else '├── '}{pretty(node)}")
        # The continuation prefix has to be exactly as wide as the connector
        # above (4 columns); otherwise nested levels drift out of alignment
        # and the "│" guides no longer sit under their "├".
        new_pref = prefix + ("    " if last else "│   ")
        kids = children.get(node, [])
        for i, kid in enumerate(kids):
            render(kid, new_pref, i == len(kids) - 1, buf)
        return buf

    return [render(root) for root in _roots(labels, children)]
def build_task_table(result):
    """Build the rows of the task table from the per-thread task records.

    Every row is a 7-item list: thread id, task id (hex), task name,
    coroutine stack, awaiter chain, awaiter name, awaiter id.  A task with
    no awaiters yields a single row whose awaiter columns are ``""``, ``""``
    and ``"0x0"``; otherwise it yields one row per awaiter.
    """
    id2name, _, _ = _index(result)

    def short(entry):
        # Keep only the text before the first space of the formatted entry.
        return _format_stack_entry(entry).split(" ")[0]

    rows = []
    for thread_entry in result:
        tid = thread_entry.thread_id
        for task in thread_entry.awaited_by:
            task_id = task.task_id
            task_name = task.task_name

            # Every frame of every coroutine of the task, joined in order.
            own_frames = [frame for coro in task.coroutine_stack
                          for frame in coro.call_stack]
            coro_stack = " -> ".join(short(frame) for frame in own_frames)

            # Columns shared by all rows of this task.
            common = [tid, hex(task_id), task_name, coro_stack]

            awaiters = task.awaited_by
            if not awaiters:
                rows.append(common + ["", "", "0x0"])
                continue

            for awaiter in awaiters:
                parent_id = awaiter.task_name
                chain = " -> ".join([short(frame) for frame in awaiter.call_stack])
                parent_name = id2name.get(parent_id, "Unknown")
                if isinstance(parent_id, int):
                    parent_text = hex(parent_id)
                else:
                    parent_text = str(parent_id)
                rows.append(common + [chain, parent_name, parent_text])

    return rows
def _print_cycle_exception(exception: CycleFoundException):
    """Write a description of every cycle in `exception.cycles` to stderr.

    Each task id is shown by its name from `exception.id2name`; an id with
    no known name is shown as hex when it is an int and as plain text
    otherwise.
    """
    id2name = exception.id2name

    def describe(tid):
        if tid in id2name:
            return id2name[tid]
        # Only format the fallback when it is needed, and only call hex() on
        # ints: it raises TypeError for any other id type.
        return hex(tid) if isinstance(tid, int) else str(tid)

    print("ERROR: await-graph contains cycles - cannot print a tree!", file=sys.stderr)
    print("", file=sys.stderr)
    for c in exception.cycles:
        inames = " → ".join(describe(tid) for tid in c)
        print(f"cycle: {inames}", file=sys.stderr)
def exit_with_permission_help_text():
    """
    Print a pointer to the platform-specific permission documentation and
    terminate the program with exit status 1.

    Meant to be called when attaching to a process failed with a
    PermissionError.
    """
    message = (
        "Error: The specified process cannot be attached to due to insufficient permissions.\n"
        "See the Python documentation for details on required privileges and troubleshooting:\n"
        "https://docs.python.org/3.14/howto/remote_debugging.html#permission-requirements\n"
    )
    print(message)
    sys.exit(1)
def _get_awaited_by_tasks(pid: int) -> list:
    """Fetch the awaited-by data for `pid`, exiting the program on failure.

    A RuntimeError is reported through the innermost exception of its
    ``__context__`` chain and ends the program with status 1; a
    PermissionError is handed to `exit_with_permission_help_text()`.
    """
    try:
        return get_all_awaited_by(pid)
    except RuntimeError as err:
        # Follow the implicit exception chain down to the original failure.
        root_cause = err
        while root_cause.__context__ is not None:
            root_cause = root_cause.__context__
        print(f"Error retrieving tasks: {root_cause}")
        sys.exit(1)
    except PermissionError:
        exit_with_permission_help_text()
def display_awaited_by_tasks_table(pid: int) -> None:
    """Build and print a table of all pending tasks under `pid`."""
    # (title, width) of each left-aligned column, in display order.
    columns = (
        ("tid", 10),
        ("task id", 20),
        ("task name", 20),
        ("coroutine stack", 50),
        ("awaiter chain", 50),
        ("awaiter name", 15),
        ("awaiter id", 15),
    )
    widths = [width for _, width in columns]

    def as_line(cells):
        # Pad each cell to its column width and separate columns by one space.
        return " ".join(f"{cell:<{width}}" for cell, width in zip(cells, widths))

    rows = build_task_table(_get_awaited_by_tasks(pid))
    print(as_line(title for title, _ in columns))
    print("-" * 180)
    for row in rows:
        print(as_line(row))
def display_awaited_by_tasks_tree(pid: int) -> None:
    """Build and print a tree of all pending tasks under `pid`.

    When the await graph contains cycles they are reported on stderr and the
    program exits with status 1 instead of printing a tree.
    """
    pending = _get_awaited_by_tasks(pid)
    try:
        forest = build_async_tree(pending)
    except CycleFoundException as cycle_error:
        _print_cycle_exception(cycle_error)
        sys.exit(1)

    for lines in forest:
        print(*lines, sep="\n")