Coverage for Lib / asyncio / __main__.py: 0%
102 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-07 01:33 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-07 01:33 +0000
1import argparse
2import ast
3import asyncio
4import asyncio.tools
5import concurrent.futures
6import contextvars
7import inspect
8import os
9import site
10import sys
11import threading
12import types
13import warnings
15from _colorize import get_theme
16from _pyrepl.console import InteractiveColoredConsole
18from . import futures
21class AsyncIOInteractiveConsole(InteractiveColoredConsole):
23 def __init__(self, locals, loop):
24 super().__init__(locals, filename="<stdin>")
25 self.compile.compiler.flags |= ast.PyCF_ALLOW_TOP_LEVEL_AWAIT
27 self.loop = loop
28 self.context = contextvars.copy_context()
30 def runcode(self, code):
31 global return_code
32 future = concurrent.futures.Future()
34 def callback():
35 global return_code
36 global repl_future
37 global keyboard_interrupted
39 repl_future = None
40 keyboard_interrupted = False
42 func = types.FunctionType(code, self.locals)
43 try:
44 coro = func()
45 except SystemExit as se:
46 return_code = se.code
47 self.loop.stop()
48 return
49 except KeyboardInterrupt as ex:
50 keyboard_interrupted = True
51 future.set_exception(ex)
52 return
53 except BaseException as ex:
54 future.set_exception(ex)
55 return
57 if not inspect.iscoroutine(coro):
58 future.set_result(coro)
59 return
61 try:
62 repl_future = self.loop.create_task(coro, context=self.context)
63 futures._chain_future(repl_future, future)
64 except BaseException as exc:
65 future.set_exception(exc)
67 self.loop.call_soon_threadsafe(callback, context=self.context)
69 try:
70 return future.result()
71 except SystemExit as se:
72 return_code = se.code
73 self.loop.stop()
74 return
75 except BaseException:
76 if keyboard_interrupted:
77 if not CAN_USE_PYREPL:
78 self.write("\nKeyboardInterrupt\n")
79 else:
80 self.showtraceback()
81 return self.STATEMENT_FAILED
83class REPLThread(threading.Thread):
85 def run(self):
86 global return_code
88 try:
89 if not sys.flags.quiet:
90 banner = (
91 f'asyncio REPL {sys.version} on {sys.platform}\n'
92 f'Use "await" directly instead of "asyncio.run()".\n'
93 f'Type "help", "copyright", "credits" or "license" '
94 f'for more information.\n'
95 )
97 console.write(banner)
99 if not sys.flags.isolated and (startup_path := os.getenv("PYTHONSTARTUP")):
100 sys.audit("cpython.run_startup", startup_path)
102 import tokenize
103 with tokenize.open(startup_path) as f:
104 startup_code = compile(f.read(), startup_path, "exec")
105 exec(startup_code, console.locals)
107 ps1 = getattr(sys, "ps1", ">>> ")
108 if CAN_USE_PYREPL:
109 theme = get_theme().syntax
110 ps1 = f"{theme.prompt}{ps1}{theme.reset}"
111 import_line = f'{theme.keyword}import{theme.reset} asyncio'
112 else:
113 import_line = "import asyncio"
114 console.write(f"{ps1}{import_line}\n")
116 if CAN_USE_PYREPL:
117 from _pyrepl.simple_interact import (
118 run_multiline_interactive_console,
119 )
120 try:
121 sys.ps1 = ps1
122 run_multiline_interactive_console(console)
123 except SystemExit:
124 # expected via the `exit` and `quit` commands
125 pass
126 except BaseException:
127 # unexpected issue
128 console.showtraceback()
129 console.write("Internal error, ")
130 return_code = 1
131 else:
132 console.interact(banner="", exitmsg="")
133 finally:
134 warnings.filterwarnings(
135 'ignore',
136 message=r'^coroutine .* was never awaited$',
137 category=RuntimeWarning)
139 loop.call_soon_threadsafe(loop.stop)
141 def interrupt(self) -> None:
142 if not CAN_USE_PYREPL:
143 return
145 from _pyrepl.simple_interact import _get_reader
146 r = _get_reader()
147 if r.threading_hook is not None:
148 r.threading_hook.add("") # type: ignore
151if __name__ == '__main__':
152 parser = argparse.ArgumentParser(
153 prog="python3 -m asyncio",
154 description="Interactive asyncio shell and CLI tools",
155 color=True,
156 )
157 subparsers = parser.add_subparsers(help="sub-commands", dest="command")
158 ps = subparsers.add_parser(
159 "ps", help="Display a table of all pending tasks in a process"
160 )
161 ps.add_argument("pid", type=int, help="Process ID to inspect")
162 pstree = subparsers.add_parser(
163 "pstree", help="Display a tree of all pending tasks in a process"
164 )
165 pstree.add_argument("pid", type=int, help="Process ID to inspect")
166 args = parser.parse_args()
167 match args.command:
168 case "ps":
169 asyncio.tools.display_awaited_by_tasks_table(args.pid)
170 sys.exit(0)
171 case "pstree":
172 asyncio.tools.display_awaited_by_tasks_tree(args.pid)
173 sys.exit(0)
174 case None:
175 pass # continue to the interactive shell
176 case _:
177 # shouldn't happen as an invalid command-line wouldn't parse
178 # but let's keep it for the next person adding a command
179 print(f"error: unhandled command {args.command}", file=sys.stderr)
180 parser.print_usage(file=sys.stderr)
181 sys.exit(1)
183 sys.audit("cpython.run_stdin")
185 if os.getenv('PYTHON_BASIC_REPL'):
186 CAN_USE_PYREPL = False
187 else:
188 from _pyrepl.main import CAN_USE_PYREPL
190 return_code = 0
191 loop = asyncio.new_event_loop()
192 asyncio.set_event_loop(loop)
194 repl_locals = {'asyncio': asyncio}
195 for key in {'__name__', '__package__',
196 '__loader__', '__spec__',
197 '__builtins__', '__file__'}:
198 repl_locals[key] = locals()[key]
200 console = AsyncIOInteractiveConsole(repl_locals, loop)
202 repl_future = None
203 keyboard_interrupted = False
205 try:
206 import readline # NoQA
207 except ImportError:
208 readline = None
210 interactive_hook = getattr(sys, "__interactivehook__", None)
212 if interactive_hook is not None:
213 sys.audit("cpython.run_interactivehook", interactive_hook)
214 interactive_hook()
216 if interactive_hook is site.register_readline:
217 # Fix the completer function to use the interactive console locals
218 try:
219 import rlcompleter
220 except:
221 pass
222 else:
223 if readline is not None:
224 completer = rlcompleter.Completer(console.locals)
225 readline.set_completer(completer.complete)
227 repl_thread = REPLThread(name="Interactive thread")
228 repl_thread.daemon = True
229 repl_thread.start()
231 while True:
232 try:
233 loop.run_forever()
234 except KeyboardInterrupt:
235 keyboard_interrupted = True
236 if repl_future and not repl_future.done():
237 repl_future.cancel()
238 repl_thread.interrupt()
239 continue
240 else:
241 break
243 console.write('exiting asyncio REPL...\n')
244 loop.close()
245 sys.exit(return_code)