Coverage for Lib / asyncio / __main__.py: 0%
105 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-07 02:15 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-07 02:15 +0000
1import argparse
2import ast
3import asyncio
4import asyncio.tools
5import concurrent.futures
6import contextvars
7import inspect
8import os
9import site
10import sys
11import threading
12import types
13import warnings
15try:
16 from _colorize import get_theme
17 from _pyrepl.console import InteractiveColoredConsole as InteractiveConsole
18except ModuleNotFoundError:
19 from code import InteractiveConsole
21from . import futures
24class AsyncIOInteractiveConsole(InteractiveConsole):
26 def __init__(self, locals, loop):
27 super().__init__(locals, filename="<stdin>")
28 self.compile.compiler.flags |= ast.PyCF_ALLOW_TOP_LEVEL_AWAIT
30 self.loop = loop
31 self.context = contextvars.copy_context()
33 def runcode(self, code):
34 global return_code
35 future = concurrent.futures.Future()
37 def callback():
38 global return_code
39 global repl_future
40 global keyboard_interrupted
42 repl_future = None
43 keyboard_interrupted = False
45 func = types.FunctionType(code, self.locals)
46 try:
47 coro = func()
48 except SystemExit as se:
49 return_code = se.code
50 self.loop.stop()
51 return
52 except KeyboardInterrupt as ex:
53 keyboard_interrupted = True
54 future.set_exception(ex)
55 return
56 except BaseException as ex:
57 future.set_exception(ex)
58 return
60 if not inspect.iscoroutine(coro):
61 future.set_result(coro)
62 return
64 try:
65 repl_future = self.loop.create_task(coro, context=self.context)
66 futures._chain_future(repl_future, future)
67 except BaseException as exc:
68 future.set_exception(exc)
70 self.loop.call_soon_threadsafe(callback, context=self.context)
72 try:
73 return future.result()
74 except SystemExit as se:
75 return_code = se.code
76 self.loop.stop()
77 return
78 except BaseException:
79 if keyboard_interrupted:
80 if not CAN_USE_PYREPL:
81 self.write("\nKeyboardInterrupt\n")
82 else:
83 self.showtraceback()
84 return self.STATEMENT_FAILED
class REPLThread(threading.Thread):
    """Thread that hosts the interactive console session.

    Works on the module-level ``console``, ``loop`` and ``CAN_USE_PYREPL``
    names and asks the event loop to stop once the session is over.
    """

    def run(self):
        """Show the banner, run ``PYTHONSTARTUP`` and enter the console."""
        global return_code

        try:
            if not sys.flags.quiet:
                intro = f'asyncio REPL {sys.version} on {sys.platform}\n'
                intro += 'Use "await" directly instead of "asyncio.run()".\n'
                intro += ('Type "help", "copyright", "credits" or "license" '
                          'for more information.\n')
                console.write(intro)

            if not sys.flags.isolated:
                startup_file = os.getenv("PYTHONSTARTUP")
                if startup_file:
                    sys.audit("cpython.run_startup", startup_file)

                    import tokenize
                    with tokenize.open(startup_file) as stream:
                        compiled = compile(stream.read(), startup_file, "exec")
                        exec(compiled, console.locals)

            # Echo an "import asyncio" line after the prompt, colorized when
            # the pyrepl-based console is in use.
            prompt = getattr(sys, "ps1", ">>> ")
            import_line = "import asyncio"
            if CAN_USE_PYREPL:
                syntax = get_theme().syntax
                prompt = f"{syntax.prompt}{prompt}{syntax.reset}"
                import_line = f'{syntax.keyword}import{syntax.reset} asyncio'
            console.write(f"{prompt}{import_line}\n")

            if not CAN_USE_PYREPL:
                # Basic console; the banner was already written above.
                console.interact(banner="", exitmsg="")
                return

            from _pyrepl.simple_interact import (
                run_multiline_interactive_console,
            )
            try:
                sys.ps1 = prompt
                run_multiline_interactive_console(console)
            except SystemExit:
                # expected via the `exit` and `quit` commands
                pass
            except BaseException:
                # unexpected issue
                console.showtraceback()
                console.write("Internal error, ")
                return_code = 1
        finally:
            # From here on, ignore "coroutine ... was never awaited"
            # RuntimeWarnings.
            warnings.filterwarnings(
                'ignore',
                message=r'^coroutine .* was never awaited$',
                category=RuntimeWarning)

            # The loop runs in another thread, so schedule the stop there.
            loop.call_soon_threadsafe(loop.stop)

    def interrupt(self) -> None:
        """Notify the pyrepl reader of an interrupt; no-op without pyrepl."""
        if CAN_USE_PYREPL:
            from _pyrepl.simple_interact import _get_reader
            hook = _get_reader().threading_hook
            if hook is not None:
                # presumably wakes the reader up -- verify against _pyrepl
                hook.add("")  # type: ignore
154if __name__ == '__main__':
155 parser = argparse.ArgumentParser(
156 prog="python3 -m asyncio",
157 description="Interactive asyncio shell and CLI tools",
158 color=True,
159 )
160 subparsers = parser.add_subparsers(help="sub-commands", dest="command")
161 ps = subparsers.add_parser(
162 "ps", help="Display a table of all pending tasks in a process"
163 )
164 ps.add_argument("pid", type=int, help="Process ID to inspect")
165 pstree = subparsers.add_parser(
166 "pstree", help="Display a tree of all pending tasks in a process"
167 )
168 pstree.add_argument("pid", type=int, help="Process ID to inspect")
169 args = parser.parse_args()
170 match args.command:
171 case "ps":
172 asyncio.tools.display_awaited_by_tasks_table(args.pid)
173 sys.exit(0)
174 case "pstree":
175 asyncio.tools.display_awaited_by_tasks_tree(args.pid)
176 sys.exit(0)
177 case None:
178 pass # continue to the interactive shell
179 case _:
180 # shouldn't happen as an invalid command-line wouldn't parse
181 # but let's keep it for the next person adding a command
182 print(f"error: unhandled command {args.command}", file=sys.stderr)
183 parser.print_usage(file=sys.stderr)
184 sys.exit(1)
186 sys.audit("cpython.run_stdin")
188 if os.getenv('PYTHON_BASIC_REPL'):
189 CAN_USE_PYREPL = False
190 else:
191 try:
192 from _pyrepl.main import CAN_USE_PYREPL
193 except ModuleNotFoundError:
194 CAN_USE_PYREPL = False
196 return_code = 0
197 loop = asyncio.new_event_loop()
198 asyncio.set_event_loop(loop)
200 repl_locals = {'asyncio': asyncio}
201 for key in {'__name__', '__package__',
202 '__loader__', '__spec__',
203 '__builtins__', '__file__'}:
204 repl_locals[key] = locals()[key]
206 console = AsyncIOInteractiveConsole(repl_locals, loop)
208 repl_future = None
209 keyboard_interrupted = False
211 try:
212 import readline # NoQA
213 except ImportError:
214 readline = None
216 interactive_hook = getattr(sys, "__interactivehook__", None)
218 if interactive_hook is not None:
219 sys.audit("cpython.run_interactivehook", interactive_hook)
220 interactive_hook()
222 if interactive_hook is site.register_readline:
223 # Fix the completer function to use the interactive console locals
224 try:
225 import rlcompleter
226 except:
227 pass
228 else:
229 if readline is not None:
230 completer = rlcompleter.Completer(console.locals)
231 readline.set_completer(completer.complete)
233 repl_thread = REPLThread(name="Interactive thread")
234 repl_thread.daemon = True
235 repl_thread.start()
237 while True:
238 try:
239 loop.run_forever()
240 except KeyboardInterrupt:
241 keyboard_interrupted = True
242 if repl_future and not repl_future.done():
243 repl_future.cancel()
244 repl_thread.interrupt()
245 continue
246 else:
247 break
249 console.write('exiting asyncio REPL...\n')
250 loop.close()
251 sys.exit(return_code)