Coverage for Lib/asyncio/__main__.py: 0%
97 statements
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-23 01:21 +0000
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-23 01:21 +0000
1import argparse
2import ast
3import asyncio
4import concurrent.futures
5import contextvars
6import inspect
7import os
8import site
9import sys
10import threading
11import types
12import warnings
13from asyncio.tools import (TaskTableOutputFormat,
14 display_awaited_by_tasks_table,
15 display_awaited_by_tasks_tree)
17from _colorize import get_theme
18from _pyrepl.console import InteractiveColoredConsole
20from . import futures
23class AsyncIOInteractiveConsole(InteractiveColoredConsole):
25 def __init__(self, locals, loop):
26 super().__init__(locals, filename="<stdin>")
27 self.compile.compiler.flags |= ast.PyCF_ALLOW_TOP_LEVEL_AWAIT
29 self.loop = loop
30 self.context = contextvars.copy_context()
32 def runcode(self, code):
33 global return_code
34 future = concurrent.futures.Future()
36 def callback():
37 global return_code
38 global repl_future
39 global keyboard_interrupted
41 repl_future = None
42 keyboard_interrupted = False
44 func = types.FunctionType(code, self.locals)
45 try:
46 coro = func()
47 except SystemExit as se:
48 return_code = se.code
49 self.loop.stop()
50 return
51 except KeyboardInterrupt as ex:
52 keyboard_interrupted = True
53 future.set_exception(ex)
54 return
55 except BaseException as ex:
56 future.set_exception(ex)
57 return
59 if not inspect.iscoroutine(coro):
60 future.set_result(coro)
61 return
63 try:
64 repl_future = self.loop.create_task(coro, context=self.context)
65 futures._chain_future(repl_future, future)
66 except BaseException as exc:
67 future.set_exception(exc)
69 self.loop.call_soon_threadsafe(callback, context=self.context)
71 try:
72 return future.result()
73 except SystemExit as se:
74 return_code = se.code
75 self.loop.stop()
76 return
77 except BaseException:
78 if keyboard_interrupted:
79 self.write("\nKeyboardInterrupt\n")
80 else:
81 self.showtraceback()
82 return self.STATEMENT_FAILED
class REPLThread(threading.Thread):
    """Thread that runs the interactive console session."""

    def run(self):
        """Print the banner, run the startup file and serve the session.

        When the session ends, for whatever reason, "never awaited"
        coroutine warnings are silenced and the event loop is asked to stop.
        """
        global return_code

        try:
            banner = "".join((
                f'asyncio REPL {sys.version} on {sys.platform}\n',
                'Use "await" directly instead of "asyncio.run()".\n',
                'Type "help", "copyright", "credits" or "license" ',
                'for more information.\n',
            ))
            console.write(banner)

            # Execute the PYTHONSTARTUP file, if one is configured, inside
            # the console namespace.
            startup_path = os.getenv("PYTHONSTARTUP")
            if startup_path:
                sys.audit("cpython.run_startup", startup_path)

                import tokenize
                with tokenize.open(startup_path) as startup_file:
                    startup_code = compile(
                        startup_file.read(), startup_path, "exec")
                    exec(startup_code, console.locals)

            # Echo an "import asyncio" line behind the primary prompt.
            prompt = getattr(sys, "ps1", ">>> ")
            if CAN_USE_PYREPL:
                syntax_theme = get_theme().syntax
                prompt = f"{syntax_theme.prompt}{prompt}{syntax_theme.reset}"
            console.write(f"{prompt}import asyncio\n")

            if not CAN_USE_PYREPL:
                console.interact(banner="", exitmsg="")
            else:
                from _pyrepl.simple_interact import (
                    run_multiline_interactive_console,
                )
                try:
                    run_multiline_interactive_console(console)
                except SystemExit:
                    # `exit` and `quit` end the session this way
                    pass
                except BaseException:
                    # anything else is unexpected
                    console.showtraceback()
                    console.write("Internal error, ")
                    return_code = 1
        finally:
            warnings.filterwarnings(
                'ignore',
                message=r'^coroutine .* was never awaited$',
                category=RuntimeWarning,
            )

            loop.call_soon_threadsafe(loop.stop)

    def interrupt(self) -> None:
        """Add an empty entry to the pyrepl reader's threading hook, if any.

        Does nothing when pyrepl is not in use.
        """
        if CAN_USE_PYREPL:
            from _pyrepl.simple_interact import _get_reader
            hook = _get_reader().threading_hook
            if hook is not None:
                hook.add("")  # type: ignore
147if __name__ == '__main__':
148 parser = argparse.ArgumentParser(
149 prog="python3 -m asyncio",
150 description="Interactive asyncio shell and CLI tools",
151 color=True,
152 )
153 subparsers = parser.add_subparsers(help="sub-commands", dest="command")
154 ps = subparsers.add_parser(
155 "ps", help="Display a table of all pending tasks in a process"
156 )
157 ps.add_argument("pid", type=int, help="Process ID to inspect")
158 formats = [fmt.value for fmt in TaskTableOutputFormat]
159 formats_to_show = [fmt for fmt in formats
160 if fmt != TaskTableOutputFormat.bsv.value]
161 ps.add_argument("--format", choices=formats, default="table",
162 metavar=f"{{{','.join(formats_to_show)}}}")
163 pstree = subparsers.add_parser(
164 "pstree", help="Display a tree of all pending tasks in a process"
165 )
166 pstree.add_argument("pid", type=int, help="Process ID to inspect")
167 args = parser.parse_args()
168 match args.command:
169 case "ps":
170 display_awaited_by_tasks_table(args.pid, format=args.format)
171 sys.exit(0)
172 case "pstree":
173 display_awaited_by_tasks_tree(args.pid)
174 sys.exit(0)
175 case None:
176 pass # continue to the interactive shell
177 case _:
178 # shouldn't happen as an invalid command-line wouldn't parse
179 # but let's keep it for the next person adding a command
180 print(f"error: unhandled command {args.command}", file=sys.stderr)
181 parser.print_usage(file=sys.stderr)
182 sys.exit(1)
184 sys.audit("cpython.run_stdin")
186 if os.getenv('PYTHON_BASIC_REPL'):
187 CAN_USE_PYREPL = False
188 else:
189 from _pyrepl.main import CAN_USE_PYREPL
191 return_code = 0
192 loop = asyncio.new_event_loop()
193 asyncio.set_event_loop(loop)
195 repl_locals = {'asyncio': asyncio}
196 for key in {'__name__', '__package__',
197 '__loader__', '__spec__',
198 '__builtins__', '__file__'}:
199 repl_locals[key] = locals()[key]
201 console = AsyncIOInteractiveConsole(repl_locals, loop)
203 repl_future = None
204 keyboard_interrupted = False
206 try:
207 import readline # NoQA
208 except ImportError:
209 readline = None
211 interactive_hook = getattr(sys, "__interactivehook__", None)
213 if interactive_hook is not None:
214 sys.audit("cpython.run_interactivehook", interactive_hook)
215 interactive_hook()
217 if interactive_hook is site.register_readline:
218 # Fix the completer function to use the interactive console locals
219 try:
220 import rlcompleter
221 except:
222 pass
223 else:
224 if readline is not None:
225 completer = rlcompleter.Completer(console.locals)
226 readline.set_completer(completer.complete)
228 repl_thread = REPLThread(name="Interactive thread")
229 repl_thread.daemon = True
230 repl_thread.start()
232 while True:
233 try:
234 loop.run_forever()
235 except KeyboardInterrupt:
236 keyboard_interrupted = True
237 if repl_future and not repl_future.done():
238 repl_future.cancel()
239 repl_thread.interrupt()
240 continue
241 else:
242 break
244 console.write('exiting asyncio REPL...\n')
245 sys.exit(return_code)