Coverage for Lib / asyncio / base_events.py: 88%
1164 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 02:07 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 02:07 +0000
1"""Base implementation of event loop.
3The event loop can be broken up into a multiplexer (the part
4responsible for notifying us of I/O events) and the event loop proper,
5which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
16import collections
17import collections.abc
18import concurrent.futures
19import errno
20import heapq
21import itertools
22import math
23import os
24import socket
25import stat
26import subprocess
27import sys
28import threading
29import time
30import traceback
31import warnings
32import weakref
34try:
35 import ssl
36except ImportError: # pragma: no cover
37 ssl = None
39from . import constants
40from . import coroutines
41from . import events
42from . import exceptions
43from . import futures
44from . import protocols
45from . import sslproto
46from . import staggered
47from . import tasks
48from . import timeouts
49from . import transports
50from . import trsock
51from .log import logger
# Public names exported by this module.
__all__ = 'BaseEventLoop', 'Server',


# Minimum number of _scheduled timer handles before cleanup of
# cancelled handles is performed.
_MIN_SCHEDULED_TIMER_HANDLES = 100

# Minimum fraction of _scheduled timer handles that are cancelled
# before cleanup of cancelled handles is performed.
_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5

# True when the platform's socket module was built with IPv6 support.
_HAS_IPv6 = hasattr(socket, 'AF_INET6')

# Maximum timeout passed to select to avoid OS limitations
MAXIMUM_SELECT_TIMEOUT = 24 * 3600
72def _format_handle(handle):
73 cb = handle._callback
74 if isinstance(getattr(cb, '__self__', None), tasks.Task):
75 # format the task
76 return repr(cb.__self__)
77 else:
78 return str(handle)
81def _format_pipe(fd):
82 if fd == subprocess.PIPE:
83 return '<pipe>'
84 elif fd == subprocess.STDOUT:
85 return '<stdout>'
86 else:
87 return repr(fd)
90def _set_reuseport(sock):
91 if not hasattr(socket, 'SO_REUSEPORT'):
92 raise ValueError('reuse_port not supported by socket module')
93 else:
94 try:
95 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
96 except OSError:
97 raise ValueError('reuse_port not supported by socket module, '
98 'SO_REUSEPORT defined but not implemented.')
101def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0):
102 # Try to skip getaddrinfo if "host" is already an IP. Users might have
103 # handled name resolution in their own code and pass in resolved IPs.
104 if not hasattr(socket, 'inet_pton'):
105 return
107 if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
108 host is None:
109 return None
111 if type == socket.SOCK_STREAM:
112 proto = socket.IPPROTO_TCP
113 elif type == socket.SOCK_DGRAM:
114 proto = socket.IPPROTO_UDP
115 else:
116 return None
118 if port is None:
119 port = 0
120 elif isinstance(port, bytes) and port == b'':
121 port = 0
122 elif isinstance(port, str) and port == '':
123 port = 0
124 else:
125 # If port's a service name like "http", don't skip getaddrinfo.
126 try:
127 port = int(port)
128 except (TypeError, ValueError):
129 return None
131 if family == socket.AF_UNSPEC:
132 afs = [socket.AF_INET]
133 if _HAS_IPv6: 133 ↛ 138line 133 didn't jump to line 138 because the condition on line 133 was always true
134 afs.append(socket.AF_INET6)
135 else:
136 afs = [family]
138 if isinstance(host, bytes):
139 host = host.decode('idna')
140 if '%' in host:
141 # Linux's inet_pton doesn't accept an IPv6 zone index after host,
142 # like '::1%lo0'.
143 return None
145 for af in afs:
146 try:
147 socket.inet_pton(af, host)
148 # The host has already been resolved.
149 if _HAS_IPv6 and af == socket.AF_INET6:
150 return af, type, proto, '', (host, port, flowinfo, scopeid)
151 else:
152 return af, type, proto, '', (host, port)
153 except OSError:
154 pass
156 # "host" is not an IP address.
157 return None
160def _interleave_addrinfos(addrinfos, first_address_family_count=1):
161 """Interleave list of addrinfo tuples by family."""
162 # Group addresses by family
163 addrinfos_by_family = collections.OrderedDict()
164 for addr in addrinfos:
165 family = addr[0]
166 if family not in addrinfos_by_family:
167 addrinfos_by_family[family] = []
168 addrinfos_by_family[family].append(addr)
169 addrinfos_lists = list(addrinfos_by_family.values())
171 reordered = []
172 if first_address_family_count > 1:
173 reordered.extend(addrinfos_lists[0][:first_address_family_count - 1])
174 del addrinfos_lists[0][:first_address_family_count - 1]
175 reordered.extend(
176 a for a in itertools.chain.from_iterable(
177 itertools.zip_longest(*addrinfos_lists)
178 ) if a is not None)
179 return reordered
182def _run_until_complete_cb(fut):
183 if not fut.cancelled():
184 exc = fut.exception()
185 if isinstance(exc, (SystemExit, KeyboardInterrupt)):
186 # Issue #22429: run_forever() already finished, no need to
187 # stop it.
188 return
189 futures._get_loop(fut).stop()
# TCP_NODELAY is not available on every platform; define a no-op
# fallback there so callers never need to check.
if hasattr(socket, 'TCP_NODELAY'):
    def _set_nodelay(sock):
        # Disable Nagle's algorithm, but only for TCP stream sockets.
        if (sock.family in {socket.AF_INET, socket.AF_INET6} and
                sock.type == socket.SOCK_STREAM and
                sock.proto == socket.IPPROTO_TCP):
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
else:
    def _set_nodelay(sock):
        # Platform lacks TCP_NODELAY; nothing to do.
        pass
203def _check_ssl_socket(sock):
204 if ssl is not None and isinstance(sock, ssl.SSLSocket): 204 ↛ 205line 204 didn't jump to line 205 because the condition on line 204 was never true
205 raise TypeError("Socket cannot be of type SSLSocket")
class _SendfileFallbackProtocol(protocols.Protocol):
    """Protocol temporarily swapped in during fallback sock_sendfile().

    It pauses reading on the transport, tracks write flow control with a
    future (awaited in drain()), and forwards connection loss to the
    original protocol.  restore() puts the original protocol back.
    """

    def __init__(self, transp):
        if not isinstance(transp, transports._FlowControlMixin):
            raise TypeError("transport should be _FlowControlMixin instance")
        self._transport = transp
        # The protocol that was attached before the sendfile started.
        self._proto = transp.get_protocol()
        # Remember the reading/writing state so restore() can put it back.
        self._should_resume_reading = transp.is_reading()
        self._should_resume_writing = transp._protocol_paused
        transp.pause_reading()
        transp.set_protocol(self)
        if self._should_resume_writing:
            # Writing is currently paused: drain() must wait on this future
            # until resume_writing() resolves it.
            self._write_ready_fut = self._transport._loop.create_future()
        else:
            self._write_ready_fut = None

    async def drain(self):
        # Block until the transport is ready to accept more data.
        if self._transport.is_closing():
            raise ConnectionError("Connection closed by peer")
        fut = self._write_ready_fut
        if fut is None:
            return
        await fut

    def connection_made(self, transport):
        raise RuntimeError("Invalid state: "
                           "connection should have been established already.")

    def connection_lost(self, exc):
        if self._write_ready_fut is not None:
            # Never happens if peer disconnects after sending the whole content
            # Thus disconnection is always an exception from user perspective
            if exc is None:
                self._write_ready_fut.set_exception(
                    ConnectionError("Connection is closed by peer"))
            else:
                self._write_ready_fut.set_exception(exc)
        self._proto.connection_lost(exc)

    def pause_writing(self):
        if self._write_ready_fut is not None:
            # Already paused: keep the existing future.
            return
        self._write_ready_fut = self._transport._loop.create_future()

    def resume_writing(self):
        if self._write_ready_fut is None:
            # Not paused: nothing to release.
            return
        self._write_ready_fut.set_result(False)
        self._write_ready_fut = None

    def data_received(self, data):
        raise RuntimeError("Invalid state: reading should be paused")

    def eof_received(self):
        raise RuntimeError("Invalid state: reading should be paused")

    async def restore(self):
        # Reattach the original protocol and restore the saved state.
        self._transport.set_protocol(self._proto)
        if self._should_resume_reading:
            self._transport.resume_reading()
        if self._write_ready_fut is not None:
            # Cancel the future.
            # Basically it has no effect because protocol is switched back,
            # no code should wait for it anymore.
            self._write_ready_fut.cancel()
        if self._should_resume_writing:
            self._proto.resume_writing()
class Server(events.AbstractServer):
    """Server object returned by loop.create_server() and friends.

    Tracks the listening sockets, the set of live client transports, and
    the futures of wait_closed() callers.
    """

    def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
                 ssl_handshake_timeout, ssl_shutdown_timeout=None):
        self._loop = loop
        # Listening sockets; set to None by close().
        self._sockets = sockets
        # Weak references so we don't break Transport's ability to
        # detect abandoned transports
        self._clients = weakref.WeakSet()
        # Futures created by wait_closed(); set to None by _wakeup()
        # once the server is fully closed.
        self._waiters = []
        self._protocol_factory = protocol_factory
        self._backlog = backlog
        self._ssl_context = ssl_context
        self._ssl_handshake_timeout = ssl_handshake_timeout
        self._ssl_shutdown_timeout = ssl_shutdown_timeout
        self._serving = False
        # Future awaited by serve_forever(); None when not being awaited.
        self._serving_forever_fut = None

    def __repr__(self):
        return f'<{self.__class__.__name__} sockets={self.sockets!r}>'

    def _attach(self, transport):
        # Called by transports when a new connection is accepted.
        assert self._sockets is not None
        self._clients.add(transport)

    def _detach(self, transport):
        # Called by transports when a connection is lost.
        self._clients.discard(transport)
        if len(self._clients) == 0 and self._sockets is None:
            # Last client gone after close(): unblock wait_closed() callers.
            self._wakeup()

    def _wakeup(self):
        # Resolve all wait_closed() futures; _waiters = None marks "done".
        waiters = self._waiters
        self._waiters = None
        for waiter in waiters:
            if not waiter.done():
                waiter.set_result(None)

    def _start_serving(self):
        if self._serving:
            # Idempotent: already listening.
            return
        self._serving = True
        for sock in self._sockets:
            sock.listen(self._backlog)
            self._loop._start_serving(
                self._protocol_factory, sock, self._ssl_context,
                self, self._backlog, self._ssl_handshake_timeout,
                self._ssl_shutdown_timeout)

    def get_loop(self):
        return self._loop

    def is_serving(self):
        return self._serving

    @property
    def sockets(self):
        # Expose read-only wrappers; empty tuple once the server is closed.
        if self._sockets is None:
            return ()
        return tuple(trsock.TransportSocket(s) for s in self._sockets)

    def close(self):
        sockets = self._sockets
        if sockets is None:
            # Already closed.
            return
        self._sockets = None

        for sock in sockets:
            self._loop._stop_serving(sock)

        self._serving = False

        if (self._serving_forever_fut is not None and
                not self._serving_forever_fut.done()):
            self._serving_forever_fut.cancel()
            self._serving_forever_fut = None

        if len(self._clients) == 0:
            # No live connections: wait_closed() can finish immediately.
            self._wakeup()

    def close_clients(self):
        # Close active connections; close() flushes pending data first.
        # Iterate over a copy: closing mutates the weak set.
        for transport in self._clients.copy():
            transport.close()

    def abort_clients(self):
        # Abort active connections immediately, without flushing.
        for transport in self._clients.copy():
            transport.abort()

    async def start_serving(self):
        self._start_serving()
        # Skip one loop iteration so that all 'loop.add_reader'
        # go through.
        await tasks.sleep(0)

    async def serve_forever(self):
        if self._serving_forever_fut is not None:
            raise RuntimeError(
                f'server {self!r} is already being awaited on serve_forever()')
        if self._sockets is None:
            raise RuntimeError(f'server {self!r} is closed')

        self._start_serving()
        self._serving_forever_fut = self._loop.create_future()

        try:
            await self._serving_forever_fut
        except exceptions.CancelledError:
            # Cancellation of serve_forever() shuts the server down
            # completely before propagating.
            try:
                self.close()
                self.close_clients()
                await self.wait_closed()
            finally:
                raise
        finally:
            self._serving_forever_fut = None

    async def wait_closed(self):
        """Wait until server is closed and all connections are dropped.

        - If the server is not closed, wait.
        - If it is closed, but there are still active connections, wait.

        Anyone waiting here will be unblocked once both conditions
        (server is closed and all connections have been dropped)
        have become true, in either order.

        Historical note: In 3.11 and before, this was broken, returning
        immediately if the server was already closed, even if there
        were still active connections.  An attempted fix in 3.12.0 was
        still broken, returning immediately if the server was still
        open and there were no active connections.  Hopefully in 3.12.1
        we have it right.
        """
        # Waiters are unblocked by self._wakeup(), which is called
        # from two places: self.close() and self._detach(), but only
        # when both conditions have become true.  To signal that this
        # has happened, self._wakeup() sets self._waiters to None.
        if self._waiters is None:
            return
        waiter = self._loop.create_future()
        self._waiters.append(waiter)
        await waiter
419class BaseEventLoop(events.AbstractEventLoop):
    def __init__(self):
        # Number of cancelled handles still sitting in self._scheduled;
        # used to decide when the heap should be compacted.
        self._timer_cancelled_count = 0
        self._closed = False
        self._stopping = False
        # FIFO of Handles ready to run on the next loop iteration.
        self._ready = collections.deque()
        # Heap of TimerHandles ordered by scheduled time.
        self._scheduled = []
        self._default_executor = None
        self._internal_fds = 0
        # Identifier of the thread running the event loop, or None if the
        # event loop is not running
        self._thread_id = None
        self._clock_resolution = time.get_clock_info('monotonic').resolution
        self._exception_handler = None
        self.set_debug(coroutines._is_debug_mode())
        # The preserved state of async generator hooks.
        self._old_agen_hooks = None
        # In debug mode, if the execution of a callback or a step of a task
        # exceed this duration in seconds, the slow callback/task is logged.
        self.slow_callback_duration = 0.1
        self._current_handle = None
        self._task_factory = None
        self._coroutine_origin_tracking_enabled = False
        self._coroutine_origin_tracking_saved_depth = None

        # A weak set of all asynchronous generators that are
        # being iterated by the loop.
        self._asyncgens = weakref.WeakSet()
        # Set to True when `loop.shutdown_asyncgens` is called.
        self._asyncgens_shutdown_called = False
        # Set to True when `loop.shutdown_default_executor` is called.
        self._executor_shutdown_called = False
453 def __repr__(self):
454 return (
455 f'<{self.__class__.__name__} running={self.is_running()} '
456 f'closed={self.is_closed()} debug={self.get_debug()}>'
457 )
    def create_future(self):
        """Create a Future object attached to the loop."""
        # The future is bound to this loop via the ``loop`` keyword.
        return futures.Future(loop=self)
    def create_task(self, coro, **kwargs):
        """Schedule or begin executing a coroutine object.

        Return a task object.
        """
        self._check_closed()
        if self._task_factory is not None:
            # A custom factory fully replaces default Task construction.
            return self._task_factory(self, coro, **kwargs)

        task = tasks.Task(coro, loop=self, **kwargs)
        if task._source_traceback:
            # Drop this method's own frame from the captured traceback.
            del task._source_traceback[-1]
        try:
            return task
        finally:
            # gh-128552: prevent a refcycle of
            # task.exception().__traceback__->BaseEventLoop.create_task->task
            del task
482 def set_task_factory(self, factory):
483 """Set a task factory that will be used by loop.create_task().
485 If factory is None the default task factory will be set.
487 If factory is a callable, it should have a signature matching
488 '(loop, coro, **kwargs)', where 'loop' will be a reference to the active
489 event loop, 'coro' will be a coroutine object, and **kwargs will be
490 arbitrary keyword arguments that should be passed on to Task.
491 The callable must return a Task.
492 """
493 if factory is not None and not callable(factory):
494 raise TypeError('task factory must be a callable or None')
495 self._task_factory = factory
    def get_task_factory(self):
        """Return a task factory, or None if the default one is in use."""
        # Set via set_task_factory(); consulted by create_task().
        return self._task_factory
    def _make_socket_transport(self, sock, protocol, waiter=None, *,
                               extra=None, server=None):
        """Create socket transport.

        Stub: subclasses that own a selector/proactor must override.
        """
        raise NotImplementedError
    def _make_ssl_transport(
            self, rawsock, protocol, sslcontext, waiter=None,
            *, server_side=False, server_hostname=None,
            extra=None, server=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            call_connection_made=True):
        """Create SSL transport.

        Stub: subclasses that own a selector/proactor must override.
        """
        raise NotImplementedError
    def _make_datagram_transport(self, sock, protocol,
                                 address=None, waiter=None, extra=None):
        """Create datagram transport.

        Stub: subclasses that own a selector/proactor must override.
        """
        raise NotImplementedError
    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Create read pipe transport.

        Stub: subclasses that own a selector/proactor must override.
        """
        raise NotImplementedError
    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Create write pipe transport.

        Stub: subclasses that own a selector/proactor must override.
        """
        raise NotImplementedError
    async def _make_subprocess_transport(self, protocol, args, shell,
                                         stdin, stdout, stderr, bufsize,
                                         extra=None, **kwargs):
        """Create subprocess transport.

        Stub: platform-specific subclasses must override.
        """
        raise NotImplementedError
    def _write_to_self(self):
        """Write a byte to self-pipe, to wake up the event loop.

        This may be called from a different thread.

        The subclass is responsible for implementing the self-pipe.
        """
        raise NotImplementedError
    def _process_events(self, event_list):
        """Process selector events.

        Stub: subclasses that own the selector must override.
        """
        raise NotImplementedError
550 def _check_closed(self):
551 if self._closed:
552 raise RuntimeError('Event loop is closed')
554 def _check_default_executor(self):
555 if self._executor_shutdown_called: 555 ↛ 556line 555 didn't jump to line 556 because the condition on line 555 was never true
556 raise RuntimeError('Executor shutdown has been called')
    def _asyncgen_finalizer_hook(self, agen):
        # Finalizer installed via sys.set_asyncgen_hooks() (see
        # _run_forever_setup): schedule agen.aclose() on the loop.
        # call_soon_threadsafe keeps this safe even if finalization
        # happens outside the loop's thread.
        self._asyncgens.discard(agen)
        if not self.is_closed():
            self.call_soon_threadsafe(self.create_task, agen.aclose())
    def _asyncgen_firstiter_hook(self, agen):
        # First-iteration hook installed via sys.set_asyncgen_hooks():
        # track the generator so shutdown_asyncgens() can close it later.
        if self._asyncgens_shutdown_called:
            warnings.warn(
                f"asynchronous generator {agen!r} was scheduled after "
                f"loop.shutdown_asyncgens() call",
                ResourceWarning, source=self)

        self._asyncgens.add(agen)
572 async def shutdown_asyncgens(self):
573 """Shutdown all active asynchronous generators."""
574 self._asyncgens_shutdown_called = True
576 if not len(self._asyncgens): 576 ↛ 581line 576 didn't jump to line 581 because the condition on line 576 was always true
577 # If Python version is <3.6 or we don't have any asynchronous
578 # generators alive.
579 return
581 closing_agens = list(self._asyncgens)
582 self._asyncgens.clear()
584 results = await tasks.gather(
585 *[ag.aclose() for ag in closing_agens],
586 return_exceptions=True)
588 for result, agen in zip(results, closing_agens):
589 if isinstance(result, Exception):
590 self.call_exception_handler({
591 'message': f'an error occurred during closing of '
592 f'asynchronous generator {agen!r}',
593 'exception': result,
594 'asyncgen': agen
595 })
597 async def shutdown_default_executor(self, timeout=None):
598 """Schedule the shutdown of the default executor.
600 The timeout parameter specifies the amount of time the executor will
601 be given to finish joining. The default value is None, which means
602 that the executor will be given an unlimited amount of time.
603 """
604 self._executor_shutdown_called = True
605 if self._default_executor is None:
606 return
607 future = self.create_future()
608 thread = threading.Thread(target=self._do_shutdown, args=(future,))
609 thread.start()
610 try:
611 async with timeouts.timeout(timeout):
612 await future
613 except TimeoutError:
614 warnings.warn("The executor did not finishing joining "
615 f"its threads within {timeout} seconds.",
616 RuntimeWarning, stacklevel=2)
617 self._default_executor.shutdown(wait=False)
618 else:
619 thread.join()
    def _do_shutdown(self, future):
        # Runs on a helper thread (started by shutdown_default_executor):
        # join the executor, then report the outcome back to the loop via
        # the thread-safe scheduling API.
        try:
            self._default_executor.shutdown(wait=True)
            if not self.is_closed():
                self.call_soon_threadsafe(futures._set_result_unless_cancelled,
                                          future, None)
        except Exception as ex:
            if not self.is_closed() and not future.cancelled():
                self.call_soon_threadsafe(future.set_exception, ex)
631 def _check_running(self):
632 if self.is_running():
633 raise RuntimeError('This event loop is already running')
634 if events._get_running_loop() is not None:
635 raise RuntimeError(
636 'Cannot run the event loop while another loop is running')
    def _run_forever_setup(self):
        """Prepare the run loop to process events.

        This method exists so that custom event loop subclasses (e.g., event loops
        that integrate a GUI event loop with Python's event loop) have access to all the
        loop setup logic.
        """
        self._check_closed()
        self._check_running()
        self._set_coroutine_origin_tracking(self._debug)

        # Save the current async generator hooks so that
        # _run_forever_cleanup() can restore them.
        self._old_agen_hooks = sys.get_asyncgen_hooks()
        self._thread_id = threading.get_ident()
        sys.set_asyncgen_hooks(
            firstiter=self._asyncgen_firstiter_hook,
            finalizer=self._asyncgen_finalizer_hook
        )

        # Publish this loop as the running loop for the current thread.
        events._set_running_loop(self)
    def _run_forever_cleanup(self):
        """Clean up after an event loop finishes the looping over events.

        This method exists so that custom event loop subclasses (e.g., event loops
        that integrate a GUI event loop with Python's event loop) have access to all the
        loop cleanup logic.
        """
        self._stopping = False
        self._thread_id = None
        events._set_running_loop(None)
        self._set_coroutine_origin_tracking(False)
        # Restore any pre-existing async generator hooks.
        if self._old_agen_hooks is not None:
            sys.set_asyncgen_hooks(*self._old_agen_hooks)
            self._old_agen_hooks = None
674 def run_forever(self):
675 """Run until stop() is called."""
676 self._run_forever_setup()
677 try:
678 while True:
679 self._run_once()
680 if self._stopping:
681 break
682 finally:
683 self._run_forever_cleanup()
    def run_until_complete(self, future):
        """Run until the Future is done.

        If the argument is a coroutine, it is wrapped in a Task.

        WARNING: It would be disastrous to call run_until_complete()
        with the same coroutine twice -- it would wrap it in two
        different Tasks and that can't be good.

        Return the Future's result, or raise its exception.
        """
        self._check_closed()
        self._check_running()

        new_task = not futures.isfuture(future)
        future = tasks.ensure_future(future, loop=self)
        if new_task:
            # An exception is raised if the future didn't complete, so there
            # is no need to log the "destroy pending task" message
            future._log_destroy_pending = False

        # The callback stops the loop when the future completes.
        future.add_done_callback(_run_until_complete_cb)
        try:
            self.run_forever()
        except:
            # Bare except on purpose: even SystemExit/KeyboardInterrupt
            # must run the cleanup below before propagating.
            if new_task and future.done() and not future.cancelled():
                # The coroutine raised a BaseException. Consume the exception
                # to not log a warning, the caller doesn't have access to the
                # local task.
                future.exception()
            raise
        finally:
            future.remove_done_callback(_run_until_complete_cb)
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')

        return future.result()
    def stop(self):
        """Stop running the event loop.

        Every callback already scheduled will still run.  This simply informs
        run_forever to stop looping after a complete iteration.
        """
        # Checked by run_forever() after each _run_once() pass.
        self._stopping = True
    def close(self):
        """Close the event loop.

        This clears the queues and shuts down the executor,
        but does not wait for the executor to finish.

        The event loop must not be running.
        """
        if self.is_running():
            raise RuntimeError("Cannot close a running event loop")
        if self._closed:
            # Idempotent: closing twice is a no-op.
            return
        if self._debug:
            logger.debug("Close %r", self)
        self._closed = True
        self._ready.clear()
        self._scheduled.clear()
        self._executor_shutdown_called = True
        executor = self._default_executor
        if executor is not None:
            # Detach first so no further work is submitted to it.
            self._default_executor = None
            executor.shutdown(wait=False)
    def is_closed(self):
        """Returns True if the event loop was closed."""
        # Set only by close(); never reset.
        return self._closed
    def __del__(self, _warn=warnings.warn):
        # _warn is bound at definition time so the finalizer still works
        # during interpreter shutdown when module globals may be cleared.
        if not self.is_closed():
            _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self)
            if not self.is_running():
                self.close()
    def is_running(self):
        """Returns True if the event loop is running."""
        # _thread_id is set in _run_forever_setup() and cleared in
        # _run_forever_cleanup().
        return (self._thread_id is not None)
    def time(self):
        """Return the time according to the event loop's clock.

        This is a float expressed in seconds since an epoch, but the
        epoch, precision, accuracy and drift are unspecified and may
        differ per event loop.
        """
        return time.monotonic()
777 def call_later(self, delay, callback, *args, context=None):
778 """Arrange for a callback to be called at a given time.
780 Return a Handle: an opaque object with a cancel() method that
781 can be used to cancel the call.
783 The delay can be an int or float, expressed in seconds. It is
784 always relative to the current time.
786 Each callback will be called exactly once. If two callbacks
787 are scheduled for exactly the same time, it is undefined which
788 will be called first.
790 Any positional arguments after the callback will be passed to
791 the callback when it is called.
792 """
793 if delay is None:
794 raise TypeError('delay must not be None')
795 timer = self.call_at(self.time() + delay, callback, *args,
796 context=context)
797 if timer._source_traceback:
798 del timer._source_traceback[-1]
799 return timer
    def call_at(self, when, callback, *args, context=None):
        """Like call_later(), but uses an absolute time.

        Absolute time corresponds to the event loop's time() method.
        """
        if when is None:
            raise TypeError("when cannot be None")
        self._check_closed()
        if self._debug:
            self._check_thread()
            self._check_callback(callback, 'call_at')
        timer = events.TimerHandle(when, callback, args, self, context)
        if timer._source_traceback:
            # Drop this method's frame from the captured traceback.
            del timer._source_traceback[-1]
        # Timers are kept in a heap ordered by their deadline.
        heapq.heappush(self._scheduled, timer)
        timer._scheduled = True
        return timer
    def call_soon(self, callback, *args, context=None):
        """Arrange for a callback to be called as soon as possible.

        This operates as a FIFO queue: callbacks are called in the
        order in which they are registered.  Each callback will be
        called exactly once.

        Any positional arguments after the callback will be passed to
        the callback when it is called.
        """
        self._check_closed()
        if self._debug:
            self._check_thread()
            self._check_callback(callback, 'call_soon')
        handle = self._call_soon(callback, args, context)
        if handle._source_traceback:
            # Drop this method's frame from the captured traceback.
            del handle._source_traceback[-1]
        return handle
    def _check_callback(self, callback, method):
        # Debug-mode validation shared by the call_soon/call_at family;
        # *method* names the public API for the error message.
        if (coroutines.iscoroutine(callback) or
                coroutines._iscoroutinefunction(callback)):
            # Coroutines must be wrapped in tasks, not scheduled directly.
            raise TypeError(
                f"coroutines cannot be used with {method}()")
        if not callable(callback):
            raise TypeError(
                f'a callable object was expected by {method}(), '
                f'got {callback!r}')
    def _call_soon(self, callback, args, context):
        # Internal helper: wrap the callback in a Handle and queue it.
        handle = events.Handle(callback, args, self, context)
        if handle._source_traceback:
            # Drop this helper's frame from the captured traceback.
            del handle._source_traceback[-1]
        self._ready.append(handle)
        return handle
855 def _check_thread(self):
856 """Check that the current thread is the thread running the event loop.
858 Non-thread-safe methods of this class make this assumption and will
859 likely behave incorrectly when the assumption is violated.
861 Should only be called when (self._debug == True). The caller is
862 responsible for checking this condition for performance reasons.
863 """
864 if self._thread_id is None:
865 return
866 thread_id = threading.get_ident()
867 if thread_id != self._thread_id:
868 raise RuntimeError(
869 "Non-thread-safe operation invoked on an event loop other "
870 "than the current one")
    def call_soon_threadsafe(self, callback, *args, context=None):
        """Like call_soon(), but thread-safe."""
        self._check_closed()
        if self._debug:
            self._check_callback(callback, 'call_soon_threadsafe')
        handle = events._ThreadSafeHandle(callback, args, self, context)
        self._ready.append(handle)
        # Strip this function's frame from the captured traceback.
        if handle._source_traceback:
            del handle._source_traceback[-1]
        # NOTE(review): the deletion is performed twice, which also strips
        # the immediate caller's frame — confirm this is intentional and
        # not a copy/paste slip.
        if handle._source_traceback:
            del handle._source_traceback[-1]
        # Wake the loop: it may be blocked waiting for I/O in another thread.
        self._write_to_self()
        return handle
    def run_in_executor(self, executor, func, *args):
        """Run func(*args) in *executor*; return a future for the result.

        If *executor* is None, the loop's default ThreadPoolExecutor is
        used, creating it lazily on first use.
        """
        self._check_closed()
        if self._debug:
            self._check_callback(func, 'run_in_executor')
        if executor is None:
            executor = self._default_executor
            # Only check when the default executor is being used
            self._check_default_executor()
            if executor is None:
                # Lazily create the shared default thread pool.
                executor = concurrent.futures.ThreadPoolExecutor(
                    thread_name_prefix='asyncio'
                )
                self._default_executor = executor
        # Bridge the concurrent.futures.Future to an asyncio future.
        return futures.wrap_future(
            executor.submit(func, *args), loop=self)
902 def set_default_executor(self, executor):
903 if not isinstance(executor, concurrent.futures.ThreadPoolExecutor):
904 raise TypeError('executor must be ThreadPoolExecutor instance')
905 self._default_executor = executor
907 def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
908 msg = [f"{host}:{port!r}"]
909 if family:
910 msg.append(f'family={family!r}')
911 if type:
912 msg.append(f'type={type!r}')
913 if proto:
914 msg.append(f'proto={proto!r}')
915 if flags:
916 msg.append(f'flags={flags!r}')
917 msg = ', '.join(msg)
918 logger.debug('Get address info %s', msg)
920 t0 = self.time()
921 addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
922 dt = self.time() - t0
924 msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}'
925 if dt >= self.slow_callback_duration:
926 logger.info(msg)
927 else:
928 logger.debug(msg)
929 return addrinfo
931 async def getaddrinfo(self, host, port, *,
932 family=0, type=0, proto=0, flags=0):
933 if self._debug: 933 ↛ 934line 933 didn't jump to line 934 because the condition on line 933 was never true
934 getaddr_func = self._getaddrinfo_debug
935 else:
936 getaddr_func = socket.getaddrinfo
938 return await self.run_in_executor(
939 None, getaddr_func, host, port, family, type, proto, flags)
    async def getnameinfo(self, sockaddr, flags=0):
        """Asynchronous socket.getnameinfo(), run in the default executor."""
        return await self.run_in_executor(
            None, socket.getnameinfo, sockaddr, flags)
    async def sock_sendfile(self, sock, file, offset=0, count=None,
                            *, fallback=True):
        """Send *file* over *sock*, preferring the native sendfile path.

        Falls back to a read/sendall loop when the native path is
        unavailable and *fallback* is true; otherwise the
        SendfileNotAvailableError propagates.
        """
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        _check_ssl_socket(sock)
        self._check_sendfile_params(sock, file, offset, count)
        try:
            return await self._sock_sendfile_native(sock, file,
                                                    offset, count)
        except exceptions.SendfileNotAvailableError:
            if not fallback:
                raise
            return await self._sock_sendfile_fallback(sock, file,
                                                      offset, count)
960 async def _sock_sendfile_native(self, sock, file, offset, count):
961 # NB: sendfile syscall is not supported for SSL sockets and
962 # non-mmap files even if sendfile is supported by OS
963 raise exceptions.SendfileNotAvailableError(
964 f"syscall sendfile is not available for socket {sock!r} "
965 f"and file {file!r} combination")
967 async def _sock_sendfile_fallback(self, sock, file, offset, count):
968 if offset:
969 file.seek(offset)
970 blocksize = (
971 min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE)
972 if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE
973 )
974 buf = bytearray(blocksize)
975 total_sent = 0
976 try:
977 while True:
978 if count:
979 blocksize = min(count - total_sent, blocksize)
980 if blocksize <= 0:
981 break
982 view = memoryview(buf)[:blocksize]
983 read = await self.run_in_executor(None, file.readinto, view)
984 if not read:
985 break # EOF
986 await self.sock_sendall(sock, view[:read])
987 total_sent += read
988 return total_sent
989 finally:
990 if total_sent > 0 and hasattr(file, 'seek'):
991 file.seek(offset + total_sent)
993 def _check_sendfile_params(self, sock, file, offset, count):
994 if 'b' not in getattr(file, 'mode', 'b'):
995 raise ValueError("file should be opened in binary mode")
996 if not sock.type == socket.SOCK_STREAM:
997 raise ValueError("only SOCK_STREAM type sockets are supported")
998 if count is not None:
999 if not isinstance(count, int):
1000 raise TypeError(
1001 "count must be a positive integer (got {!r})".format(count))
1002 if count <= 0:
1003 raise ValueError(
1004 "count must be a positive integer (got {!r})".format(count))
1005 if not isinstance(offset, int):
1006 raise TypeError(
1007 "offset must be a non-negative integer (got {!r})".format(
1008 offset))
1009 if offset < 0:
1010 raise ValueError(
1011 "offset must be a non-negative integer (got {!r})".format(
1012 offset))
    async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None):
        """Create, bind and connect one socket.

        On failure, the OSErrors encountered are appended to a fresh
        sub-list inside the caller-owned *exceptions* list, so the caller
        can aggregate errors from several parallel attempts.
        """
        my_exceptions = []
        exceptions.append(my_exceptions)
        family, type_, proto, _, address = addr_info
        sock = None
        try:
            try:
                sock = socket.socket(family=family, type=type_, proto=proto)
                sock.setblocking(False)
                if local_addr_infos is not None:
                    for lfamily, _, _, _, laddr in local_addr_infos:
                        # skip local addresses of different family
                        if lfamily != family:
                            continue
                        try:
                            sock.bind(laddr)
                            break
                        except OSError as exc:
                            msg = (
                                f'error while attempting to bind on '
                                f'address {laddr!r}: {str(exc).lower()}'
                            )
                            exc = OSError(exc.errno, msg)
                            my_exceptions.append(exc)
                    else:  # all bind attempts failed
                        if my_exceptions:
                            raise my_exceptions.pop()
                        else:
                            raise OSError(f"no matching local address with {family=} found")
                await self.sock_connect(sock, address)
                return sock
            except OSError as exc:
                my_exceptions.append(exc)
                raise
        except:
            if sock is not None:
                try:
                    sock.close()
                except OSError:
                    # An error when closing a newly created socket is
                    # not important, but it can overwrite more important
                    # non-OSError error. So ignore it.
                    pass
            raise
        finally:
            # Drop local references to the exception lists so this frame
            # doesn't keep collected exceptions (and their tracebacks) alive.
            exceptions = my_exceptions = None
    async def create_connection(
            self, protocol_factory, host=None, port=None,
            *, ssl=None, family=0,
            proto=0, flags=0, sock=None,
            local_addr=None, server_hostname=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            happy_eyeballs_delay=None, interleave=None,
            all_errors=False):
        """Connect to a TCP server.

        Create a streaming transport connection to a given internet host and
        port: socket family AF_INET or socket.AF_INET6 depending on host (or
        family if specified), socket type SOCK_STREAM. protocol_factory must be
        a callable returning a protocol instance.

        This method is a coroutine which will try to establish the connection
        in the background. When successful, the coroutine returns a
        (transport, protocol) pair.
        """
        if server_hostname is not None and not ssl:
            raise ValueError('server_hostname is only meaningful with ssl')

        if server_hostname is None and ssl:
            # Use host as default for server_hostname. It is an error
            # if host is empty or not set, e.g. when an
            # already-connected socket was passed or when only a port
            # is given. To avoid this error, you can pass
            # server_hostname='' -- this will bypass the hostname
            # check. (This also means that if host is a numeric
            # IP/IPv6 address, we will attempt to verify that exact
            # address; this will probably fail, but it is possible to
            # create a certificate for a specific IP address, so we
            # don't judge it here.)
            if not host:
                raise ValueError('You must set server_hostname '
                                 'when using ssl without a host')
            server_hostname = host

        if ssl_handshake_timeout is not None and not ssl:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')

        if ssl_shutdown_timeout is not None and not ssl:
            raise ValueError(
                'ssl_shutdown_timeout is only meaningful with ssl')

        if sock is not None:
            _check_ssl_socket(sock)

        if happy_eyeballs_delay is not None and interleave is None:
            # If using happy eyeballs, default to interleave addresses by family
            interleave = 1

        if host is not None or port is not None:
            if sock is not None:
                raise ValueError(
                    'host/port and sock can not be specified at the same time')

            infos = await self._ensure_resolved(
                (host, port), family=family,
                type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
            if not infos:
                raise OSError('getaddrinfo() returned empty list')

            if local_addr is not None:
                laddr_infos = await self._ensure_resolved(
                    local_addr, family=family,
                    type=socket.SOCK_STREAM, proto=proto,
                    flags=flags, loop=self)
                if not laddr_infos:
                    raise OSError('getaddrinfo() returned empty list')
            else:
                laddr_infos = None

            if interleave:
                infos = _interleave_addrinfos(infos, interleave)

            # Each _connect_sock() call appends its own sub-list of
            # OSErrors to `exceptions`; flattened below on total failure.
            exceptions = []
            if happy_eyeballs_delay is None:
                # not using happy eyeballs
                for addrinfo in infos:
                    try:
                        sock = await self._connect_sock(
                            exceptions, addrinfo, laddr_infos)
                        break
                    except OSError:
                        continue
            else:  # using happy eyeballs
                sock = (await staggered.staggered_race(
                    (
                        # can't use functools.partial as it keeps a reference
                        # to exceptions
                        lambda addrinfo=addrinfo: self._connect_sock(
                            exceptions, addrinfo, laddr_infos
                        )
                        for addrinfo in infos
                    ),
                    happy_eyeballs_delay,
                    loop=self,
                ))[0]  # can't use sock, _, _ as it keeps a reference to exceptions

            if sock is None:
                exceptions = [exc for sub in exceptions for exc in sub]
                try:
                    if all_errors:
                        raise ExceptionGroup("create_connection failed", exceptions)
                    if len(exceptions) == 1:
                        raise exceptions[0]
                    elif exceptions:
                        # If they all have the same str(), raise one.
                        model = str(exceptions[0])
                        if all(str(exc) == model for exc in exceptions):
                            raise exceptions[0]
                        # Raise a combined exception so the user can see all
                        # the various error messages.
                        raise OSError('Multiple exceptions: {}'.format(
                            ', '.join(str(exc) for exc in exceptions)))
                    else:
                        # No exceptions were collected, raise a timeout error
                        raise TimeoutError('create_connection failed')
                finally:
                    # Break the reference cycle between this frame and the
                    # collected exceptions/tracebacks.
                    exceptions = None

        else:
            if sock is None:
                raise ValueError(
                    'host and port was not specified and no sock specified')
            if sock.type != socket.SOCK_STREAM:
                # We allow AF_INET, AF_INET6, AF_UNIX as long as they
                # are SOCK_STREAM.
                # We support passing AF_UNIX sockets even though we have
                # a dedicated API for that: create_unix_connection.
                # Disallowing AF_UNIX in this method, breaks backwards
                # compatibility.
                raise ValueError(
                    f'A Stream Socket was expected, got {sock!r}')

        transport, protocol = await self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout,
            ssl_shutdown_timeout=ssl_shutdown_timeout)
        if self._debug:
            # Get the socket from the transport because SSL transport closes
            # the old socket and creates a new SSL socket
            sock = transport.get_extra_info('socket')
            logger.debug("%r connected to %s:%r: (%r, %r)",
                         sock, host, port, transport, protocol)
        return transport, protocol
1212 async def _create_connection_transport(
1213 self, sock, protocol_factory, ssl,
1214 server_hostname, server_side=False,
1215 ssl_handshake_timeout=None,
1216 ssl_shutdown_timeout=None):
1218 sock.setblocking(False)
1220 protocol = protocol_factory()
1221 waiter = self.create_future()
1222 if ssl:
1223 sslcontext = None if isinstance(ssl, bool) else ssl
1224 transport = self._make_ssl_transport(
1225 sock, protocol, sslcontext, waiter,
1226 server_side=server_side, server_hostname=server_hostname,
1227 ssl_handshake_timeout=ssl_handshake_timeout,
1228 ssl_shutdown_timeout=ssl_shutdown_timeout)
1229 else:
1230 transport = self._make_socket_transport(sock, protocol, waiter)
1232 try:
1233 await waiter
1234 except:
1235 transport.close()
1236 raise
1238 return transport, protocol
1240 async def sendfile(self, transport, file, offset=0, count=None,
1241 *, fallback=True):
1242 """Send a file to transport.
1244 Return the total number of bytes which were sent.
1246 The method uses high-performance os.sendfile if available.
1248 file must be a regular file object opened in binary mode.
1250 offset tells from where to start reading the file. If specified,
1251 count is the total number of bytes to transmit as opposed to
1252 sending the file until EOF is reached. File position is updated on
1253 return or also in case of error in which case file.tell()
1254 can be used to figure out the number of bytes
1255 which were sent.
1257 fallback set to True makes asyncio to manually read and send
1258 the file when the platform does not support the sendfile syscall
1259 (e.g. Windows or SSL socket on Unix).
1261 Raise SendfileNotAvailableError if the system does not support
1262 sendfile syscall and fallback is False.
1263 """
1264 if transport.is_closing():
1265 raise RuntimeError("Transport is closing")
1266 mode = getattr(transport, '_sendfile_compatible',
1267 constants._SendfileMode.UNSUPPORTED)
1268 if mode is constants._SendfileMode.UNSUPPORTED:
1269 raise RuntimeError(
1270 f"sendfile is not supported for transport {transport!r}")
1271 if mode is constants._SendfileMode.TRY_NATIVE:
1272 try:
1273 return await self._sendfile_native(transport, file,
1274 offset, count)
1275 except exceptions.SendfileNotAvailableError:
1276 if not fallback:
1277 raise
1279 if not fallback:
1280 raise RuntimeError(
1281 f"fallback is disabled and native sendfile is not "
1282 f"supported for transport {transport!r}")
1284 return await self._sendfile_fallback(transport, file,
1285 offset, count)
1287 async def _sendfile_native(self, transp, file, offset, count):
1288 raise exceptions.SendfileNotAvailableError(
1289 "sendfile syscall is not supported")
1291 async def _sendfile_fallback(self, transp, file, offset, count):
1292 if offset:
1293 file.seek(offset)
1294 blocksize = min(count, 16384) if count else 16384
1295 buf = bytearray(blocksize)
1296 total_sent = 0
1297 proto = _SendfileFallbackProtocol(transp)
1298 try:
1299 while True:
1300 if count:
1301 blocksize = min(count - total_sent, blocksize)
1302 if blocksize <= 0:
1303 return total_sent
1304 view = memoryview(buf)[:blocksize]
1305 read = await self.run_in_executor(None, file.readinto, view)
1306 if not read:
1307 return total_sent # EOF
1308 transp.write(view[:read])
1309 await proto.drain()
1310 total_sent += read
1311 finally:
1312 if total_sent > 0 and hasattr(file, 'seek'):
1313 file.seek(offset + total_sent)
1314 await proto.restore()
    async def start_tls(self, transport, protocol, sslcontext, *,
                        server_side=False,
                        server_hostname=None,
                        ssl_handshake_timeout=None,
                        ssl_shutdown_timeout=None):
        """Upgrade transport to TLS.

        Return a new transport that *protocol* should start using
        immediately.

        Raises RuntimeError when the ssl module is unavailable and
        TypeError when *sslcontext* is not an SSLContext or *transport*
        does not support in-place TLS upgrades.
        """
        if ssl is None:
            raise RuntimeError('Python ssl module is not available')

        if not isinstance(sslcontext, ssl.SSLContext):
            raise TypeError(
                f'sslcontext is expected to be an instance of ssl.SSLContext, '
                f'got {sslcontext!r}')

        if not getattr(transport, '_start_tls_compatible', False):
            raise TypeError(
                f'transport {transport!r} is not supported by start_tls()')

        waiter = self.create_future()
        ssl_protocol = sslproto.SSLProtocol(
            self, protocol, sslcontext, waiter,
            server_side, server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout,
            ssl_shutdown_timeout=ssl_shutdown_timeout,
            call_connection_made=False)

        # Pause early so that "ssl_protocol.data_received()" doesn't
        # have a chance to get called before "ssl_protocol.connection_made()".
        transport.pause_reading()

        # gh-142352: move buffered StreamReader data to SSLProtocol, so
        # bytes that arrived before the upgrade are fed to the TLS layer
        # rather than lost in the old reader's buffer.
        if server_side:
            from .streams import StreamReaderProtocol
            if isinstance(protocol, StreamReaderProtocol):
                stream_reader = getattr(protocol, '_stream_reader', None)
                if stream_reader is not None:
                    buffer = stream_reader._buffer
                    if buffer:
                        ssl_protocol._incoming.write(buffer)
                        buffer.clear()

        transport.set_protocol(ssl_protocol)
        # Schedule (rather than call) so both run on a fresh loop
        # iteration, after the protocol swap above is fully in place.
        conmade_cb = self.call_soon(ssl_protocol.connection_made, transport)
        resume_cb = self.call_soon(transport.resume_reading)

        try:
            await waiter
        except BaseException:
            transport.close()
            conmade_cb.cancel()
            resume_cb.cancel()
            raise

        return ssl_protocol._app_transport
    async def create_datagram_endpoint(self, protocol_factory,
                                       local_addr=None, remote_addr=None, *,
                                       family=0, proto=0, flags=0,
                                       reuse_port=None,
                                       allow_broadcast=None, sock=None):
        """Create datagram connection.

        Returns a (transport, protocol) pair.  Either pass a
        pre-configured datagram *sock* (with no other socket-modifier
        keywords), or supply local_addr/remote_addr (resolved here) and
        let this method create and bind the socket.
        """
        if sock is not None:
            if sock.type == socket.SOCK_STREAM:
                raise ValueError(
                    f'A datagram socket was expected, got {sock!r}')
            if (local_addr or remote_addr or
                    family or proto or flags or
                    reuse_port or allow_broadcast):
                # show the problematic kwargs in exception msg
                opts = dict(local_addr=local_addr, remote_addr=remote_addr,
                            family=family, proto=proto, flags=flags,
                            reuse_port=reuse_port,
                            allow_broadcast=allow_broadcast)
                problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
                raise ValueError(
                    f'socket modifier keyword arguments can not be used '
                    f'when sock is specified. ({problems})')
            sock.setblocking(False)
            r_addr = None
        else:
            if not (local_addr or remote_addr):
                if family == 0:
                    raise ValueError('unexpected address family')
                # Unbound, unconnected socket of the requested family.
                addr_pairs_info = (((family, proto), (None, None)),)
            elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
                for addr in (local_addr, remote_addr):
                    if addr is not None and not isinstance(addr, str):
                        raise TypeError('string is expected')

                # Skip cleanup for abstract-namespace addresses (leading NUL).
                if local_addr and local_addr[0] not in (0, '\x00'):
                    try:
                        if stat.S_ISSOCK(os.stat(local_addr).st_mode):
                            os.remove(local_addr)
                    except FileNotFoundError:
                        pass
                    except OSError as err:
                        # Directory may have permissions only to create socket.
                        logger.error('Unable to check or remove stale UNIX '
                                     'socket %r: %r',
                                     local_addr, err)

                addr_pairs_info = (((family, proto),
                                    (local_addr, remote_addr)), )
            else:
                # join address by (family, protocol)
                addr_infos = {}  # Using order preserving dict
                for idx, addr in ((0, local_addr), (1, remote_addr)):
                    if addr is not None:
                        if not (isinstance(addr, tuple) and len(addr) == 2):
                            raise TypeError('2-tuple is expected')

                        infos = await self._ensure_resolved(
                            addr, family=family, type=socket.SOCK_DGRAM,
                            proto=proto, flags=flags, loop=self)
                        if not infos:
                            raise OSError('getaddrinfo() returned empty list')

                        for fam, _, pro, _, address in infos:
                            key = (fam, pro)
                            if key not in addr_infos:
                                addr_infos[key] = [None, None]
                            addr_infos[key][idx] = address

                # each addr has to have info for each (family, proto) pair
                addr_pairs_info = [
                    (key, addr_pair) for key, addr_pair in addr_infos.items()
                    if not ((local_addr and addr_pair[0] is None) or
                            (remote_addr and addr_pair[1] is None))]

                if not addr_pairs_info:
                    raise ValueError('can not get address information')

            exceptions = []

            # Try each candidate (family, proto) pair until one socket
            # binds/connects successfully; the loop's `else` fires only
            # when every attempt failed.
            for ((family, proto),
                 (local_address, remote_address)) in addr_pairs_info:
                sock = None
                r_addr = None
                try:
                    sock = socket.socket(
                        family=family, type=socket.SOCK_DGRAM, proto=proto)
                    if reuse_port:
                        _set_reuseport(sock)
                    if allow_broadcast:
                        sock.setsockopt(
                            socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
                    sock.setblocking(False)

                    if local_addr:
                        sock.bind(local_address)
                    if remote_addr:
                        if not allow_broadcast:
                            await self.sock_connect(sock, remote_address)
                        r_addr = remote_address
                except OSError as exc:
                    if sock is not None:
                        sock.close()
                    exceptions.append(exc)
                except:
                    if sock is not None:
                        sock.close()
                    raise
                else:
                    break
            else:
                raise exceptions[0]

        protocol = protocol_factory()
        waiter = self.create_future()
        transport = self._make_datagram_transport(
            sock, protocol, r_addr, waiter)
        if self._debug:
            if local_addr:
                logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
                            "created: (%r, %r)",
                            local_addr, remote_addr, transport, protocol)
            else:
                logger.debug("Datagram endpoint remote_addr=%r created: "
                             "(%r, %r)",
                             remote_addr, transport, protocol)

        try:
            await waiter
        except:
            transport.close()
            raise

        return transport, protocol
1509 async def _ensure_resolved(self, address, *,
1510 family=0, type=socket.SOCK_STREAM,
1511 proto=0, flags=0, loop):
1512 host, port = address[:2]
1513 info = _ipaddr_info(host, port, family, type, proto, *address[2:])
1514 if info is not None:
1515 # "host" is already a resolved IP.
1516 return [info]
1517 else:
1518 return await loop.getaddrinfo(host, port, family=family, type=type,
1519 proto=proto, flags=flags)
1521 async def _create_server_getaddrinfo(self, host, port, family, flags):
1522 infos = await self._ensure_resolved((host, port), family=family,
1523 type=socket.SOCK_STREAM,
1524 flags=flags, loop=self)
1525 if not infos:
1526 raise OSError(f'getaddrinfo({host!r}) returned empty list')
1527 return infos
    async def create_server(
            self, protocol_factory, host=None, port=None,
            *,
            family=socket.AF_UNSPEC,
            flags=socket.AI_PASSIVE,
            sock=None,
            backlog=100,
            ssl=None,
            reuse_address=None,
            reuse_port=None,
            keep_alive=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            start_serving=True):
        """Create a TCP server.

        The host parameter can be a string, in that case the TCP server is
        bound to host and port.

        The host parameter can also be a sequence of strings and in that case
        the TCP server is bound to all hosts of the sequence. If a host
        appears multiple times (possibly indirectly e.g. when hostnames
        resolve to the same IP address), the server is only bound once to that
        host.

        Return a Server object which can be used to stop the service.

        This method is a coroutine.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if ssl_handshake_timeout is not None and ssl is None:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')

        if ssl_shutdown_timeout is not None and ssl is None:
            raise ValueError(
                'ssl_shutdown_timeout is only meaningful with ssl')

        if sock is not None:
            _check_ssl_socket(sock)

        if host is not None or port is not None:
            if sock is not None:
                raise ValueError(
                    'host/port and sock can not be specified at the same time')

            if reuse_address is None:
                reuse_address = os.name == "posix" and sys.platform != "cygwin"
            sockets = []
            if host == '':
                hosts = [None]
            elif (isinstance(host, str) or
                    not isinstance(host, collections.abc.Iterable)):
                hosts = [host]
            else:
                hosts = host

            # Resolve all hosts concurrently; de-duplicate the results so
            # a host is bound at most once.
            fs = [self._create_server_getaddrinfo(host, port, family=family,
                                                  flags=flags)
                  for host in hosts]
            infos = await tasks.gather(*fs)
            infos = set(itertools.chain.from_iterable(infos))

            completed = False
            try:
                for res in infos:
                    af, socktype, proto, canonname, sa = res
                    try:
                        sock = socket.socket(af, socktype, proto)
                    except socket.error:
                        # Assume it's a bad family/type/protocol combination.
                        if self._debug:
                            logger.warning('create_server() failed to create '
                                           'socket.socket(%r, %r, %r)',
                                           af, socktype, proto, exc_info=True)
                        continue
                    sockets.append(sock)
                    if reuse_address:
                        sock.setsockopt(
                            socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
                    # Since Linux 6.12.9, SO_REUSEPORT is not allowed
                    # on other address families than AF_INET/AF_INET6.
                    if reuse_port and af in (socket.AF_INET, socket.AF_INET6):
                        _set_reuseport(sock)
                    if keep_alive:
                        sock.setsockopt(
                            socket.SOL_SOCKET, socket.SO_KEEPALIVE, True)
                    # Disable IPv4/IPv6 dual stack support (enabled by
                    # default on Linux) which makes a single socket
                    # listen on both address families.
                    if (_HAS_IPv6 and
                            af == socket.AF_INET6 and
                            hasattr(socket, 'IPPROTO_IPV6')):
                        sock.setsockopt(socket.IPPROTO_IPV6,
                                        socket.IPV6_V6ONLY,
                                        True)
                    try:
                        sock.bind(sa)
                    except OSError as err:
                        msg = ('error while attempting '
                               'to bind on address %r: %s'
                               % (sa, str(err).lower()))
                        if err.errno == errno.EADDRNOTAVAIL:
                            # Assume the family is not enabled (bpo-30945)
                            sockets.pop()
                            sock.close()
                            if self._debug:
                                logger.warning(msg)
                            continue
                        raise OSError(err.errno, msg) from None

                if not sockets:
                    raise OSError('could not bind on any address out of %r'
                                  % ([info[4] for info in infos],))

                completed = True
            finally:
                # On any failure above, close every socket opened so far.
                if not completed:
                    for sock in sockets:
                        sock.close()
        else:
            if sock is None:
                raise ValueError('Neither host/port nor sock were specified')
            if sock.type != socket.SOCK_STREAM:
                raise ValueError(f'A Stream Socket was expected, got {sock!r}')
            sockets = [sock]

        for sock in sockets:
            sock.setblocking(False)

        server = Server(self, sockets, protocol_factory,
                        ssl, backlog, ssl_handshake_timeout,
                        ssl_shutdown_timeout)
        if start_serving:
            server._start_serving()
            # Skip one loop iteration so that all 'loop.add_reader'
            # go through.
            await tasks.sleep(0)

        if self._debug:
            logger.info("%r is serving", server)
        return server
1674 async def connect_accepted_socket(
1675 self, protocol_factory, sock,
1676 *, ssl=None,
1677 ssl_handshake_timeout=None,
1678 ssl_shutdown_timeout=None):
1679 if sock.type != socket.SOCK_STREAM: 1679 ↛ 1680line 1679 didn't jump to line 1680 because the condition on line 1679 was never true
1680 raise ValueError(f'A Stream Socket was expected, got {sock!r}')
1682 if ssl_handshake_timeout is not None and not ssl:
1683 raise ValueError(
1684 'ssl_handshake_timeout is only meaningful with ssl')
1686 if ssl_shutdown_timeout is not None and not ssl: 1686 ↛ 1687line 1686 didn't jump to line 1687 because the condition on line 1686 was never true
1687 raise ValueError(
1688 'ssl_shutdown_timeout is only meaningful with ssl')
1690 _check_ssl_socket(sock)
1692 transport, protocol = await self._create_connection_transport(
1693 sock, protocol_factory, ssl, '', server_side=True,
1694 ssl_handshake_timeout=ssl_handshake_timeout,
1695 ssl_shutdown_timeout=ssl_shutdown_timeout)
1696 if self._debug: 1696 ↛ 1699line 1696 didn't jump to line 1699 because the condition on line 1696 was never true
1697 # Get the socket from the transport because SSL transport closes
1698 # the old socket and creates a new SSL socket
1699 sock = transport.get_extra_info('socket')
1700 logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
1701 return transport, protocol
1703 async def connect_read_pipe(self, protocol_factory, pipe):
1704 protocol = protocol_factory()
1705 waiter = self.create_future()
1706 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
1708 try:
1709 await waiter
1710 except:
1711 transport.close()
1712 raise
1714 if self._debug: 1714 ↛ 1715line 1714 didn't jump to line 1715 because the condition on line 1714 was never true
1715 logger.debug('Read pipe %r connected: (%r, %r)',
1716 pipe.fileno(), transport, protocol)
1717 return transport, protocol
1719 async def connect_write_pipe(self, protocol_factory, pipe):
1720 protocol = protocol_factory()
1721 waiter = self.create_future()
1722 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
1724 try:
1725 await waiter
1726 except:
1727 transport.close()
1728 raise
1730 if self._debug: 1730 ↛ 1731line 1730 didn't jump to line 1731 because the condition on line 1730 was never true
1731 logger.debug('Write pipe %r connected: (%r, %r)',
1732 pipe.fileno(), transport, protocol)
1733 return transport, protocol
1735 def _log_subprocess(self, msg, stdin, stdout, stderr):
1736 info = [msg]
1737 if stdin is not None:
1738 info.append(f'stdin={_format_pipe(stdin)}')
1739 if stdout is not None and stderr == subprocess.STDOUT:
1740 info.append(f'stdout=stderr={_format_pipe(stdout)}')
1741 else:
1742 if stdout is not None:
1743 info.append(f'stdout={_format_pipe(stdout)}')
1744 if stderr is not None:
1745 info.append(f'stderr={_format_pipe(stderr)}')
1746 logger.debug(' '.join(info))
1748 async def subprocess_shell(self, protocol_factory, cmd, *,
1749 stdin=subprocess.PIPE,
1750 stdout=subprocess.PIPE,
1751 stderr=subprocess.PIPE,
1752 universal_newlines=False,
1753 shell=True, bufsize=0,
1754 encoding=None, errors=None, text=None,
1755 **kwargs):
1756 if not isinstance(cmd, (bytes, str)):
1757 raise ValueError("cmd must be a string")
1758 if universal_newlines:
1759 raise ValueError("universal_newlines must be False")
1760 if not shell:
1761 raise ValueError("shell must be True")
1762 if bufsize != 0:
1763 raise ValueError("bufsize must be 0")
1764 if text:
1765 raise ValueError("text must be False")
1766 if encoding is not None:
1767 raise ValueError("encoding must be None")
1768 if errors is not None:
1769 raise ValueError("errors must be None")
1771 protocol = protocol_factory()
1772 debug_log = None
1773 if self._debug: 1773 ↛ 1776line 1773 didn't jump to line 1776 because the condition on line 1773 was never true
1774 # don't log parameters: they may contain sensitive information
1775 # (password) and may be too long
1776 debug_log = 'run shell command %r' % cmd
1777 self._log_subprocess(debug_log, stdin, stdout, stderr)
1778 transport = await self._make_subprocess_transport(
1779 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
1780 if self._debug and debug_log is not None: 1780 ↛ 1781line 1780 didn't jump to line 1781 because the condition on line 1780 was never true
1781 logger.info('%s: %r', debug_log, transport)
1782 return transport, protocol
1784 async def subprocess_exec(self, protocol_factory, program, *args,
1785 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1786 stderr=subprocess.PIPE, universal_newlines=False,
1787 shell=False, bufsize=0,
1788 encoding=None, errors=None, text=None,
1789 **kwargs):
1790 if universal_newlines:
1791 raise ValueError("universal_newlines must be False")
1792 if shell:
1793 raise ValueError("shell must be False")
1794 if bufsize != 0:
1795 raise ValueError("bufsize must be 0")
1796 if text:
1797 raise ValueError("text must be False")
1798 if encoding is not None:
1799 raise ValueError("encoding must be None")
1800 if errors is not None:
1801 raise ValueError("errors must be None")
1803 popen_args = (program,) + args
1804 protocol = protocol_factory()
1805 debug_log = None
1806 if self._debug: 1806 ↛ 1809line 1806 didn't jump to line 1809 because the condition on line 1806 was never true
1807 # don't log parameters: they may contain sensitive information
1808 # (password) and may be too long
1809 debug_log = f'execute program {program!r}'
1810 self._log_subprocess(debug_log, stdin, stdout, stderr)
1811 transport = await self._make_subprocess_transport(
1812 protocol, popen_args, False, stdin, stdout, stderr,
1813 bufsize, **kwargs)
1814 if self._debug and debug_log is not None: 1814 ↛ 1815line 1814 didn't jump to line 1815 because the condition on line 1814 was never true
1815 logger.info('%s: %r', debug_log, transport)
1816 return transport, protocol
1818 def get_exception_handler(self):
1819 """Return an exception handler, or None if the default one is in use.
1820 """
1821 return self._exception_handler
1823 def set_exception_handler(self, handler):
1824 """Set handler as the new event loop exception handler.
1826 If handler is None, the default exception handler will
1827 be set.
1829 If handler is a callable object, it should have a
1830 signature matching '(loop, context)', where 'loop'
1831 will be a reference to the active event loop, 'context'
1832 will be a dict object (see `call_exception_handler()`
1833 documentation for details about context).
1834 """
1835 if handler is not None and not callable(handler):
1836 raise TypeError(f'A callable object or None is expected, '
1837 f'got {handler!r}')
1838 self._exception_handler = handler
1840 def default_exception_handler(self, context):
1841 """Default exception handler.
1843 This is called when an exception occurs and no exception
1844 handler is set, and can be called by a custom exception
1845 handler that wants to defer to the default behavior.
1847 This default handler logs the error message and other
1848 context-dependent information. In debug mode, a truncated
1849 stack trace is also appended showing where the given object
1850 (e.g. a handle or future or task) was created, if any.
1852 The context parameter has the same meaning as in
1853 `call_exception_handler()`.
1854 """
1855 message = context.get('message')
1856 if not message: 1856 ↛ 1857line 1856 didn't jump to line 1857 because the condition on line 1856 was never true
1857 message = 'Unhandled exception in event loop'
1859 exception = context.get('exception')
1860 if exception is not None:
1861 exc_info = (type(exception), exception, exception.__traceback__)
1862 else:
1863 exc_info = False
1865 if ('source_traceback' not in context and 1865 ↛ 1868line 1865 didn't jump to line 1868 because the condition on line 1865 was never true
1866 self._current_handle is not None and
1867 self._current_handle._source_traceback):
1868 context['handle_traceback'] = \
1869 self._current_handle._source_traceback
1871 log_lines = [message]
1872 for key in sorted(context):
1873 if key in {'message', 'exception'}:
1874 continue
1875 value = context[key]
1876 if key == 'source_traceback':
1877 tb = ''.join(traceback.format_list(value))
1878 value = 'Object created at (most recent call last):\n'
1879 value += tb.rstrip()
1880 elif key == 'handle_traceback': 1880 ↛ 1881line 1880 didn't jump to line 1881 because the condition on line 1880 was never true
1881 tb = ''.join(traceback.format_list(value))
1882 value = 'Handle created at (most recent call last):\n'
1883 value += tb.rstrip()
1884 else:
1885 value = repr(value)
1886 log_lines.append(f'{key}: {value}')
1888 logger.error('\n'.join(log_lines), exc_info=exc_info)
    def call_exception_handler(self, context):
        """Call the current event loop's exception handler.

        The context argument is a dict containing the following keys:

        - 'message': Error message;
        - 'exception' (optional): Exception object;
        - 'future' (optional): Future instance;
        - 'task' (optional): Task instance;
        - 'handle' (optional): Handle instance;
        - 'protocol' (optional): Protocol instance;
        - 'transport' (optional): Transport instance;
        - 'socket' (optional): Socket instance;
        - 'source_traceback' (optional): Traceback of the source;
        - 'handle_traceback' (optional): Traceback of the handle;
        - 'asyncgen' (optional): Asynchronous generator that caused
          the exception.

        New keys may be introduced in the future.

        Note: do not overload this method in an event loop subclass.
        For custom exception handling, use the
        `set_exception_handler()` method.
        """
        if self._exception_handler is None:
            try:
                self.default_exception_handler(context)
            except (SystemExit, KeyboardInterrupt):
                # Never swallow interpreter-shutdown signals.
                raise
            except BaseException:
                # Second protection layer for unexpected errors
                # in the default implementation, as well as for subclassed
                # event loops with overloaded "default_exception_handler".
                logger.error('Exception in default exception handler',
                             exc_info=True)
        else:
            try:
                # Run the custom handler inside the contextvars Context
                # of the object that raised, when one is available.
                # Lookup order: task, then future, then handle.
                ctx = None
                thing = context.get("task")
                if thing is None:
                    # Even though Futures don't have a context,
                    # Task is a subclass of Future,
                    # and sometimes the 'future' key holds a Task.
                    thing = context.get("future")
                if thing is None:
                    # Handles also have a context.
                    thing = context.get("handle")
                if thing is not None and hasattr(thing, "get_context"):
                    ctx = thing.get_context()
                if ctx is not None and hasattr(ctx, "run"):
                    ctx.run(self._exception_handler, self, context)
                else:
                    self._exception_handler(self, context)
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                # Exception in the user set custom exception handler.
                try:
                    # Let's try default handler.
                    self.default_exception_handler({
                        'message': 'Unhandled error in exception handler',
                        'exception': exc,
                        'context': context,
                    })
                except (SystemExit, KeyboardInterrupt):
                    raise
                except BaseException:
                    # Guard 'default_exception_handler' in case it is
                    # overloaded.
                    logger.error('Exception in default exception handler '
                                 'while handling an unexpected error '
                                 'in custom exception handler',
                                 exc_info=True)
1964 def _add_callback(self, handle):
1965 """Add a Handle to _ready."""
1966 if not handle._cancelled:
1967 self._ready.append(handle)
1969 def _add_callback_signalsafe(self, handle):
1970 """Like _add_callback() but called from a signal handler."""
1971 self._add_callback(handle)
1972 self._write_to_self()
1974 def _timer_handle_cancelled(self, handle):
1975 """Notification that a TimerHandle has been cancelled."""
1976 if handle._scheduled:
1977 self._timer_cancelled_count += 1
    def _run_once(self):
        """Run one full iteration of the event loop.

        This calls all currently ready callbacks, polls for I/O,
        schedules the resulting callbacks, and finally schedules
        'call_later' callbacks.
        """

        # Phase 1: purge cancelled timers. If enough of the heap is
        # cancelled entries, rebuild it wholesale; otherwise just pop
        # cancelled entries off the head.
        sched_count = len(self._scheduled)
        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
            self._timer_cancelled_count / sched_count >
                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
            # Remove delayed calls that were cancelled if their number
            # is too high
            new_scheduled = []
            for handle in self._scheduled:
                if handle._cancelled:
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)

            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            self._timer_cancelled_count = 0
        else:
            # Remove delayed calls that were cancelled from head of queue.
            while self._scheduled and self._scheduled[0]._cancelled:
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False

        # Phase 2: compute the select() timeout. Zero when there is
        # ready work (or we're stopping); otherwise the delay until the
        # earliest timer, clamped to the OS-safe maximum; None (block
        # forever) when there is nothing scheduled at all.
        timeout = None
        if self._ready or self._stopping:
            timeout = 0
        elif self._scheduled:
            # Compute the desired timeout.
            timeout = self._scheduled[0]._when - self.time()
            if timeout > MAXIMUM_SELECT_TIMEOUT:
                timeout = MAXIMUM_SELECT_TIMEOUT
            elif timeout < 0:
                timeout = 0

        # Phase 3: poll for I/O and queue the resulting callbacks.
        event_list = self._selector.select(timeout)
        self._process_events(event_list)
        # Needed to break cycles when an exception occurs.
        event_list = None

        # Handle 'later' callbacks that are ready.
        now = self.time()
        # Ensure that `end_time` is strictly increasing
        # when the clock resolution is too small.
        end_time = now + max(self._clock_resolution, math.ulp(now))
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when >= end_time:
                break
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)

        # This is the only place where callbacks are actually *called*.
        # All other places just add them to ready.
        # Note: We run all currently scheduled callbacks, but not any
        # callbacks scheduled by callbacks run this time around --
        # they will be run the next time (after another I/O poll).
        # Use an idiom that is thread-safe without using locks.
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            if self._debug:
                try:
                    self._current_handle = handle
                    t0 = self.time()
                    handle._run()
                    dt = self.time() - t0
                    if dt >= self.slow_callback_duration:
                        logger.warning('Executing %s took %.3f seconds',
                                       _format_handle(handle), dt)
                finally:
                    self._current_handle = None
            else:
                handle._run()
        handle = None  # Needed to break cycles when an exception occurs.
2065 def _set_coroutine_origin_tracking(self, enabled):
2066 if bool(enabled) == bool(self._coroutine_origin_tracking_enabled):
2067 return
2069 if enabled:
2070 self._coroutine_origin_tracking_saved_depth = (
2071 sys.get_coroutine_origin_tracking_depth())
2072 sys.set_coroutine_origin_tracking_depth(
2073 constants.DEBUG_STACK_DEPTH)
2074 else:
2075 sys.set_coroutine_origin_tracking_depth(
2076 self._coroutine_origin_tracking_saved_depth)
2078 self._coroutine_origin_tracking_enabled = enabled
2080 def get_debug(self):
2081 return self._debug
2083 def set_debug(self, enabled):
2084 self._debug = enabled
2086 if self.is_running():
2087 self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)