Coverage for Lib / asyncio / __main__.py: 0%

105 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-07 02:15 +0000

1import argparse 

2import ast 

3import asyncio 

4import asyncio.tools 

5import concurrent.futures 

6import contextvars 

7import inspect 

8import os 

9import site 

10import sys 

11import threading 

12import types 

13import warnings 

14 

15try: 

16 from _colorize import get_theme 

17 from _pyrepl.console import InteractiveColoredConsole as InteractiveConsole 

18except ModuleNotFoundError: 

19 from code import InteractiveConsole 

20 

21from . import futures 

22 

23 

class AsyncIOInteractiveConsole(InteractiveConsole):
    """Interactive console that executes each statement on an event loop.

    Code is compiled with top-level ``await`` enabled.  ``runcode`` hands
    the compiled code to ``self.loop`` and blocks until a result is
    available, so it must be called from a thread other than the one
    running the loop.

    The class cooperates with the module-level globals ``return_code``,
    ``repl_future``, ``keyboard_interrupted`` and ``CAN_USE_PYREPL``.
    """

    def __init__(self, locals, loop):
        """Create the console.

        locals -- namespace dictionary in which the typed code is executed.
        loop   -- event loop on which the code (and any coroutine it
                  produces) is run.
        """
        super().__init__(locals, filename="<stdin>")
        # Let "await" appear outside of an "async def" in typed code.
        self.compile.compiler.flags |= ast.PyCF_ALLOW_TOP_LEVEL_AWAIT

        self.loop = loop
        # One context is reused for every statement that is run.
        self.context = contextvars.copy_context()

    def runcode(self, code):
        """Run a compiled code object on the loop and wait for its outcome.

        Returns the value produced by the code, ``None`` when the code
        requested an exit via SystemExit, or ``self.STATEMENT_FAILED`` when
        it raised any other exception.
        """
        global return_code
        # Thread-safe future used to carry the outcome from the loop thread
        # back to the caller of runcode().
        future = concurrent.futures.Future()

        def callback():
            # Executed by the event loop (scheduled below).
            global return_code
            global repl_future
            global keyboard_interrupted

            repl_future = None
            keyboard_interrupted = False

            func = types.FunctionType(code, self.locals)
            try:
                coro = func()
            except SystemExit as se:
                # We are on the loop thread here, so stop() can be called
                # directly.
                return_code = se.code
                self.loop.stop()
                return
            except KeyboardInterrupt as ex:
                keyboard_interrupted = True
                future.set_exception(ex)
                return
            except BaseException as ex:
                future.set_exception(ex)
                return

            if not inspect.iscoroutine(coro):
                # Plain synchronous code: it has already run to completion.
                future.set_result(coro)
                return

            try:
                # Code containing a top-level "await" yields a coroutine;
                # run it as a task and forward its outcome to `future`.
                repl_future = self.loop.create_task(coro, context=self.context)
                futures._chain_future(repl_future, future)
            except BaseException as exc:
                future.set_exception(exc)

        self.loop.call_soon_threadsafe(callback, context=self.context)

        try:
            return future.result()
        except SystemExit as se:
            return_code = se.code
            # runcode() is not running on the loop thread, and event loop
            # methods are not thread-safe: schedule the stop on the loop
            # (which also wakes it up) instead of calling stop() directly.
            self.loop.call_soon_threadsafe(self.loop.stop)
            return
        except BaseException:
            if keyboard_interrupted:
                # The basic REPL has to report the interruption itself.
                if not CAN_USE_PYREPL:
                    self.write("\nKeyboardInterrupt\n")
            else:
                self.showtraceback()
            return self.STATEMENT_FAILED

85 

class REPLThread(threading.Thread):
    """Thread that drives the interactive prompt.

    It relies on the module-level ``console``, ``loop`` and
    ``CAN_USE_PYREPL`` globals and records a failure in ``return_code``.
    """

    def run(self):
        """Print the banner, run the startup file, then read-eval-print.

        Whatever happens, the event loop is asked to stop once the
        interactive session is over.
        """
        global return_code

        try:
            if not sys.flags.quiet:
                console.write(
                    f'asyncio REPL {sys.version} on {sys.platform}\n'
                    f'Use "await" directly instead of "asyncio.run()".\n'
                    f'Type "help", "copyright", "credits" or "license" '
                    f'for more information.\n'
                )

            # The startup file is ignored in isolated mode.
            startup_file = None
            if not sys.flags.isolated:
                startup_file = os.getenv("PYTHONSTARTUP")
            if startup_file:
                sys.audit("cpython.run_startup", startup_file)

                import tokenize
                with tokenize.open(startup_file) as stream:
                    compiled = compile(stream.read(), startup_file, "exec")
                    exec(compiled, console.locals)

            # Echo an "import asyncio" line after the prompt; the console
            # namespace is expected to already provide the module.
            prompt = getattr(sys, "ps1", ">>> ")
            import_line = "import asyncio"
            if CAN_USE_PYREPL:
                theme = get_theme().syntax
                prompt = f"{theme.prompt}{prompt}{theme.reset}"
                import_line = f'{theme.keyword}import{theme.reset} asyncio'
            console.write(f"{prompt}{import_line}\n")

            if not CAN_USE_PYREPL:
                console.interact(banner="", exitmsg="")
            else:
                from _pyrepl.simple_interact import (
                    run_multiline_interactive_console,
                )
                try:
                    sys.ps1 = prompt
                    run_multiline_interactive_console(console)
                except SystemExit:
                    # expected via the `exit` and `quit` commands
                    pass
                except BaseException:
                    # unexpected issue
                    console.showtraceback()
                    console.write("Internal error, ")
                    return_code = 1
        finally:
            warnings.filterwarnings(
                'ignore',
                category=RuntimeWarning,
                message=r'^coroutine .* was never awaited$',
            )

            loop.call_soon_threadsafe(loop.stop)

    def interrupt(self) -> None:
        """Notify the PyREPL reader through its threading hook, if any.

        Does nothing when PyREPL is not in use.
        """
        if CAN_USE_PYREPL:
            from _pyrepl.simple_interact import _get_reader
            hook = _get_reader().threading_hook
            if hook is not None:
                hook.add("")  # type: ignore

152 

153 

154if __name__ == '__main__': 

155 parser = argparse.ArgumentParser( 

156 prog="python3 -m asyncio", 

157 description="Interactive asyncio shell and CLI tools", 

158 color=True, 

159 ) 

160 subparsers = parser.add_subparsers(help="sub-commands", dest="command") 

161 ps = subparsers.add_parser( 

162 "ps", help="Display a table of all pending tasks in a process" 

163 ) 

164 ps.add_argument("pid", type=int, help="Process ID to inspect") 

165 pstree = subparsers.add_parser( 

166 "pstree", help="Display a tree of all pending tasks in a process" 

167 ) 

168 pstree.add_argument("pid", type=int, help="Process ID to inspect") 

169 args = parser.parse_args() 

170 match args.command: 

171 case "ps": 

172 asyncio.tools.display_awaited_by_tasks_table(args.pid) 

173 sys.exit(0) 

174 case "pstree": 

175 asyncio.tools.display_awaited_by_tasks_tree(args.pid) 

176 sys.exit(0) 

177 case None: 

178 pass # continue to the interactive shell 

179 case _: 

180 # shouldn't happen as an invalid command-line wouldn't parse 

181 # but let's keep it for the next person adding a command 

182 print(f"error: unhandled command {args.command}", file=sys.stderr) 

183 parser.print_usage(file=sys.stderr) 

184 sys.exit(1) 

185 

186 sys.audit("cpython.run_stdin") 

187 

188 if os.getenv('PYTHON_BASIC_REPL'): 

189 CAN_USE_PYREPL = False 

190 else: 

191 try: 

192 from _pyrepl.main import CAN_USE_PYREPL 

193 except ModuleNotFoundError: 

194 CAN_USE_PYREPL = False 

195 

196 return_code = 0 

197 loop = asyncio.new_event_loop() 

198 asyncio.set_event_loop(loop) 

199 

200 repl_locals = {'asyncio': asyncio} 

201 for key in {'__name__', '__package__', 

202 '__loader__', '__spec__', 

203 '__builtins__', '__file__'}: 

204 repl_locals[key] = locals()[key] 

205 

206 console = AsyncIOInteractiveConsole(repl_locals, loop) 

207 

208 repl_future = None 

209 keyboard_interrupted = False 

210 

211 try: 

212 import readline # NoQA 

213 except ImportError: 

214 readline = None 

215 

216 interactive_hook = getattr(sys, "__interactivehook__", None) 

217 

218 if interactive_hook is not None: 

219 sys.audit("cpython.run_interactivehook", interactive_hook) 

220 interactive_hook() 

221 

222 if interactive_hook is site.register_readline: 

223 # Fix the completer function to use the interactive console locals 

224 try: 

225 import rlcompleter 

226 except: 

227 pass 

228 else: 

229 if readline is not None: 

230 completer = rlcompleter.Completer(console.locals) 

231 readline.set_completer(completer.complete) 

232 

233 repl_thread = REPLThread(name="Interactive thread") 

234 repl_thread.daemon = True 

235 repl_thread.start() 

236 

237 while True: 

238 try: 

239 loop.run_forever() 

240 except KeyboardInterrupt: 

241 keyboard_interrupted = True 

242 if repl_future and not repl_future.done(): 

243 repl_future.cancel() 

244 repl_thread.interrupt() 

245 continue 

246 else: 

247 break 

248 

249 console.write('exiting asyncio REPL...\n') 

250 loop.close() 

251 sys.exit(return_code)