Coverage for Lib/asyncio/runners.py: 97%
116 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-24 03:28 +0000
# Public API of this module.
__all__ = ('Runner', 'run')

import contextvars
import enum
import functools
import inspect
import threading
import signal
from . import coroutines
from . import events
from . import exceptions
from . import tasks
from . import constants
15class _State(enum.Enum):
16 CREATED = "created"
17 INITIALIZED = "initialized"
18 CLOSED = "closed"
21class Runner:
22 """A context manager that controls event loop life cycle.
24 The context manager always creates a new event loop,
25 allows to run async functions inside it,
26 and properly finalizes the loop at the context manager exit.
28 If debug is True, the event loop will be run in debug mode.
29 If loop_factory is passed, it is used for new event loop creation.
31 asyncio.run(main(), debug=True)
33 is a shortcut for
35 with asyncio.Runner(debug=True) as runner:
36 runner.run(main())
38 The run() method can be called multiple times within the runner's
39 context.
41 This can be useful for interactive console (e.g. IPython),
42 unittest runners, console tools, -- everywhere when async code
43 is called from existing sync framework and where the preferred single
44 asyncio.run() call doesn't work.
46 """
48 # Note: the class is final, it is not intended for inheritance.
50 def __init__(self, *, debug=None, loop_factory=None):
51 self._state = _State.CREATED
52 self._debug = debug
53 self._loop_factory = loop_factory
54 self._loop = None
55 self._context = None
56 self._interrupt_count = 0
57 self._set_event_loop = False
59 def __enter__(self):
60 self._lazy_init()
61 return self
63 def __exit__(self, exc_type, exc_val, exc_tb):
64 self.close()
66 def close(self):
67 """Shutdown and close event loop."""
68 if self._state is not _State.INITIALIZED:
69 return
70 try:
71 loop = self._loop
72 _cancel_all_tasks(loop)
73 loop.run_until_complete(loop.shutdown_asyncgens())
74 loop.run_until_complete(
75 loop.shutdown_default_executor(constants.THREAD_JOIN_TIMEOUT))
76 finally:
77 if self._set_event_loop:
78 events.set_event_loop(None)
79 loop.close()
80 self._loop = None
81 self._state = _State.CLOSED
83 def get_loop(self):
84 """Return embedded event loop."""
85 self._lazy_init()
86 return self._loop
88 def run(self, coro, *, context=None):
89 """Run code in the embedded event loop."""
90 if events._get_running_loop() is not None:
91 # fail fast with short traceback
92 raise RuntimeError(
93 "Runner.run() cannot be called from a running event loop")
95 self._lazy_init()
97 if not coroutines.iscoroutine(coro):
98 if inspect.isawaitable(coro):
99 async def _wrap_awaitable(awaitable):
100 return await awaitable
102 coro = _wrap_awaitable(coro)
103 else:
104 raise TypeError('An asyncio.Future, a coroutine or an '
105 'awaitable is required')
107 if context is None:
108 context = self._context
110 task = self._loop.create_task(coro, context=context)
112 if (threading.current_thread() is threading.main_thread()
113 and signal.getsignal(signal.SIGINT) is signal.default_int_handler
114 ):
115 sigint_handler = functools.partial(self._on_sigint, main_task=task)
116 try:
117 signal.signal(signal.SIGINT, sigint_handler)
118 except ValueError:
119 # `signal.signal` may throw if `threading.main_thread` does
120 # not support signals (e.g. embedded interpreter with signals
121 # not registered - see gh-91880)
122 sigint_handler = None
123 else:
124 sigint_handler = None
126 self._interrupt_count = 0
127 try:
128 return self._loop.run_until_complete(task)
129 except exceptions.CancelledError:
130 if self._interrupt_count > 0: 130 ↛ 134line 130 didn't jump to line 134 because the condition on line 130 was always true
131 uncancel = getattr(task, "uncancel", None)
132 if uncancel is not None and uncancel() == 0:
133 raise KeyboardInterrupt()
134 raise # CancelledError
135 finally:
136 if (sigint_handler is not None
137 and signal.getsignal(signal.SIGINT) is sigint_handler
138 ):
139 signal.signal(signal.SIGINT, signal.default_int_handler)
141 def _lazy_init(self):
142 if self._state is _State.CLOSED:
143 raise RuntimeError("Runner is closed")
144 if self._state is _State.INITIALIZED:
145 return
146 if self._loop_factory is None:
147 self._loop = events.new_event_loop()
148 if not self._set_event_loop: 148 ↛ 155line 148 didn't jump to line 155 because the condition on line 148 was always true
149 # Call set_event_loop only once to avoid calling
150 # attach_loop multiple times on child watchers
151 events.set_event_loop(self._loop)
152 self._set_event_loop = True
153 else:
154 self._loop = self._loop_factory()
155 if self._debug is not None:
156 self._loop.set_debug(self._debug)
157 self._context = contextvars.copy_context()
158 self._state = _State.INITIALIZED
160 def _on_sigint(self, signum, frame, main_task):
161 self._interrupt_count += 1
162 if self._interrupt_count == 1 and not main_task.done(): 162 ↛ 167line 162 didn't jump to line 167 because the condition on line 162 was always true
163 main_task.cancel()
164 # wakeup loop if it is blocked by select() with long timeout
165 self._loop.call_soon_threadsafe(lambda: None)
166 return
167 raise KeyboardInterrupt()
170def run(main, *, debug=None, loop_factory=None):
171 """Execute the coroutine and return the result.
173 This function runs the passed coroutine, taking care of
174 managing the asyncio event loop, finalizing asynchronous
175 generators and closing the default executor.
177 This function cannot be called when another asyncio event loop is
178 running in the same thread.
180 If debug is True, the event loop will be run in debug mode.
181 If loop_factory is passed, it is used for new event loop creation.
183 This function always creates a new event loop and closes it at the end.
184 It should be used as a main entry point for asyncio programs, and should
185 ideally only be called once.
187 The executor is given a timeout duration of 5 minutes to shutdown.
188 If the executor hasn't finished within that duration, a warning is
189 emitted and the executor is closed.
191 Example:
193 async def main():
194 await asyncio.sleep(1)
195 print('hello')
197 asyncio.run(main())
198 """
199 if events._get_running_loop() is not None:
200 # fail fast with short traceback
201 raise RuntimeError(
202 "asyncio.run() cannot be called from a running event loop")
204 with Runner(debug=debug, loop_factory=loop_factory) as runner:
205 return runner.run(main)
208def _cancel_all_tasks(loop):
209 to_cancel = tasks.all_tasks(loop)
210 if not to_cancel:
211 return
213 for task in to_cancel:
214 task.cancel()
216 loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))
218 for task in to_cancel:
219 if task.cancelled():
220 continue
221 if task.exception() is not None: 221 ↛ 218line 221 didn't jump to line 218 because the condition on line 221 was always true
222 loop.call_exception_handler({
223 'message': 'unhandled exception during asyncio.run() shutdown',
224 'exception': task.exception(),
225 'task': task,
226 })