Coverage for Lib/asyncio/__main__.py: 0%

110 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-06 03:00 +0000

1import argparse 

2import ast 

3import asyncio 

4import asyncio.tools 

5import concurrent.futures 

6import contextvars 

7import inspect 

8import os 

9import site 

10import sys 

11import threading 

12import types 

13import warnings 

14 

15try: 

16 from _colorize import get_theme 

17 from _pyrepl.console import InteractiveColoredConsole as InteractiveConsole 

18except ModuleNotFoundError: 

19 from code import InteractiveConsole 

20 

21from . import futures 

22 

23 

24class AsyncIOInteractiveConsole(InteractiveConsole): 

25 

26 def __init__(self, locals, loop): 

27 super().__init__(locals, filename="<stdin>") 

28 self.compile.compiler.flags |= ast.PyCF_ALLOW_TOP_LEVEL_AWAIT 

29 

30 self.loop = loop 

31 self.context = contextvars.copy_context() 

32 

33 def runcode(self, code): 

34 global return_code 

35 future = concurrent.futures.Future() 

36 

37 def callback(): 

38 global return_code 

39 global repl_future 

40 global keyboard_interrupted 

41 

42 repl_future = None 

43 keyboard_interrupted = False 

44 

45 func = types.FunctionType(code, self.locals) 

46 try: 

47 coro = func() 

48 except SystemExit as se: 

49 return_code = se.code 

50 self.loop.stop() 

51 return 

52 except KeyboardInterrupt as ex: 

53 keyboard_interrupted = True 

54 future.set_exception(ex) 

55 return 

56 except BaseException as ex: 

57 future.set_exception(ex) 

58 return 

59 

60 if not inspect.iscoroutine(coro): 

61 future.set_result(coro) 

62 return 

63 

64 try: 

65 repl_future = self.loop.create_task(coro, context=self.context) 

66 futures._chain_future(repl_future, future) 

67 except BaseException as exc: 

68 future.set_exception(exc) 

69 

70 self.loop.call_soon_threadsafe(callback, context=self.context) 

71 

72 try: 

73 return future.result() 

74 except SystemExit as se: 

75 return_code = se.code 

76 self.loop.stop() 

77 return 

78 except BaseException: 

79 if keyboard_interrupted: 

80 if not CAN_USE_PYREPL: 

81 self.write("\nKeyboardInterrupt\n") 

82 else: 

83 self.showtraceback() 

84 return self.STATEMENT_FAILED 

85 

86class REPLThread(threading.Thread): 

87 

88 def run(self): 

89 global return_code 

90 

91 try: 

92 if not sys.flags.quiet: 

93 banner = ( 

94 f'asyncio REPL {sys.version} on {sys.platform}\n' 

95 f'Use "await" directly instead of "asyncio.run()".\n' 

96 f'Type "help", "copyright", "credits" or "license" ' 

97 f'for more information.\n' 

98 ) 

99 

100 console.write(banner) 

101 

102 if not sys.flags.isolated and (startup_path := os.getenv("PYTHONSTARTUP")): 

103 sys.audit("cpython.run_startup", startup_path) 

104 try: 

105 import tokenize 

106 with tokenize.open(startup_path) as f: 

107 startup_code = compile(f.read(), startup_path, "exec") 

108 exec(startup_code, console.locals) 

109 except SystemExit: 

110 raise 

111 except BaseException: 

112 console.showtraceback() 

113 

114 ps1 = getattr(sys, "ps1", ">>> ") 

115 if CAN_USE_PYREPL: 

116 theme = get_theme().syntax 

117 ps1 = f"{theme.prompt}{ps1}{theme.reset}" 

118 import_line = f'{theme.keyword}import{theme.reset} asyncio' 

119 else: 

120 import_line = "import asyncio" 

121 console.write(f"{ps1}{import_line}\n") 

122 

123 if CAN_USE_PYREPL: 

124 from _pyrepl.simple_interact import ( 

125 run_multiline_interactive_console, 

126 ) 

127 try: 

128 sys.ps1 = ps1 

129 run_multiline_interactive_console(console) 

130 except SystemExit: 

131 # expected via the `exit` and `quit` commands 

132 pass 

133 except BaseException: 

134 # unexpected issue 

135 console.showtraceback() 

136 console.write("Internal error, ") 

137 return_code = 1 

138 else: 

139 console.interact(banner="", exitmsg="") 

140 finally: 

141 warnings.filterwarnings( 

142 'ignore', 

143 message=r'^coroutine .* was never awaited$', 

144 category=RuntimeWarning) 

145 

146 loop.call_soon_threadsafe(loop.stop) 

147 

148 def interrupt(self) -> None: 

149 if not CAN_USE_PYREPL: 

150 return 

151 

152 from _pyrepl.simple_interact import _get_reader 

153 r = _get_reader() 

154 if r.threading_hook is not None: 

155 r.threading_hook.add("") # type: ignore 

156 

157 

158if __name__ == '__main__': 

159 parser = argparse.ArgumentParser( 

160 prog="python3 -m asyncio", 

161 description="Interactive asyncio shell and CLI tools", 

162 ) 

163 subparsers = parser.add_subparsers(help="sub-commands", dest="command") 

164 ps = subparsers.add_parser( 

165 "ps", help="Display a table of all pending tasks in a process" 

166 ) 

167 ps.add_argument("pid", type=int, help="Process ID to inspect") 

168 ps.add_argument( 

169 "--retries", 

170 type=int, 

171 default=3, 

172 help="Number of retries on transient attach errors", 

173 ) 

174 pstree = subparsers.add_parser( 

175 "pstree", help="Display a tree of all pending tasks in a process" 

176 ) 

177 pstree.add_argument("pid", type=int, help="Process ID to inspect") 

178 pstree.add_argument( 

179 "--retries", 

180 type=int, 

181 default=3, 

182 help="Number of retries on transient attach errors", 

183 ) 

184 args = parser.parse_args() 

185 match args.command: 

186 case "ps": 

187 asyncio.tools.display_awaited_by_tasks_table(args.pid, retries=args.retries) 

188 sys.exit(0) 

189 case "pstree": 

190 asyncio.tools.display_awaited_by_tasks_tree(args.pid, retries=args.retries) 

191 sys.exit(0) 

192 case None: 

193 pass # continue to the interactive shell 

194 case _: 

195 # shouldn't happen as an invalid command-line wouldn't parse 

196 # but let's keep it for the next person adding a command 

197 print(f"error: unhandled command {args.command}", file=sys.stderr) 

198 parser.print_usage(file=sys.stderr) 

199 sys.exit(1) 

200 

201 sys.audit("cpython.run_stdin") 

202 

203 if os.getenv('PYTHON_BASIC_REPL'): 

204 CAN_USE_PYREPL = False 

205 else: 

206 try: 

207 from _pyrepl.main import CAN_USE_PYREPL 

208 except ModuleNotFoundError: 

209 CAN_USE_PYREPL = False 

210 

211 return_code = 0 

212 loop = asyncio.new_event_loop() 

213 asyncio.set_event_loop(loop) 

214 

215 repl_locals = {'asyncio': asyncio} 

216 for key in {'__name__', '__package__', 

217 '__loader__', '__spec__', 

218 '__builtins__', '__file__'}: 

219 repl_locals[key] = locals()[key] 

220 

221 console = AsyncIOInteractiveConsole(repl_locals, loop) 

222 

223 repl_future = None 

224 keyboard_interrupted = False 

225 

226 try: 

227 import readline # NoQA 

228 except ImportError: 

229 readline = None 

230 

231 interactive_hook = getattr(sys, "__interactivehook__", None) 

232 

233 if interactive_hook is not None: 

234 sys.audit("cpython.run_interactivehook", interactive_hook) 

235 interactive_hook() 

236 

237 if interactive_hook is site.register_readline: 

238 # Fix the completer function to use the interactive console locals 

239 try: 

240 import rlcompleter 

241 except: 

242 pass 

243 else: 

244 if readline is not None: 

245 completer = rlcompleter.Completer(console.locals) 

246 readline.set_completer(completer.complete) 

247 

248 repl_thread = REPLThread(name="Interactive thread") 

249 repl_thread.daemon = True 

250 repl_thread.start() 

251 

252 while True: 

253 try: 

254 loop.run_forever() 

255 except KeyboardInterrupt: 

256 keyboard_interrupted = True 

257 if repl_future and not repl_future.done(): 

258 repl_future.cancel() 

259 repl_thread.interrupt() 

260 continue 

261 else: 

262 break 

263 

264 console.write('exiting asyncio REPL...\n') 

265 loop.close() 

266 sys.exit(return_code)