Coverage for Lib/asyncio/__main__.py: 0%

102 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-07 01:33 +0000

1import argparse 

2import ast 

3import asyncio 

4import asyncio.tools 

5import concurrent.futures 

6import contextvars 

7import inspect 

8import os 

9import site 

10import sys 

11import threading 

12import types 

13import warnings 

14 

15from _colorize import get_theme 

16from _pyrepl.console import InteractiveColoredConsole 

17 

18from . import futures 

19 

20 

class AsyncIOInteractiveConsole(InteractiveColoredConsole):
    """Interactive console that accepts top-level ``await`` in its input.

    Every compiled input is executed on ``self.loop`` while the thread that
    called :meth:`runcode` blocks until the outcome is known.  The loop must
    therefore be running in a different thread than the caller.
    """

    def __init__(self, locals, loop):
        """Bind the console to a namespace and an event loop.

        locals: mapping used as the globals of every executed input.
        loop: event loop on which the inputs are executed.
        """
        super().__init__(locals, filename="<stdin>")
        # Make the compiler accept ``await`` outside of a coroutine function.
        self.compile.compiler.flags |= ast.PyCF_ALLOW_TOP_LEVEL_AWAIT

        self.loop = loop
        # The same context object is handed to every callback and task that
        # this console schedules on the loop.
        self.context = contextvars.copy_context()

    def runcode(self, code):
        """Run the compiled *code* on the event loop and wait for it.

        Returns the value produced by the code (for a coroutine, whatever is
        delivered through the chained future), ``None`` when waiting raised
        ``SystemExit``, or ``self.STATEMENT_FAILED`` after any other
        exception.  ``SystemExit`` also stores its code in the module-level
        ``return_code`` and stops the loop.
        """
        global return_code
        # Thread-safe future used to hand the outcome back from the loop's
        # thread to the thread blocked in this method.
        future = concurrent.futures.Future()

        def callback():
            """Execute *code* on the loop and feed the outcome to ``future``."""
            global return_code
            global repl_future
            global keyboard_interrupted

            # Reset the state shared with the main script for this input.
            repl_future = None
            keyboard_interrupted = False

            func = types.FunctionType(code, self.locals)
            try:
                coro = func()
            except SystemExit as se:
                return_code = se.code
                self.loop.stop()
                # NOTE(review): ``future`` is left unresolved on this path, so
                # the thread waiting in runcode() is not woken up; presumably
                # fine because the REPL thread is a daemon -- confirm.
                return
            except KeyboardInterrupt as ex:
                keyboard_interrupted = True
                future.set_exception(ex)
                return
            except BaseException as ex:
                future.set_exception(ex)
                return

            # Plain (non-awaiting) input: calling the function already ran it.
            if not inspect.iscoroutine(coro):
                future.set_result(coro)
                return

            try:
                repl_future = self.loop.create_task(coro, context=self.context)
                # Presumably mirrors the task's result/exception into
                # ``future`` -- helper is defined in asyncio.futures.
                futures._chain_future(repl_future, future)
            except BaseException as exc:
                future.set_exception(exc)

        self.loop.call_soon_threadsafe(callback, context=self.context)

        try:
            return future.result()
        except SystemExit as se:
            return_code = se.code
            # This method runs outside the loop's thread (it blocks on
            # ``future`` while the loop executes ``callback``).  A bare
            # ``loop.stop()`` is not thread-safe and would not wake a loop
            # sleeping in its selector, so schedule the stop the same way
            # REPLThread.run() does.
            self.loop.call_soon_threadsafe(self.loop.stop)
            return
        except BaseException:
            if keyboard_interrupted:
                # Only the basic REPL prints the interrupt notice here.
                if not CAN_USE_PYREPL:
                    self.write("\nKeyboardInterrupt\n")
            else:
                self.showtraceback()
            return self.STATEMENT_FAILED

82 

class REPLThread(threading.Thread):
    """Thread that serves the interactive console while the loop runs."""

    def run(self):
        """Show the banner, run PYTHONSTARTUP, then drive the console.

        On the way out (normal or not) "coroutine ... was never awaited"
        warnings are silenced and the event loop is asked to stop.
        """
        global return_code

        try:
            if not sys.flags.quiet:
                console.write(
                    f'asyncio REPL {sys.version} on {sys.platform}\n'
                    'Use "await" directly instead of "asyncio.run()".\n'
                    'Type "help", "copyright", "credits" or "license" '
                    'for more information.\n'
                )

            # The startup file is ignored entirely in isolated mode.
            startup_path = (
                None if sys.flags.isolated else os.getenv("PYTHONSTARTUP")
            )
            if startup_path:
                sys.audit("cpython.run_startup", startup_path)

                import tokenize
                with tokenize.open(startup_path) as startup_file:
                    source = startup_file.read()
                    startup_code = compile(source, startup_path, "exec")
                    exec(startup_code, console.locals)

            # Echo a fake first input line so the user sees that ``asyncio``
            # is already available in the session.
            prompt = getattr(sys, "ps1", ">>> ")
            import_line = "import asyncio"
            if CAN_USE_PYREPL:
                syntax = get_theme().syntax
                prompt = f"{syntax.prompt}{prompt}{syntax.reset}"
                import_line = f'{syntax.keyword}import{syntax.reset} asyncio'
            console.write(f"{prompt}{import_line}\n")

            if not CAN_USE_PYREPL:
                console.interact(banner="", exitmsg="")
            else:
                from _pyrepl.simple_interact import (
                    run_multiline_interactive_console,
                )
                try:
                    sys.ps1 = prompt
                    run_multiline_interactive_console(console)
                except SystemExit:
                    # expected via the `exit` and `quit` commands
                    pass
                except BaseException:
                    # unexpected issue
                    console.showtraceback()
                    console.write("Internal error, ")
                    return_code = 1
        finally:
            warnings.filterwarnings(
                'ignore',
                message=r'^coroutine .* was never awaited$',
                category=RuntimeWarning,
            )

            # This runs outside the loop's thread, hence the thread-safe call.
            loop.call_soon_threadsafe(loop.stop)

    def interrupt(self) -> None:
        """Poke the pyrepl reader after a keyboard interrupt.

        Does nothing for the basic REPL.  Otherwise adds an empty string to
        the reader's ``threading_hook`` when one is installed -- presumably
        to wake the reader up; confirm against ``_pyrepl``.
        """
        if CAN_USE_PYREPL:
            from _pyrepl.simple_interact import _get_reader
            hook = _get_reader().threading_hook
            if hook is not None:
                hook.add("")  # type: ignore

149 

150 

151if __name__ == '__main__': 

152 parser = argparse.ArgumentParser( 

153 prog="python3 -m asyncio", 

154 description="Interactive asyncio shell and CLI tools", 

155 color=True, 

156 ) 

157 subparsers = parser.add_subparsers(help="sub-commands", dest="command") 

158 ps = subparsers.add_parser( 

159 "ps", help="Display a table of all pending tasks in a process" 

160 ) 

161 ps.add_argument("pid", type=int, help="Process ID to inspect") 

162 pstree = subparsers.add_parser( 

163 "pstree", help="Display a tree of all pending tasks in a process" 

164 ) 

165 pstree.add_argument("pid", type=int, help="Process ID to inspect") 

166 args = parser.parse_args() 

167 match args.command: 

168 case "ps": 

169 asyncio.tools.display_awaited_by_tasks_table(args.pid) 

170 sys.exit(0) 

171 case "pstree": 

172 asyncio.tools.display_awaited_by_tasks_tree(args.pid) 

173 sys.exit(0) 

174 case None: 

175 pass # continue to the interactive shell 

176 case _: 

177 # shouldn't happen as an invalid command-line wouldn't parse 

178 # but let's keep it for the next person adding a command 

179 print(f"error: unhandled command {args.command}", file=sys.stderr) 

180 parser.print_usage(file=sys.stderr) 

181 sys.exit(1) 

182 

183 sys.audit("cpython.run_stdin") 

184 

185 if os.getenv('PYTHON_BASIC_REPL'): 

186 CAN_USE_PYREPL = False 

187 else: 

188 from _pyrepl.main import CAN_USE_PYREPL 

189 

190 return_code = 0 

191 loop = asyncio.new_event_loop() 

192 asyncio.set_event_loop(loop) 

193 

194 repl_locals = {'asyncio': asyncio} 

195 for key in {'__name__', '__package__', 

196 '__loader__', '__spec__', 

197 '__builtins__', '__file__'}: 

198 repl_locals[key] = locals()[key] 

199 

200 console = AsyncIOInteractiveConsole(repl_locals, loop) 

201 

202 repl_future = None 

203 keyboard_interrupted = False 

204 

205 try: 

206 import readline # NoQA 

207 except ImportError: 

208 readline = None 

209 

210 interactive_hook = getattr(sys, "__interactivehook__", None) 

211 

212 if interactive_hook is not None: 

213 sys.audit("cpython.run_interactivehook", interactive_hook) 

214 interactive_hook() 

215 

216 if interactive_hook is site.register_readline: 

217 # Fix the completer function to use the interactive console locals 

218 try: 

219 import rlcompleter 

220 except: 

221 pass 

222 else: 

223 if readline is not None: 

224 completer = rlcompleter.Completer(console.locals) 

225 readline.set_completer(completer.complete) 

226 

227 repl_thread = REPLThread(name="Interactive thread") 

228 repl_thread.daemon = True 

229 repl_thread.start() 

230 

231 while True: 

232 try: 

233 loop.run_forever() 

234 except KeyboardInterrupt: 

235 keyboard_interrupted = True 

236 if repl_future and not repl_future.done(): 

237 repl_future.cancel() 

238 repl_thread.interrupt() 

239 continue 

240 else: 

241 break 

242 

243 console.write('exiting asyncio REPL...\n') 

244 loop.close() 

245 sys.exit(return_code)