Coverage for Lib/asyncio/__main__.py: 0%

94 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-15 02:02 +0000

1import ast 

2import asyncio 

3import concurrent.futures 

4import contextvars 

5import inspect 

6import os 

7import site 

8import sys 

9import threading 

10import types 

11import warnings 

12 

13from _colorize import can_colorize, ANSIColors # type: ignore[import-not-found] 

14from _pyrepl.console import InteractiveColoredConsole 

15 

16from . import futures 

17 

18 

19class AsyncIOInteractiveConsole(InteractiveColoredConsole): 

20 

21 def __init__(self, locals, loop): 

22 super().__init__(locals, filename="<stdin>") 

23 self.compile.compiler.flags |= ast.PyCF_ALLOW_TOP_LEVEL_AWAIT 

24 

25 self.loop = loop 

26 self.context = contextvars.copy_context() 

27 

28 def runcode(self, code): 

29 global return_code 

30 future = concurrent.futures.Future() 

31 

32 def callback(): 

33 global return_code 

34 global repl_future 

35 global keyboard_interrupted 

36 

37 repl_future = None 

38 keyboard_interrupted = False 

39 

40 func = types.FunctionType(code, self.locals) 

41 try: 

42 coro = func() 

43 except SystemExit as se: 

44 return_code = se.code 

45 self.loop.stop() 

46 return 

47 except KeyboardInterrupt as ex: 

48 keyboard_interrupted = True 

49 future.set_exception(ex) 

50 return 

51 except BaseException as ex: 

52 future.set_exception(ex) 

53 return 

54 

55 if not inspect.iscoroutine(coro): 

56 future.set_result(coro) 

57 return 

58 

59 try: 

60 repl_future = self.loop.create_task(coro, context=self.context) 

61 futures._chain_future(repl_future, future) 

62 except BaseException as exc: 

63 future.set_exception(exc) 

64 

65 loop.call_soon_threadsafe(callback, context=self.context) 

66 

67 try: 

68 return future.result() 

69 except SystemExit as se: 

70 return_code = se.code 

71 self.loop.stop() 

72 return 

73 except BaseException: 

74 if keyboard_interrupted: 

75 self.write("\nKeyboardInterrupt\n") 

76 else: 

77 self.showtraceback() 

78 return self.STATEMENT_FAILED 

79 

80class REPLThread(threading.Thread): 

81 

82 def run(self): 

83 global return_code 

84 

85 try: 

86 banner = ( 

87 f'asyncio REPL {sys.version} on {sys.platform}\n' 

88 f'Use "await" directly instead of "asyncio.run()".\n' 

89 f'Type "help", "copyright", "credits" or "license" ' 

90 f'for more information.\n' 

91 ) 

92 

93 console.write(banner) 

94 

95 if startup_path := os.getenv("PYTHONSTARTUP"): 

96 sys.audit("cpython.run_startup", startup_path) 

97 

98 import tokenize 

99 with tokenize.open(startup_path) as f: 

100 startup_code = compile(f.read(), startup_path, "exec") 

101 exec(startup_code, console.locals) 

102 

103 ps1 = getattr(sys, "ps1", ">>> ") 

104 if can_colorize() and CAN_USE_PYREPL: 

105 ps1 = f"{ANSIColors.BOLD_MAGENTA}{ps1}{ANSIColors.RESET}" 

106 console.write(f"{ps1}import asyncio\n") 

107 

108 if CAN_USE_PYREPL: 

109 from _pyrepl.simple_interact import ( 

110 run_multiline_interactive_console, 

111 ) 

112 try: 

113 run_multiline_interactive_console(console) 

114 except SystemExit: 

115 # expected via the `exit` and `quit` commands 

116 pass 

117 except BaseException: 

118 # unexpected issue 

119 console.showtraceback() 

120 console.write("Internal error, ") 

121 return_code = 1 

122 else: 

123 console.interact(banner="", exitmsg="") 

124 finally: 

125 warnings.filterwarnings( 

126 'ignore', 

127 message=r'^coroutine .* was never awaited$', 

128 category=RuntimeWarning) 

129 

130 loop.call_soon_threadsafe(loop.stop) 

131 

132 def interrupt(self) -> None: 

133 if not CAN_USE_PYREPL: 

134 return 

135 

136 from _pyrepl.simple_interact import _get_reader 

137 r = _get_reader() 

138 if r.threading_hook is not None: 

139 r.threading_hook.add("") # type: ignore 

140 

141 

142if __name__ == '__main__': 

143 sys.audit("cpython.run_stdin") 

144 

145 if os.getenv('PYTHON_BASIC_REPL'): 

146 CAN_USE_PYREPL = False 

147 else: 

148 from _pyrepl.main import CAN_USE_PYREPL 

149 

150 return_code = 0 

151 loop = asyncio.new_event_loop() 

152 asyncio.set_event_loop(loop) 

153 

154 repl_locals = {'asyncio': asyncio} 

155 for key in {'__name__', '__package__', 

156 '__loader__', '__spec__', 

157 '__builtins__', '__file__'}: 

158 repl_locals[key] = locals()[key] 

159 

160 console = AsyncIOInteractiveConsole(repl_locals, loop) 

161 

162 repl_future = None 

163 keyboard_interrupted = False 

164 

165 try: 

166 import readline # NoQA 

167 except ImportError: 

168 readline = None 

169 

170 interactive_hook = getattr(sys, "__interactivehook__", None) 

171 

172 if interactive_hook is not None: 

173 sys.audit("cpython.run_interactivehook", interactive_hook) 

174 interactive_hook() 

175 

176 if interactive_hook is site.register_readline: 

177 # Fix the completer function to use the interactive console locals 

178 try: 

179 import rlcompleter 

180 except: 

181 pass 

182 else: 

183 if readline is not None: 

184 completer = rlcompleter.Completer(console.locals) 

185 readline.set_completer(completer.complete) 

186 

187 repl_thread = REPLThread(name="Interactive thread") 

188 repl_thread.daemon = True 

189 repl_thread.start() 

190 

191 while True: 

192 try: 

193 loop.run_forever() 

194 except KeyboardInterrupt: 

195 keyboard_interrupted = True 

196 if repl_future and not repl_future.done(): 

197 repl_future.cancel() 

198 repl_thread.interrupt() 

199 continue 

200 else: 

201 break 

202 

203 console.write('exiting asyncio REPL...\n') 

204 sys.exit(return_code)