Coverage for Lib/asyncio/__main__.py: 0%

97 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-23 01:21 +0000

1import argparse 

2import ast 

3import asyncio 

4import concurrent.futures 

5import contextvars 

6import inspect 

7import os 

8import site 

9import sys 

10import threading 

11import types 

12import warnings 

13from asyncio.tools import (TaskTableOutputFormat, 

14 display_awaited_by_tasks_table, 

15 display_awaited_by_tasks_tree) 

16 

17from _colorize import get_theme 

18from _pyrepl.console import InteractiveColoredConsole 

19 

20from . import futures 

21 

22 

23class AsyncIOInteractiveConsole(InteractiveColoredConsole): 

24 

25 def __init__(self, locals, loop): 

26 super().__init__(locals, filename="<stdin>") 

27 self.compile.compiler.flags |= ast.PyCF_ALLOW_TOP_LEVEL_AWAIT 

28 

29 self.loop = loop 

30 self.context = contextvars.copy_context() 

31 

32 def runcode(self, code): 

33 global return_code 

34 future = concurrent.futures.Future() 

35 

36 def callback(): 

37 global return_code 

38 global repl_future 

39 global keyboard_interrupted 

40 

41 repl_future = None 

42 keyboard_interrupted = False 

43 

44 func = types.FunctionType(code, self.locals) 

45 try: 

46 coro = func() 

47 except SystemExit as se: 

48 return_code = se.code 

49 self.loop.stop() 

50 return 

51 except KeyboardInterrupt as ex: 

52 keyboard_interrupted = True 

53 future.set_exception(ex) 

54 return 

55 except BaseException as ex: 

56 future.set_exception(ex) 

57 return 

58 

59 if not inspect.iscoroutine(coro): 

60 future.set_result(coro) 

61 return 

62 

63 try: 

64 repl_future = self.loop.create_task(coro, context=self.context) 

65 futures._chain_future(repl_future, future) 

66 except BaseException as exc: 

67 future.set_exception(exc) 

68 

69 self.loop.call_soon_threadsafe(callback, context=self.context) 

70 

71 try: 

72 return future.result() 

73 except SystemExit as se: 

74 return_code = se.code 

75 self.loop.stop() 

76 return 

77 except BaseException: 

78 if keyboard_interrupted: 

79 self.write("\nKeyboardInterrupt\n") 

80 else: 

81 self.showtraceback() 

82 return self.STATEMENT_FAILED 

83 

class REPLThread(threading.Thread):
    """Thread that runs the interactive prompt.

    Relies on the module globals ``console``, ``loop`` and
    ``CAN_USE_PYREPL`` having been set before the thread is started.
    """

    def run(self):
        """Print the banner, run PYTHONSTARTUP, then serve the prompt.

        Whatever happens, on the way out "coroutine ... was never awaited"
        RuntimeWarnings are silenced and the event loop is asked to stop.
        Sets the global ``return_code`` to 1 if the pyrepl front end fails
        with an unexpected exception.
        """
        global return_code

        try:
            console.write(
                f'asyncio REPL {sys.version} on {sys.platform}\n'
                'Use "await" directly instead of "asyncio.run()".\n'
                'Type "help", "copyright", "credits" or "license" '
                'for more information.\n'
            )

            startup_path = os.getenv("PYTHONSTARTUP")
            if startup_path:
                sys.audit("cpython.run_startup", startup_path)

                import tokenize
                with tokenize.open(startup_path) as startup_file:
                    startup_source = startup_file.read()
                    startup_code = compile(startup_source, startup_path, "exec")
                    exec(startup_code, console.locals)

            # Echo an "import asyncio" line behind the (possibly coloured)
            # primary prompt.
            prompt = getattr(sys, "ps1", ">>> ")
            if CAN_USE_PYREPL:
                syntax_theme = get_theme().syntax
                prompt = f"{syntax_theme.prompt}{prompt}{syntax_theme.reset}"
            console.write(f"{prompt}import asyncio\n")

            if not CAN_USE_PYREPL:
                # Basic REPL; the banner has already been written above.
                console.interact(banner="", exitmsg="")
            else:
                from _pyrepl.simple_interact import (
                    run_multiline_interactive_console as run_pyrepl,
                )
                try:
                    run_pyrepl(console)
                except SystemExit:
                    # expected via the `exit` and `quit` commands
                    pass
                except BaseException:
                    # unexpected issue
                    console.showtraceback()
                    console.write("Internal error, ")
                    return_code = 1
        finally:
            warnings.filterwarnings(
                'ignore',
                message=r'^coroutine .* was never awaited$',
                category=RuntimeWarning,
            )

            # Called from this thread, so the stop must go through the
            # thread-safe scheduling entry point.
            loop.call_soon_threadsafe(loop.stop)

    def interrupt(self) -> None:
        """Notify the pyrepl reader's threading hook, if one is installed.

        Does nothing when pyrepl is not in use.
        """
        if CAN_USE_PYREPL:
            from _pyrepl.simple_interact import _get_reader
            hook = _get_reader().threading_hook
            if hook is not None:
                hook.add("")  # type: ignore

145 

146 

147if __name__ == '__main__': 

148 parser = argparse.ArgumentParser( 

149 prog="python3 -m asyncio", 

150 description="Interactive asyncio shell and CLI tools", 

151 color=True, 

152 ) 

153 subparsers = parser.add_subparsers(help="sub-commands", dest="command") 

154 ps = subparsers.add_parser( 

155 "ps", help="Display a table of all pending tasks in a process" 

156 ) 

157 ps.add_argument("pid", type=int, help="Process ID to inspect") 

158 formats = [fmt.value for fmt in TaskTableOutputFormat] 

159 formats_to_show = [fmt for fmt in formats 

160 if fmt != TaskTableOutputFormat.bsv.value] 

161 ps.add_argument("--format", choices=formats, default="table", 

162 metavar=f"{{{','.join(formats_to_show)}}}") 

163 pstree = subparsers.add_parser( 

164 "pstree", help="Display a tree of all pending tasks in a process" 

165 ) 

166 pstree.add_argument("pid", type=int, help="Process ID to inspect") 

167 args = parser.parse_args() 

168 match args.command: 

169 case "ps": 

170 display_awaited_by_tasks_table(args.pid, format=args.format) 

171 sys.exit(0) 

172 case "pstree": 

173 display_awaited_by_tasks_tree(args.pid) 

174 sys.exit(0) 

175 case None: 

176 pass # continue to the interactive shell 

177 case _: 

178 # shouldn't happen as an invalid command-line wouldn't parse 

179 # but let's keep it for the next person adding a command 

180 print(f"error: unhandled command {args.command}", file=sys.stderr) 

181 parser.print_usage(file=sys.stderr) 

182 sys.exit(1) 

183 

184 sys.audit("cpython.run_stdin") 

185 

186 if os.getenv('PYTHON_BASIC_REPL'): 

187 CAN_USE_PYREPL = False 

188 else: 

189 from _pyrepl.main import CAN_USE_PYREPL 

190 

191 return_code = 0 

192 loop = asyncio.new_event_loop() 

193 asyncio.set_event_loop(loop) 

194 

195 repl_locals = {'asyncio': asyncio} 

196 for key in {'__name__', '__package__', 

197 '__loader__', '__spec__', 

198 '__builtins__', '__file__'}: 

199 repl_locals[key] = locals()[key] 

200 

201 console = AsyncIOInteractiveConsole(repl_locals, loop) 

202 

203 repl_future = None 

204 keyboard_interrupted = False 

205 

206 try: 

207 import readline # NoQA 

208 except ImportError: 

209 readline = None 

210 

211 interactive_hook = getattr(sys, "__interactivehook__", None) 

212 

213 if interactive_hook is not None: 

214 sys.audit("cpython.run_interactivehook", interactive_hook) 

215 interactive_hook() 

216 

217 if interactive_hook is site.register_readline: 

218 # Fix the completer function to use the interactive console locals 

219 try: 

220 import rlcompleter 

221 except: 

222 pass 

223 else: 

224 if readline is not None: 

225 completer = rlcompleter.Completer(console.locals) 

226 readline.set_completer(completer.complete) 

227 

228 repl_thread = REPLThread(name="Interactive thread") 

229 repl_thread.daemon = True 

230 repl_thread.start() 

231 

232 while True: 

233 try: 

234 loop.run_forever() 

235 except KeyboardInterrupt: 

236 keyboard_interrupted = True 

237 if repl_future and not repl_future.done(): 

238 repl_future.cancel() 

239 repl_thread.interrupt() 

240 continue 

241 else: 

242 break 

243 

244 console.write('exiting asyncio REPL...\n') 

245 sys.exit(return_code)