Coverage for Lib/asyncio/runners.py: 97%

116 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-06-24 03:28 +0000

1__all__ = ('Runner', 'run') 

2 

3import contextvars 

4import enum 

5import functools 

6import inspect 

7import threading 

8import signal 

9from . import coroutines 

10from . import events 

11from . import exceptions 

12from . import tasks 

13from . import constants 

14 

class _State(enum.Enum):
    """Lifecycle states of a Runner.

    A Runner starts as CREATED, becomes INITIALIZED once its event loop
    has been set up, and ends as CLOSED after close() has run.
    """

    CREATED = "created"
    INITIALIZED = "initialized"
    CLOSED = "closed"

19 

20 

21class Runner: 

22 """A context manager that controls event loop life cycle. 

23 

24 The context manager always creates a new event loop, 

25 allows to run async functions inside it, 

26 and properly finalizes the loop at the context manager exit. 

27 

28 If debug is True, the event loop will be run in debug mode. 

29 If loop_factory is passed, it is used for new event loop creation. 

30 

31 asyncio.run(main(), debug=True) 

32 

33 is a shortcut for 

34 

35 with asyncio.Runner(debug=True) as runner: 

36 runner.run(main()) 

37 

38 The run() method can be called multiple times within the runner's 

39 context. 

40 

41 This can be useful for interactive console (e.g. IPython), 

42 unittest runners, console tools, -- everywhere when async code 

43 is called from existing sync framework and where the preferred single 

44 asyncio.run() call doesn't work. 

45 

46 """ 

47 

48 # Note: the class is final, it is not intended for inheritance. 

49 

50 def __init__(self, *, debug=None, loop_factory=None): 

51 self._state = _State.CREATED 

52 self._debug = debug 

53 self._loop_factory = loop_factory 

54 self._loop = None 

55 self._context = None 

56 self._interrupt_count = 0 

57 self._set_event_loop = False 

58 

59 def __enter__(self): 

60 self._lazy_init() 

61 return self 

62 

63 def __exit__(self, exc_type, exc_val, exc_tb): 

64 self.close() 

65 

66 def close(self): 

67 """Shutdown and close event loop.""" 

68 if self._state is not _State.INITIALIZED: 

69 return 

70 try: 

71 loop = self._loop 

72 _cancel_all_tasks(loop) 

73 loop.run_until_complete(loop.shutdown_asyncgens()) 

74 loop.run_until_complete( 

75 loop.shutdown_default_executor(constants.THREAD_JOIN_TIMEOUT)) 

76 finally: 

77 if self._set_event_loop: 

78 events.set_event_loop(None) 

79 loop.close() 

80 self._loop = None 

81 self._state = _State.CLOSED 

82 

83 def get_loop(self): 

84 """Return embedded event loop.""" 

85 self._lazy_init() 

86 return self._loop 

87 

88 def run(self, coro, *, context=None): 

89 """Run code in the embedded event loop.""" 

90 if events._get_running_loop() is not None: 

91 # fail fast with short traceback 

92 raise RuntimeError( 

93 "Runner.run() cannot be called from a running event loop") 

94 

95 self._lazy_init() 

96 

97 if not coroutines.iscoroutine(coro): 

98 if inspect.isawaitable(coro): 

99 async def _wrap_awaitable(awaitable): 

100 return await awaitable 

101 

102 coro = _wrap_awaitable(coro) 

103 else: 

104 raise TypeError('An asyncio.Future, a coroutine or an ' 

105 'awaitable is required') 

106 

107 if context is None: 

108 context = self._context 

109 

110 task = self._loop.create_task(coro, context=context) 

111 

112 if (threading.current_thread() is threading.main_thread() 

113 and signal.getsignal(signal.SIGINT) is signal.default_int_handler 

114 ): 

115 sigint_handler = functools.partial(self._on_sigint, main_task=task) 

116 try: 

117 signal.signal(signal.SIGINT, sigint_handler) 

118 except ValueError: 

119 # `signal.signal` may throw if `threading.main_thread` does 

120 # not support signals (e.g. embedded interpreter with signals 

121 # not registered - see gh-91880) 

122 sigint_handler = None 

123 else: 

124 sigint_handler = None 

125 

126 self._interrupt_count = 0 

127 try: 

128 return self._loop.run_until_complete(task) 

129 except exceptions.CancelledError: 

130 if self._interrupt_count > 0: 130 ↛ 134line 130 didn't jump to line 134 because the condition on line 130 was always true

131 uncancel = getattr(task, "uncancel", None) 

132 if uncancel is not None and uncancel() == 0: 

133 raise KeyboardInterrupt() 

134 raise # CancelledError 

135 finally: 

136 if (sigint_handler is not None 

137 and signal.getsignal(signal.SIGINT) is sigint_handler 

138 ): 

139 signal.signal(signal.SIGINT, signal.default_int_handler) 

140 

141 def _lazy_init(self): 

142 if self._state is _State.CLOSED: 

143 raise RuntimeError("Runner is closed") 

144 if self._state is _State.INITIALIZED: 

145 return 

146 if self._loop_factory is None: 

147 self._loop = events.new_event_loop() 

148 if not self._set_event_loop: 148 ↛ 155line 148 didn't jump to line 155 because the condition on line 148 was always true

149 # Call set_event_loop only once to avoid calling 

150 # attach_loop multiple times on child watchers 

151 events.set_event_loop(self._loop) 

152 self._set_event_loop = True 

153 else: 

154 self._loop = self._loop_factory() 

155 if self._debug is not None: 

156 self._loop.set_debug(self._debug) 

157 self._context = contextvars.copy_context() 

158 self._state = _State.INITIALIZED 

159 

160 def _on_sigint(self, signum, frame, main_task): 

161 self._interrupt_count += 1 

162 if self._interrupt_count == 1 and not main_task.done(): 162 ↛ 167line 162 didn't jump to line 167 because the condition on line 162 was always true

163 main_task.cancel() 

164 # wakeup loop if it is blocked by select() with long timeout 

165 self._loop.call_soon_threadsafe(lambda: None) 

166 return 

167 raise KeyboardInterrupt() 

168 

169 

170def run(main, *, debug=None, loop_factory=None): 

171 """Execute the coroutine and return the result. 

172 

173 This function runs the passed coroutine, taking care of 

174 managing the asyncio event loop, finalizing asynchronous 

175 generators and closing the default executor. 

176 

177 This function cannot be called when another asyncio event loop is 

178 running in the same thread. 

179 

180 If debug is True, the event loop will be run in debug mode. 

181 If loop_factory is passed, it is used for new event loop creation. 

182 

183 This function always creates a new event loop and closes it at the end. 

184 It should be used as a main entry point for asyncio programs, and should 

185 ideally only be called once. 

186 

187 The executor is given a timeout duration of 5 minutes to shutdown. 

188 If the executor hasn't finished within that duration, a warning is 

189 emitted and the executor is closed. 

190 

191 Example: 

192 

193 async def main(): 

194 await asyncio.sleep(1) 

195 print('hello') 

196 

197 asyncio.run(main()) 

198 """ 

199 if events._get_running_loop() is not None: 

200 # fail fast with short traceback 

201 raise RuntimeError( 

202 "asyncio.run() cannot be called from a running event loop") 

203 

204 with Runner(debug=debug, loop_factory=loop_factory) as runner: 

205 return runner.run(main) 

206 

207 

def _cancel_all_tasks(loop):
    """Cancel every task known for *loop* and wait for them to finish.

    Tasks that end with an exception other than cancellation are
    reported through loop.call_exception_handler().  Returns None.
    """
    to_cancel = tasks.all_tasks(loop)
    if not to_cancel:
        return

    for task in to_cancel:
        task.cancel()

    # return_exceptions=True so one failing task does not abort the wait.
    loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))

    for task in to_cancel:
        if task.cancelled():
            continue
        exc = task.exception()
        if exc is not None:
            loop.call_exception_handler({
                'message': 'unhandled exception during asyncio.run() shutdown',
                'exception': exc,
                'task': task,
            })