Coverage for Lib/asyncio/runners.py: 97%

116 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-15 02:02 +0000

1__all__ = ('Runner', 'run') 

2 

3import contextvars 

4import enum 

5import functools 

6import inspect 

7import threading 

8import signal 

9from . import coroutines 

10from . import events 

11from . import exceptions 

12from . import tasks 

13from . import constants 

14 

15class _State(enum.Enum): 

16 CREATED = "created" 

17 INITIALIZED = "initialized" 

18 CLOSED = "closed" 

19 

20 

21class Runner: 

22 """A context manager that controls event loop life cycle. 

23 

24 The context manager always creates a new event loop, 

25 allows to run async functions inside it, 

26 and properly finalizes the loop at the context manager exit. 

27 

28 If debug is True, the event loop will be run in debug mode. 

29 If loop_factory is passed, it is used for new event loop creation. 

30 

31 asyncio.run(main(), debug=True) 

32 

33 is a shortcut for 

34 

35 with asyncio.Runner(debug=True) as runner: 

36 runner.run(main()) 

37 

38 The run() method can be called multiple times within the runner's context. 

39 

40 This can be useful for interactive console (e.g. IPython), 

41 unittest runners, console tools, -- everywhere when async code 

42 is called from existing sync framework and where the preferred single 

43 asyncio.run() call doesn't work. 

44 

45 """ 

46 

47 # Note: the class is final, it is not intended for inheritance. 

48 

49 def __init__(self, *, debug=None, loop_factory=None): 

50 self._state = _State.CREATED 

51 self._debug = debug 

52 self._loop_factory = loop_factory 

53 self._loop = None 

54 self._context = None 

55 self._interrupt_count = 0 

56 self._set_event_loop = False 

57 

58 def __enter__(self): 

59 self._lazy_init() 

60 return self 

61 

62 def __exit__(self, exc_type, exc_val, exc_tb): 

63 self.close() 

64 

65 def close(self): 

66 """Shutdown and close event loop.""" 

67 if self._state is not _State.INITIALIZED: 

68 return 

69 try: 

70 loop = self._loop 

71 _cancel_all_tasks(loop) 

72 loop.run_until_complete(loop.shutdown_asyncgens()) 

73 loop.run_until_complete( 

74 loop.shutdown_default_executor(constants.THREAD_JOIN_TIMEOUT)) 

75 finally: 

76 if self._set_event_loop: 

77 events.set_event_loop(None) 

78 loop.close() 

79 self._loop = None 

80 self._state = _State.CLOSED 

81 

82 def get_loop(self): 

83 """Return embedded event loop.""" 

84 self._lazy_init() 

85 return self._loop 

86 

87 def run(self, coro, *, context=None): 

88 """Run code in the embedded event loop.""" 

89 if events._get_running_loop() is not None: 

90 # fail fast with short traceback 

91 raise RuntimeError( 

92 "Runner.run() cannot be called from a running event loop") 

93 

94 self._lazy_init() 

95 

96 if not coroutines.iscoroutine(coro): 

97 if inspect.isawaitable(coro): 

98 async def _wrap_awaitable(awaitable): 

99 return await awaitable 

100 

101 coro = _wrap_awaitable(coro) 

102 else: 

103 raise TypeError('An asyncio.Future, a coroutine or an ' 

104 'awaitable is required') 

105 

106 if context is None: 

107 context = self._context 

108 

109 task = self._loop.create_task(coro, context=context) 

110 

111 if (threading.current_thread() is threading.main_thread() 

112 and signal.getsignal(signal.SIGINT) is signal.default_int_handler 

113 ): 

114 sigint_handler = functools.partial(self._on_sigint, main_task=task) 

115 try: 

116 signal.signal(signal.SIGINT, sigint_handler) 

117 except ValueError: 

118 # `signal.signal` may throw if `threading.main_thread` does 

119 # not support signals (e.g. embedded interpreter with signals 

120 # not registered - see gh-91880) 

121 sigint_handler = None 

122 else: 

123 sigint_handler = None 

124 

125 self._interrupt_count = 0 

126 try: 

127 return self._loop.run_until_complete(task) 

128 except exceptions.CancelledError: 

129 if self._interrupt_count > 0: 129 ↛ 133line 129 didn't jump to line 133 because the condition on line 129 was always true

130 uncancel = getattr(task, "uncancel", None) 

131 if uncancel is not None and uncancel() == 0: 

132 raise KeyboardInterrupt() 

133 raise # CancelledError 

134 finally: 

135 if (sigint_handler is not None 

136 and signal.getsignal(signal.SIGINT) is sigint_handler 

137 ): 

138 signal.signal(signal.SIGINT, signal.default_int_handler) 

139 

140 def _lazy_init(self): 

141 if self._state is _State.CLOSED: 

142 raise RuntimeError("Runner is closed") 

143 if self._state is _State.INITIALIZED: 

144 return 

145 if self._loop_factory is None: 

146 self._loop = events.new_event_loop() 

147 if not self._set_event_loop: 147 ↛ 154line 147 didn't jump to line 154 because the condition on line 147 was always true

148 # Call set_event_loop only once to avoid calling 

149 # attach_loop multiple times on child watchers 

150 events.set_event_loop(self._loop) 

151 self._set_event_loop = True 

152 else: 

153 self._loop = self._loop_factory() 

154 if self._debug is not None: 

155 self._loop.set_debug(self._debug) 

156 self._context = contextvars.copy_context() 

157 self._state = _State.INITIALIZED 

158 

159 def _on_sigint(self, signum, frame, main_task): 

160 self._interrupt_count += 1 

161 if self._interrupt_count == 1 and not main_task.done(): 161 ↛ 166line 161 didn't jump to line 166 because the condition on line 161 was always true

162 main_task.cancel() 

163 # wakeup loop if it is blocked by select() with long timeout 

164 self._loop.call_soon_threadsafe(lambda: None) 

165 return 

166 raise KeyboardInterrupt() 

167 

168 

169def run(main, *, debug=None, loop_factory=None): 

170 """Execute the coroutine and return the result. 

171 

172 This function runs the passed coroutine, taking care of 

173 managing the asyncio event loop, finalizing asynchronous 

174 generators and closing the default executor. 

175 

176 This function cannot be called when another asyncio event loop is 

177 running in the same thread. 

178 

179 If debug is True, the event loop will be run in debug mode. 

180 If loop_factory is passed, it is used for new event loop creation. 

181 

182 This function always creates a new event loop and closes it at the end. 

183 It should be used as a main entry point for asyncio programs, and should 

184 ideally only be called once. 

185 

186 The executor is given a timeout duration of 5 minutes to shutdown. 

187 If the executor hasn't finished within that duration, a warning is 

188 emitted and the executor is closed. 

189 

190 Example: 

191 

192 async def main(): 

193 await asyncio.sleep(1) 

194 print('hello') 

195 

196 asyncio.run(main()) 

197 """ 

198 if events._get_running_loop() is not None: 

199 # fail fast with short traceback 

200 raise RuntimeError( 

201 "asyncio.run() cannot be called from a running event loop") 

202 

203 with Runner(debug=debug, loop_factory=loop_factory) as runner: 

204 return runner.run(main) 

205 

206 

207def _cancel_all_tasks(loop): 

208 to_cancel = tasks.all_tasks(loop) 

209 if not to_cancel: 

210 return 

211 

212 for task in to_cancel: 

213 task.cancel() 

214 

215 loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True)) 

216 

217 for task in to_cancel: 

218 if task.cancelled(): 

219 continue 

220 if task.exception() is not None: 220 ↛ 217line 220 didn't jump to line 217 because the condition on line 220 was always true

221 loop.call_exception_handler({ 

222 'message': 'unhandled exception during asyncio.run() shutdown', 

223 'exception': task.exception(), 

224 'task': task, 

225 })