Coverage for Lib/asyncio/runners.py: 97%
116 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-15 02:02 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-15 02:02 +0000
1__all__ = ('Runner', 'run')
3import contextvars
4import enum
5import functools
6import inspect
7import threading
8import signal
9from . import coroutines
10from . import events
11from . import exceptions
12from . import tasks
13from . import constants
15class _State(enum.Enum):
16 CREATED = "created"
17 INITIALIZED = "initialized"
18 CLOSED = "closed"
21class Runner:
22 """A context manager that controls event loop life cycle.
24 The context manager always creates a new event loop,
25 allows to run async functions inside it,
26 and properly finalizes the loop at the context manager exit.
28 If debug is True, the event loop will be run in debug mode.
29 If loop_factory is passed, it is used for new event loop creation.
31 asyncio.run(main(), debug=True)
33 is a shortcut for
35 with asyncio.Runner(debug=True) as runner:
36 runner.run(main())
38 The run() method can be called multiple times within the runner's context.
40 This can be useful for interactive console (e.g. IPython),
41 unittest runners, console tools, -- everywhere when async code
42 is called from existing sync framework and where the preferred single
43 asyncio.run() call doesn't work.
45 """
47 # Note: the class is final, it is not intended for inheritance.
49 def __init__(self, *, debug=None, loop_factory=None):
50 self._state = _State.CREATED
51 self._debug = debug
52 self._loop_factory = loop_factory
53 self._loop = None
54 self._context = None
55 self._interrupt_count = 0
56 self._set_event_loop = False
58 def __enter__(self):
59 self._lazy_init()
60 return self
62 def __exit__(self, exc_type, exc_val, exc_tb):
63 self.close()
65 def close(self):
66 """Shutdown and close event loop."""
67 if self._state is not _State.INITIALIZED:
68 return
69 try:
70 loop = self._loop
71 _cancel_all_tasks(loop)
72 loop.run_until_complete(loop.shutdown_asyncgens())
73 loop.run_until_complete(
74 loop.shutdown_default_executor(constants.THREAD_JOIN_TIMEOUT))
75 finally:
76 if self._set_event_loop:
77 events.set_event_loop(None)
78 loop.close()
79 self._loop = None
80 self._state = _State.CLOSED
82 def get_loop(self):
83 """Return embedded event loop."""
84 self._lazy_init()
85 return self._loop
87 def run(self, coro, *, context=None):
88 """Run code in the embedded event loop."""
89 if events._get_running_loop() is not None:
90 # fail fast with short traceback
91 raise RuntimeError(
92 "Runner.run() cannot be called from a running event loop")
94 self._lazy_init()
96 if not coroutines.iscoroutine(coro):
97 if inspect.isawaitable(coro):
98 async def _wrap_awaitable(awaitable):
99 return await awaitable
101 coro = _wrap_awaitable(coro)
102 else:
103 raise TypeError('An asyncio.Future, a coroutine or an '
104 'awaitable is required')
106 if context is None:
107 context = self._context
109 task = self._loop.create_task(coro, context=context)
111 if (threading.current_thread() is threading.main_thread()
112 and signal.getsignal(signal.SIGINT) is signal.default_int_handler
113 ):
114 sigint_handler = functools.partial(self._on_sigint, main_task=task)
115 try:
116 signal.signal(signal.SIGINT, sigint_handler)
117 except ValueError:
118 # `signal.signal` may throw if `threading.main_thread` does
119 # not support signals (e.g. embedded interpreter with signals
120 # not registered - see gh-91880)
121 sigint_handler = None
122 else:
123 sigint_handler = None
125 self._interrupt_count = 0
126 try:
127 return self._loop.run_until_complete(task)
128 except exceptions.CancelledError:
129 if self._interrupt_count > 0: 129 ↛ 133line 129 didn't jump to line 133 because the condition on line 129 was always true
130 uncancel = getattr(task, "uncancel", None)
131 if uncancel is not None and uncancel() == 0:
132 raise KeyboardInterrupt()
133 raise # CancelledError
134 finally:
135 if (sigint_handler is not None
136 and signal.getsignal(signal.SIGINT) is sigint_handler
137 ):
138 signal.signal(signal.SIGINT, signal.default_int_handler)
140 def _lazy_init(self):
141 if self._state is _State.CLOSED:
142 raise RuntimeError("Runner is closed")
143 if self._state is _State.INITIALIZED:
144 return
145 if self._loop_factory is None:
146 self._loop = events.new_event_loop()
147 if not self._set_event_loop: 147 ↛ 154line 147 didn't jump to line 154 because the condition on line 147 was always true
148 # Call set_event_loop only once to avoid calling
149 # attach_loop multiple times on child watchers
150 events.set_event_loop(self._loop)
151 self._set_event_loop = True
152 else:
153 self._loop = self._loop_factory()
154 if self._debug is not None:
155 self._loop.set_debug(self._debug)
156 self._context = contextvars.copy_context()
157 self._state = _State.INITIALIZED
159 def _on_sigint(self, signum, frame, main_task):
160 self._interrupt_count += 1
161 if self._interrupt_count == 1 and not main_task.done(): 161 ↛ 166line 161 didn't jump to line 166 because the condition on line 161 was always true
162 main_task.cancel()
163 # wakeup loop if it is blocked by select() with long timeout
164 self._loop.call_soon_threadsafe(lambda: None)
165 return
166 raise KeyboardInterrupt()
169def run(main, *, debug=None, loop_factory=None):
170 """Execute the coroutine and return the result.
172 This function runs the passed coroutine, taking care of
173 managing the asyncio event loop, finalizing asynchronous
174 generators and closing the default executor.
176 This function cannot be called when another asyncio event loop is
177 running in the same thread.
179 If debug is True, the event loop will be run in debug mode.
180 If loop_factory is passed, it is used for new event loop creation.
182 This function always creates a new event loop and closes it at the end.
183 It should be used as a main entry point for asyncio programs, and should
184 ideally only be called once.
186 The executor is given a timeout duration of 5 minutes to shutdown.
187 If the executor hasn't finished within that duration, a warning is
188 emitted and the executor is closed.
190 Example:
192 async def main():
193 await asyncio.sleep(1)
194 print('hello')
196 asyncio.run(main())
197 """
198 if events._get_running_loop() is not None:
199 # fail fast with short traceback
200 raise RuntimeError(
201 "asyncio.run() cannot be called from a running event loop")
203 with Runner(debug=debug, loop_factory=loop_factory) as runner:
204 return runner.run(main)
207def _cancel_all_tasks(loop):
208 to_cancel = tasks.all_tasks(loop)
209 if not to_cancel:
210 return
212 for task in to_cancel:
213 task.cancel()
215 loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))
217 for task in to_cancel:
218 if task.cancelled():
219 continue
220 if task.exception() is not None: 220 ↛ 217line 220 didn't jump to line 217 because the condition on line 220 was always true
221 loop.call_exception_handler({
222 'message': 'unhandled exception during asyncio.run() shutdown',
223 'exception': task.exception(),
224 'task': task,
225 })