Coverage for Lib/asyncio/__main__.py: 0%
94 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-15 02:02 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-15 02:02 +0000
1import ast
2import asyncio
3import concurrent.futures
4import contextvars
5import inspect
6import os
7import site
8import sys
9import threading
10import types
11import warnings
13from _colorize import can_colorize, ANSIColors # type: ignore[import-not-found]
14from _pyrepl.console import InteractiveColoredConsole
16from . import futures
19class AsyncIOInteractiveConsole(InteractiveColoredConsole):
21 def __init__(self, locals, loop):
22 super().__init__(locals, filename="<stdin>")
23 self.compile.compiler.flags |= ast.PyCF_ALLOW_TOP_LEVEL_AWAIT
25 self.loop = loop
26 self.context = contextvars.copy_context()
28 def runcode(self, code):
29 global return_code
30 future = concurrent.futures.Future()
32 def callback():
33 global return_code
34 global repl_future
35 global keyboard_interrupted
37 repl_future = None
38 keyboard_interrupted = False
40 func = types.FunctionType(code, self.locals)
41 try:
42 coro = func()
43 except SystemExit as se:
44 return_code = se.code
45 self.loop.stop()
46 return
47 except KeyboardInterrupt as ex:
48 keyboard_interrupted = True
49 future.set_exception(ex)
50 return
51 except BaseException as ex:
52 future.set_exception(ex)
53 return
55 if not inspect.iscoroutine(coro):
56 future.set_result(coro)
57 return
59 try:
60 repl_future = self.loop.create_task(coro, context=self.context)
61 futures._chain_future(repl_future, future)
62 except BaseException as exc:
63 future.set_exception(exc)
65 loop.call_soon_threadsafe(callback, context=self.context)
67 try:
68 return future.result()
69 except SystemExit as se:
70 return_code = se.code
71 self.loop.stop()
72 return
73 except BaseException:
74 if keyboard_interrupted:
75 self.write("\nKeyboardInterrupt\n")
76 else:
77 self.showtraceback()
78 return self.STATEMENT_FAILED
80class REPLThread(threading.Thread):
82 def run(self):
83 global return_code
85 try:
86 banner = (
87 f'asyncio REPL {sys.version} on {sys.platform}\n'
88 f'Use "await" directly instead of "asyncio.run()".\n'
89 f'Type "help", "copyright", "credits" or "license" '
90 f'for more information.\n'
91 )
93 console.write(banner)
95 if startup_path := os.getenv("PYTHONSTARTUP"):
96 sys.audit("cpython.run_startup", startup_path)
98 import tokenize
99 with tokenize.open(startup_path) as f:
100 startup_code = compile(f.read(), startup_path, "exec")
101 exec(startup_code, console.locals)
103 ps1 = getattr(sys, "ps1", ">>> ")
104 if can_colorize() and CAN_USE_PYREPL:
105 ps1 = f"{ANSIColors.BOLD_MAGENTA}{ps1}{ANSIColors.RESET}"
106 console.write(f"{ps1}import asyncio\n")
108 if CAN_USE_PYREPL:
109 from _pyrepl.simple_interact import (
110 run_multiline_interactive_console,
111 )
112 try:
113 run_multiline_interactive_console(console)
114 except SystemExit:
115 # expected via the `exit` and `quit` commands
116 pass
117 except BaseException:
118 # unexpected issue
119 console.showtraceback()
120 console.write("Internal error, ")
121 return_code = 1
122 else:
123 console.interact(banner="", exitmsg="")
124 finally:
125 warnings.filterwarnings(
126 'ignore',
127 message=r'^coroutine .* was never awaited$',
128 category=RuntimeWarning)
130 loop.call_soon_threadsafe(loop.stop)
132 def interrupt(self) -> None:
133 if not CAN_USE_PYREPL:
134 return
136 from _pyrepl.simple_interact import _get_reader
137 r = _get_reader()
138 if r.threading_hook is not None:
139 r.threading_hook.add("") # type: ignore
142if __name__ == '__main__':
143 sys.audit("cpython.run_stdin")
145 if os.getenv('PYTHON_BASIC_REPL'):
146 CAN_USE_PYREPL = False
147 else:
148 from _pyrepl.main import CAN_USE_PYREPL
150 return_code = 0
151 loop = asyncio.new_event_loop()
152 asyncio.set_event_loop(loop)
154 repl_locals = {'asyncio': asyncio}
155 for key in {'__name__', '__package__',
156 '__loader__', '__spec__',
157 '__builtins__', '__file__'}:
158 repl_locals[key] = locals()[key]
160 console = AsyncIOInteractiveConsole(repl_locals, loop)
162 repl_future = None
163 keyboard_interrupted = False
165 try:
166 import readline # NoQA
167 except ImportError:
168 readline = None
170 interactive_hook = getattr(sys, "__interactivehook__", None)
172 if interactive_hook is not None:
173 sys.audit("cpython.run_interactivehook", interactive_hook)
174 interactive_hook()
176 if interactive_hook is site.register_readline:
177 # Fix the completer function to use the interactive console locals
178 try:
179 import rlcompleter
180 except:
181 pass
182 else:
183 if readline is not None:
184 completer = rlcompleter.Completer(console.locals)
185 readline.set_completer(completer.complete)
187 repl_thread = REPLThread(name="Interactive thread")
188 repl_thread.daemon = True
189 repl_thread.start()
191 while True:
192 try:
193 loop.run_forever()
194 except KeyboardInterrupt:
195 keyboard_interrupted = True
196 if repl_future and not repl_future.done():
197 repl_future.cancel()
198 repl_thread.interrupt()
199 continue
200 else:
201 break
203 console.write('exiting asyncio REPL...\n')
204 sys.exit(return_code)