Coverage for Lib/asyncio/tools.py: 72%
188 statements
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-23 01:21 +0000
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-23 01:21 +0000
1"""Tools to analyze tasks running in asyncio programs."""
3from collections import defaultdict
4import csv
5from itertools import count
6from enum import Enum, StrEnum, auto
7import sys
8from _remote_debugging import RemoteUnwinder, FrameInfo
10class NodeType(Enum):
11 COROUTINE = 1
12 TASK = 2
15class CycleFoundException(Exception):
16 """Raised when there is a cycle when drawing the call tree."""
17 def __init__(
18 self,
19 cycles: list[list[int]],
20 id2name: dict[int, str],
21 ) -> None:
22 super().__init__(cycles, id2name)
23 self.cycles = cycles
24 self.id2name = id2name
28# ─── indexing helpers ───────────────────────────────────────────
29def _format_stack_entry(elem: str|FrameInfo) -> str:
30 if not isinstance(elem, str): 30 ↛ 35line 30 didn't jump to line 35 because the condition on line 30 was always true
31 if elem.lineno == 0 and elem.filename == "":
32 return f"{elem.funcname}"
33 else:
34 return f"{elem.funcname} {elem.filename}:{elem.lineno}"
35 return elem
38def _index(result):
39 id2name, awaits, task_stacks = {}, [], {}
40 for awaited_info in result:
41 for task_info in awaited_info.awaited_by:
42 task_id = task_info.task_id
43 task_name = task_info.task_name
44 id2name[task_id] = task_name
46 # Store the internal coroutine stack for this task
47 if task_info.coroutine_stack: 47 ↛ 48line 47 didn't jump to line 48 because the condition on line 47 was never true
48 for coro_info in task_info.coroutine_stack:
49 call_stack = coro_info.call_stack
50 internal_stack = [_format_stack_entry(frame) for frame in call_stack]
51 task_stacks[task_id] = internal_stack
53 # Add the awaited_by relationships (external dependencies)
54 if task_info.awaited_by:
55 for coro_info in task_info.awaited_by:
56 call_stack = coro_info.call_stack
57 parent_task_id = coro_info.task_name
58 stack = [_format_stack_entry(frame) for frame in call_stack]
59 awaits.append((parent_task_id, stack, task_id))
60 return id2name, awaits, task_stacks
63def _build_tree(id2name, awaits, task_stacks):
64 id2label = {(NodeType.TASK, tid): name for tid, name in id2name.items()}
65 children = defaultdict(list)
66 cor_nodes = defaultdict(dict) # Maps parent -> {frame_name: node_key}
67 next_cor_id = count(1)
69 def get_or_create_cor_node(parent, frame):
70 """Get existing coroutine node or create new one under parent"""
71 if frame in cor_nodes[parent]:
72 return cor_nodes[parent][frame]
74 node_key = (NodeType.COROUTINE, f"c{next(next_cor_id)}")
75 id2label[node_key] = frame
76 children[parent].append(node_key)
77 cor_nodes[parent][frame] = node_key
78 return node_key
80 # Build task dependency tree with coroutine frames
81 for parent_id, stack, child_id in awaits:
82 cur = (NodeType.TASK, parent_id)
83 for frame in reversed(stack):
84 cur = get_or_create_cor_node(cur, frame)
86 child_key = (NodeType.TASK, child_id)
87 if child_key not in children[cur]: 87 ↛ 81line 87 didn't jump to line 81 because the condition on line 87 was always true
88 children[cur].append(child_key)
90 # Add coroutine stacks for leaf tasks
91 awaiting_tasks = {parent_id for parent_id, _, _ in awaits}
92 for task_id in id2name:
93 if task_id not in awaiting_tasks and task_id in task_stacks: 93 ↛ 94line 93 didn't jump to line 94 because the condition on line 93 was never true
94 cur = (NodeType.TASK, task_id)
95 for frame in reversed(task_stacks[task_id]):
96 cur = get_or_create_cor_node(cur, frame)
98 return id2label, children
101def _roots(id2label, children):
102 all_children = {c for kids in children.values() for c in kids}
103 return [n for n in id2label if n not in all_children]
105# ─── detect cycles in the task-to-task graph ───────────────────────
106def _task_graph(awaits):
107 """Return {parent_task_id: {child_task_id, …}, …}."""
108 g = defaultdict(set)
109 for parent_id, _stack, child_id in awaits:
110 g[parent_id].add(child_id)
111 return g
114def _find_cycles(graph):
115 """
116 Depth-first search for back-edges.
118 Returns a list of cycles (each cycle is a list of task-ids) or an
119 empty list if the graph is acyclic.
120 """
121 WHITE, GREY, BLACK = 0, 1, 2
122 color = defaultdict(lambda: WHITE)
123 path, cycles = [], []
125 def dfs(v):
126 color[v] = GREY
127 path.append(v)
128 for w in graph.get(v, ()):
129 if color[w] == WHITE:
130 dfs(w)
131 elif color[w] == GREY: # back-edge → cycle!
132 i = path.index(w)
133 cycles.append(path[i:] + [w]) # make a copy
134 color[v] = BLACK
135 path.pop()
137 for v in list(graph):
138 if color[v] == WHITE:
139 dfs(v)
140 return cycles
143# ─── PRINT TREE FUNCTION ───────────────────────────────────────
144def get_all_awaited_by(pid):
145 unwinder = RemoteUnwinder(pid)
146 return unwinder.get_all_awaited_by()
149def build_async_tree(result, task_emoji="(T)", cor_emoji=""):
150 """
151 Build a list of strings for pretty-print an async call tree.
153 The call tree is produced by `get_all_async_stacks()`, prefixing tasks
154 with `task_emoji` and coroutine frames with `cor_emoji`.
155 """
156 id2name, awaits, task_stacks = _index(result)
157 g = _task_graph(awaits)
158 cycles = _find_cycles(g)
159 if cycles:
160 raise CycleFoundException(cycles, id2name)
161 labels, children = _build_tree(id2name, awaits, task_stacks)
163 def pretty(node):
164 flag = task_emoji if node[0] == NodeType.TASK else cor_emoji
165 return f"{flag} {labels[node]}"
167 def render(node, prefix="", last=True, buf=None):
168 if buf is None:
169 buf = []
170 buf.append(f"{prefix}{'└── ' if last else '├── '}{pretty(node)}")
171 new_pref = prefix + (" " if last else "│ ")
172 kids = children.get(node, [])
173 for i, kid in enumerate(kids):
174 render(kid, new_pref, i == len(kids) - 1, buf)
175 return buf
177 return [render(root) for root in _roots(labels, children)]
180def build_task_table(result):
181 id2name, _, _ = _index(result)
182 table = []
184 for awaited_info in result:
185 thread_id = awaited_info.thread_id
186 for task_info in awaited_info.awaited_by:
187 # Get task info
188 task_id = task_info.task_id
189 task_name = task_info.task_name
191 # Build coroutine stack string
192 frames = [frame for coro in task_info.coroutine_stack
193 for frame in coro.call_stack]
194 coro_stack = " -> ".join(_format_stack_entry(x).split(" ")[0]
195 for x in frames)
197 # Handle tasks with no awaiters
198 if not task_info.awaited_by:
199 table.append([thread_id, hex(task_id), task_name, coro_stack,
200 "", "", "0x0"])
201 continue
203 # Handle tasks with awaiters
204 for coro_info in task_info.awaited_by:
205 parent_id = coro_info.task_name
206 awaiter_frames = [_format_stack_entry(x).split(" ")[0]
207 for x in coro_info.call_stack]
208 awaiter_chain = " -> ".join(awaiter_frames)
209 awaiter_name = id2name.get(parent_id, "Unknown")
210 parent_id_str = (hex(parent_id) if isinstance(parent_id, int)
211 else str(parent_id))
213 table.append([thread_id, hex(task_id), task_name, coro_stack,
214 awaiter_chain, awaiter_name, parent_id_str])
216 return table
218def _print_cycle_exception(exception: CycleFoundException):
219 print("ERROR: await-graph contains cycles - cannot print a tree!", file=sys.stderr)
220 print("", file=sys.stderr)
221 for c in exception.cycles:
222 inames = " → ".join(exception.id2name.get(tid, hex(tid)) for tid in c)
223 print(f"cycle: {inames}", file=sys.stderr)
226def _get_awaited_by_tasks(pid: int) -> list:
227 try:
228 return get_all_awaited_by(pid)
229 except RuntimeError as e:
230 while e.__context__ is not None:
231 e = e.__context__
232 print(f"Error retrieving tasks: {e}")
233 sys.exit(1)
236class TaskTableOutputFormat(StrEnum):
237 table = auto()
238 csv = auto()
239 bsv = auto()
240 # 🍌SV is not just a format. It's a lifestyle. A philosophy.
241 # https://www.youtube.com/watch?v=RrsVi1P6n0w
244def display_awaited_by_tasks_table(pid, *, format=TaskTableOutputFormat.table):
245 """Build and print a table of all pending tasks under `pid`."""
247 tasks = _get_awaited_by_tasks(pid)
248 table = build_task_table(tasks)
249 format = TaskTableOutputFormat(format)
250 if format == TaskTableOutputFormat.table:
251 _display_awaited_by_tasks_table(table)
252 else:
253 _display_awaited_by_tasks_csv(table, format=format)
256_row_header = ('tid', 'task id', 'task name', 'coroutine stack',
257 'awaiter chain', 'awaiter name', 'awaiter id')
260def _display_awaited_by_tasks_table(table):
261 """Print the table in a simple tabular format."""
262 print(_fmt_table_row(*_row_header))
263 print('-' * 180)
264 for row in table:
265 print(_fmt_table_row(*row))
268def _fmt_table_row(tid, task_id, task_name, coro_stack,
269 awaiter_chain, awaiter_name, awaiter_id):
270 # Format a single row for the table format
271 return (f'{tid:<10} {task_id:<20} {task_name:<20} {coro_stack:<50} '
272 f'{awaiter_chain:<50} {awaiter_name:<15} {awaiter_id:<15}')
275def _display_awaited_by_tasks_csv(table, *, format):
276 """Print the table in CSV format"""
277 if format == TaskTableOutputFormat.csv:
278 delimiter = ','
279 elif format == TaskTableOutputFormat.bsv:
280 delimiter = '\N{BANANA}'
281 else:
282 raise ValueError(f"Unknown output format: {format}")
283 csv_writer = csv.writer(sys.stdout, delimiter=delimiter)
284 csv_writer.writerow(_row_header)
285 csv_writer.writerows(table)
288def display_awaited_by_tasks_tree(pid: int) -> None:
289 """Build and print a tree of all pending tasks under `pid`."""
291 tasks = _get_awaited_by_tasks(pid)
292 try:
293 result = build_async_tree(tasks)
294 except CycleFoundException as e:
295 _print_cycle_exception(e)
296 sys.exit(1)
298 for tree in result:
299 print("\n".join(tree))