Coverage for Lib / asyncio / base_events.py: 88%
1167 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 02:39 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 02:39 +0000
1"""Base implementation of event loop.
3The event loop can be broken up into a multiplexer (the part
4responsible for notifying us of I/O events) and the event loop proper,
5which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
16import collections
17import contextvars
18import collections.abc
19import concurrent.futures
20import errno
21import heapq
22import itertools
23import math
24import os
25import socket
26import stat
27import subprocess
28import sys
29import threading
30import time
31import traceback
32import warnings
33import weakref
35try:
36 import ssl
37except ImportError: # pragma: no cover
38 ssl = None
40from . import constants
41from . import coroutines
42from . import events
43from . import exceptions
44from . import futures
45from . import protocols
46from . import sslproto
47from . import staggered
48from . import tasks
49from . import timeouts
50from . import transports
51from . import trsock
52from .log import logger
__all__ = 'BaseEventLoop','Server',


# Minimum number of _scheduled timer handles before cleanup of
# cancelled handles is performed.
_MIN_SCHEDULED_TIMER_HANDLES = 100

# Minimum fraction of _scheduled timer handles that are cancelled
# before cleanup of cancelled handles is performed.
_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5

# True when the socket module was built with IPv6 support.
_HAS_IPv6 = hasattr(socket, 'AF_INET6')

# Maximum timeout passed to select to avoid OS limitations
MAXIMUM_SELECT_TIMEOUT = 24 * 3600
def _format_handle(handle):
    """Return a readable description of *handle* for log messages.

    When the handle's callback is a bound method of a Task, the task's
    repr is more informative than the generic handle string.
    """
    callback = handle._callback
    owner = getattr(callback, '__self__', None)
    if isinstance(owner, tasks.Task):
        # format the task
        return repr(owner)
    return str(handle)
82def _format_pipe(fd):
83 if fd == subprocess.PIPE:
84 return '<pipe>'
85 elif fd == subprocess.STDOUT:
86 return '<stdout>'
87 else:
88 return repr(fd)
91def _set_reuseport(sock):
92 if not hasattr(socket, 'SO_REUSEPORT'):
93 raise ValueError('reuse_port not supported by socket module')
94 else:
95 try:
96 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
97 except OSError:
98 raise ValueError('reuse_port not supported by socket module, '
99 'SO_REUSEPORT defined but not implemented.')
def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0):
    """Return a getaddrinfo()-style 5-tuple if *host* is already an IP literal.

    Returns None whenever a real getaddrinfo() call is required (service-name
    port, unsupported proto/type, zone index in the host, etc.).
    """
    # Try to skip getaddrinfo if "host" is already an IP. Users might have
    # handled name resolution in their own code and pass in resolved IPs.
    if not hasattr(socket, 'inet_pton'):
        return

    if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
            host is None:
        return None

    # Normalize proto from the socket type; anything else can't be handled
    # here and falls back to getaddrinfo().
    if type == socket.SOCK_STREAM:
        proto = socket.IPPROTO_TCP
    elif type == socket.SOCK_DGRAM:
        proto = socket.IPPROTO_UDP
    else:
        return None

    # Normalize the port: None/'' (str or bytes) means "any port" (0).
    if port is None:
        port = 0
    elif isinstance(port, bytes) and port == b'':
        port = 0
    elif isinstance(port, str) and port == '':
        port = 0
    else:
        # If port's a service name like "http", don't skip getaddrinfo.
        try:
            port = int(port)
        except (TypeError, ValueError):
            return None

    # Candidate address families to probe with inet_pton().
    if family == socket.AF_UNSPEC:
        afs = [socket.AF_INET]
        if _HAS_IPv6:
            afs.append(socket.AF_INET6)
    else:
        afs = [family]

    if isinstance(host, bytes):
        host = host.decode('idna')
    if '%' in host:
        # Linux's inet_pton doesn't accept an IPv6 zone index after host,
        # like '::1%lo0'.
        return None

    for af in afs:
        try:
            socket.inet_pton(af, host)
            # The host has already been resolved.
            if _HAS_IPv6 and af == socket.AF_INET6:
                # IPv6 sockaddr carries flowinfo and scopeid too.
                return af, type, proto, '', (host, port, flowinfo, scopeid)
            else:
                return af, type, proto, '', (host, port)
        except OSError:
            pass

    # "host" is not an IP address.
    return None
161def _interleave_addrinfos(addrinfos, first_address_family_count=1):
162 """Interleave list of addrinfo tuples by family."""
163 # Group addresses by family
164 addrinfos_by_family = collections.OrderedDict()
165 for addr in addrinfos:
166 family = addr[0]
167 if family not in addrinfos_by_family:
168 addrinfos_by_family[family] = []
169 addrinfos_by_family[family].append(addr)
170 addrinfos_lists = list(addrinfos_by_family.values())
172 reordered = []
173 if first_address_family_count > 1:
174 reordered.extend(addrinfos_lists[0][:first_address_family_count - 1])
175 del addrinfos_lists[0][:first_address_family_count - 1]
176 reordered.extend(
177 a for a in itertools.chain.from_iterable(
178 itertools.zip_longest(*addrinfos_lists)
179 ) if a is not None)
180 return reordered
def _run_until_complete_cb(fut):
    """Done-callback installed by run_until_complete() to stop the loop."""
    if fut.cancelled():
        futures._get_loop(fut).stop()
        return
    exc = fut.exception()
    if isinstance(exc, (SystemExit, KeyboardInterrupt)):
        # Issue #22429: run_forever() already finished, no need to
        # stop it.
        return
    futures._get_loop(fut).stop()
193if hasattr(socket, 'TCP_NODELAY'): 193 ↛ 200line 193 didn't jump to line 200 because the condition on line 193 was always true
194 def _set_nodelay(sock):
195 if (sock.family in {socket.AF_INET, socket.AF_INET6} and
196 sock.type == socket.SOCK_STREAM and
197 sock.proto == socket.IPPROTO_TCP):
198 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
199else:
200 def _set_nodelay(sock):
201 pass
204def _check_ssl_socket(sock):
205 if ssl is not None and isinstance(sock, ssl.SSLSocket): 205 ↛ 206line 205 didn't jump to line 206 because the condition on line 205 was never true
206 raise TypeError("Socket cannot be of type SSLSocket")
class _SendfileFallbackProtocol(protocols.Protocol):
    """Protocol temporarily swapped onto a transport during sendfile fallback.

    It pauses reading, tracks write flow control so the fallback loop can
    await drain(), and restore() puts the original protocol back together
    with the previous reading/writing state.
    """

    def __init__(self, transp):
        if not isinstance(transp, transports._FlowControlMixin):
            raise TypeError("transport should be _FlowControlMixin instance")
        self._transport = transp
        # Original protocol; reinstalled by restore().
        self._proto = transp.get_protocol()
        # Remember reading/writing state so restore() can reinstate it.
        self._should_resume_reading = transp.is_reading()
        self._should_resume_writing = transp._protocol_paused
        transp.pause_reading()
        transp.set_protocol(self)
        if self._should_resume_writing:
            # Transport is already paused for writing: drain() must wait
            # until resume_writing() is called.
            self._write_ready_fut = self._transport._loop.create_future()
        else:
            self._write_ready_fut = None

    async def drain(self):
        """Wait until the transport is ready to accept more data."""
        if self._transport.is_closing():
            raise ConnectionError("Connection closed by peer")
        fut = self._write_ready_fut
        if fut is None:
            return
        await fut

    def connection_made(self, transport):
        raise RuntimeError("Invalid state: "
                           "connection should have been established already.")

    def connection_lost(self, exc):
        if self._write_ready_fut is not None:
            # Never happens if peer disconnects after sending the whole content
            # Thus disconnection is always an exception from user perspective
            if exc is None:
                self._write_ready_fut.set_exception(
                    ConnectionError("Connection is closed by peer"))
            else:
                self._write_ready_fut.set_exception(exc)
        self._proto.connection_lost(exc)

    def pause_writing(self):
        if self._write_ready_fut is not None:
            # Already paused.
            return
        self._write_ready_fut = self._transport._loop.create_future()

    def resume_writing(self):
        if self._write_ready_fut is None:
            # Not paused.
            return
        self._write_ready_fut.set_result(False)
        self._write_ready_fut = None

    def data_received(self, data):
        raise RuntimeError("Invalid state: reading should be paused")

    def eof_received(self):
        raise RuntimeError("Invalid state: reading should be paused")

    async def restore(self):
        """Reinstall the original protocol and previous transport state."""
        self._transport.set_protocol(self._proto)
        if self._should_resume_reading:
            self._transport.resume_reading()
        if self._write_ready_fut is not None:
            # Cancel the future.
            # Basically it has no effect because protocol is switched back,
            # no code should wait for it anymore.
            self._write_ready_fut.cancel()
        if self._should_resume_writing:
            self._proto.resume_writing()
class Server(events.AbstractServer):
    """Server object returned by loop.create_server() and friends.

    Tracks the listening sockets, the client transports attached to them,
    and the waiters that wait_closed() parks until the server is closed
    and every connection has been dropped.
    """

    def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
                 ssl_handshake_timeout, ssl_shutdown_timeout=None):
        self._loop = loop
        self._sockets = sockets
        # Weak references so we don't break Transport's ability to
        # detect abandoned transports
        self._clients = weakref.WeakSet()
        # Futures created by wait_closed(); None once _wakeup() has run.
        self._waiters = []
        self._protocol_factory = protocol_factory
        self._backlog = backlog
        self._ssl_context = ssl_context
        self._ssl_handshake_timeout = ssl_handshake_timeout
        self._ssl_shutdown_timeout = ssl_shutdown_timeout
        self._serving = False
        self._serving_forever_fut = None
        # Context captured at creation; new connections are served in it.
        self._context = contextvars.copy_context()

    def __repr__(self):
        return f'<{self.__class__.__name__} sockets={self.sockets!r}>'

    def _attach(self, transport):
        # Called by transports when a new connection is accepted.
        assert self._sockets is not None
        self._clients.add(transport)

    def _detach(self, transport):
        # Called by transports when a connection is dropped; wake the
        # wait_closed() waiters once the server is closed and empty.
        self._clients.discard(transport)
        if len(self._clients) == 0 and self._sockets is None:
            self._wakeup()

    def _wakeup(self):
        # Release everyone blocked in wait_closed().  Setting _waiters to
        # None also signals that both close conditions have been met.
        waiters = self._waiters
        self._waiters = None
        for waiter in waiters:
            if not waiter.done():
                waiter.set_result(None)

    def _start_serving(self):
        if self._serving:
            # Idempotent: already listening.
            return
        self._serving = True
        for sock in self._sockets:
            sock.listen(self._backlog)
            self._loop._start_serving(
                self._protocol_factory, sock, self._ssl_context,
                self, self._backlog, self._ssl_handshake_timeout,
                self._ssl_shutdown_timeout, context=self._context)

    def get_loop(self):
        """Return the event loop this server is attached to."""
        return self._loop

    def is_serving(self):
        """Return True if the server is accepting new connections."""
        return self._serving

    @property
    def sockets(self):
        # Expose wrapped (read-only) sockets; empty tuple once closed.
        if self._sockets is None:
            return ()
        return tuple(trsock.TransportSocket(s) for s in self._sockets)

    def close(self):
        """Stop serving: close listening sockets (existing connections stay)."""
        sockets = self._sockets
        if sockets is None:
            # Already closed.
            return
        self._sockets = None

        for sock in sockets:
            self._loop._stop_serving(sock)

        self._serving = False

        if (self._serving_forever_fut is not None and
                not self._serving_forever_fut.done()):
            self._serving_forever_fut.cancel()
            self._serving_forever_fut = None

        if len(self._clients) == 0:
            # No live connections left: release wait_closed() waiters now.
            self._wakeup()

    def close_clients(self):
        """Close every transport currently attached to this server."""
        # Iterate over a copy: transports detach themselves as they close.
        for transport in self._clients.copy():
            transport.close()

    def abort_clients(self):
        """Abort (close without flushing) every attached transport."""
        for transport in self._clients.copy():
            transport.abort()

    async def start_serving(self):
        self._start_serving()
        # Skip one loop iteration so that all 'loop.add_reader'
        # go through.
        await tasks.sleep(0)

    async def serve_forever(self):
        if self._serving_forever_fut is not None:
            raise RuntimeError(
                f'server {self!r} is already being awaited on serve_forever()')
        if self._sockets is None:
            raise RuntimeError(f'server {self!r} is closed')

        self._start_serving()
        self._serving_forever_fut = self._loop.create_future()

        try:
            await self._serving_forever_fut
        except exceptions.CancelledError:
            # On cancellation, shut the server down cleanly before
            # re-raising so the caller still sees the CancelledError.
            try:
                self.close()
                self.close_clients()
                await self.wait_closed()
            finally:
                raise
        finally:
            self._serving_forever_fut = None

    async def wait_closed(self):
        """Wait until server is closed and all connections are dropped.

        - If the server is not closed, wait.
        - If it is closed, but there are still active connections, wait.

        Anyone waiting here will be unblocked once both conditions
        (server is closed and all connections have been dropped)
        have become true, in either order.

        Historical note: In 3.11 and before, this was broken, returning
        immediately if the server was already closed, even if there
        were still active connections. An attempted fix in 3.12.0 was
        still broken, returning immediately if the server was still
        open and there were no active connections. Hopefully in 3.12.1
        we have it right.
        """
        # Waiters are unblocked by self._wakeup(), which is called
        # from two places: self.close() and self._detach(), but only
        # when both conditions have become true. To signal that this
        # has happened, self._wakeup() sets self._waiters to None.
        if self._waiters is None:
            return
        waiter = self._loop.create_future()
        self._waiters.append(waiter)
        await waiter
421class BaseEventLoop(events.AbstractEventLoop):
    def __init__(self):
        # Count of cancelled TimerHandles still sitting in self._scheduled;
        # compared against _MIN_* thresholds to decide when to purge them.
        self._timer_cancelled_count = 0
        self._closed = False
        # Set by stop(); checked by run_forever() after each iteration.
        self._stopping = False
        # FIFO of Handles ready to run on the next iteration.
        self._ready = collections.deque()
        # Heap of TimerHandles ordered by scheduled time (see call_at()).
        self._scheduled = []
        self._default_executor = None
        self._internal_fds = 0
        # Identifier of the thread running the event loop, or None if the
        # event loop is not running
        self._thread_id = None
        self._clock_resolution = time.get_clock_info('monotonic').resolution
        self._exception_handler = None
        self.set_debug(coroutines._is_debug_mode())
        # The preserved state of async generator hooks.
        self._old_agen_hooks = None
        # In debug mode, if the execution of a callback or a step of a task
        # exceed this duration in seconds, the slow callback/task is logged.
        self.slow_callback_duration = 0.1
        self._current_handle = None
        self._task_factory = None
        self._coroutine_origin_tracking_enabled = False
        self._coroutine_origin_tracking_saved_depth = None

        # A weak set of all asynchronous generators that are
        # being iterated by the loop.
        self._asyncgens = weakref.WeakSet()
        # Set to True when `loop.shutdown_asyncgens` is called.
        self._asyncgens_shutdown_called = False
        # Set to True when `loop.shutdown_default_executor` is called.
        self._executor_shutdown_called = False
455 def __repr__(self):
456 return (
457 f'<{self.__class__.__name__} running={self.is_running()} '
458 f'closed={self.is_closed()} debug={self.get_debug()}>'
459 )
    def create_future(self):
        """Create a Future object attached to the loop."""
        # The future is bound to this loop via the loop= keyword.
        return futures.Future(loop=self)
    def create_task(self, coro, **kwargs):
        """Schedule or begin executing a coroutine object.

        Return a task object.
        """
        self._check_closed()
        if self._task_factory is not None:
            # A custom factory (set via set_task_factory) takes full control.
            return self._task_factory(self, coro, **kwargs)

        task = tasks.Task(coro, loop=self, **kwargs)
        if task._source_traceback:
            # Drop the frame for create_task() itself from the debug traceback.
            del task._source_traceback[-1]
        try:
            return task
        finally:
            # gh-128552: prevent a refcycle of
            # task.exception().__traceback__->BaseEventLoop.create_task->task
            del task
484 def set_task_factory(self, factory):
485 """Set a task factory that will be used by loop.create_task().
487 If factory is None the default task factory will be set.
489 If factory is a callable, it should have a signature matching
490 '(loop, coro, **kwargs)', where 'loop' will be a reference to the active
491 event loop, 'coro' will be a coroutine object, and **kwargs will be
492 arbitrary keyword arguments that should be passed on to Task.
493 The callable must return a Task.
494 """
495 if factory is not None and not callable(factory):
496 raise TypeError('task factory must be a callable or None')
497 self._task_factory = factory
    def get_task_factory(self):
        """Return a task factory, or None if the default one is in use."""
        return self._task_factory
    # The following _make_*() factories and low-level hooks are abstract:
    # concrete subclasses (selector- and proactor-based loops) override them.

    def _make_socket_transport(self, sock, protocol, waiter=None, *,
                               extra=None, server=None):
        """Create socket transport."""
        raise NotImplementedError

    def _make_ssl_transport(
            self, rawsock, protocol, sslcontext, waiter=None,
            *, server_side=False, server_hostname=None,
            extra=None, server=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            call_connection_made=True,
            context=None):
        """Create SSL transport."""
        raise NotImplementedError

    def _make_datagram_transport(self, sock, protocol,
                                 address=None, waiter=None, extra=None):
        """Create datagram transport."""
        raise NotImplementedError

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Create read pipe transport."""
        raise NotImplementedError

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Create write pipe transport."""
        raise NotImplementedError

    async def _make_subprocess_transport(self, protocol, args, shell,
                                         stdin, stdout, stderr, bufsize,
                                         extra=None, **kwargs):
        """Create subprocess transport."""
        raise NotImplementedError

    def _write_to_self(self):
        """Write a byte to self-pipe, to wake up the event loop.

        This may be called from a different thread.

        The subclass is responsible for implementing the self-pipe.
        """
        raise NotImplementedError

    def _process_events(self, event_list):
        """Process selector events."""
        raise NotImplementedError
553 def _check_closed(self):
554 if self._closed:
555 raise RuntimeError('Event loop is closed')
    def _check_default_executor(self):
        """Raise RuntimeError if shutdown_default_executor() was called."""
        if self._executor_shutdown_called:
            raise RuntimeError('Executor shutdown has been called')
    def _asyncgen_finalizer_hook(self, agen):
        # Finalizer hook installed via sys.set_asyncgen_hooks(): schedule
        # closing the async generator on this loop.  call_soon_threadsafe()
        # is used because finalization may happen on any thread.
        self._asyncgens.discard(agen)
        if not self.is_closed():
            self.call_soon_threadsafe(self.create_task, agen.aclose())
    def _asyncgen_firstiter_hook(self, agen):
        # First-iteration hook: track the generator so shutdown_asyncgens()
        # can close it later; warn if shutdown has already been requested.
        if self._asyncgens_shutdown_called:
            warnings.warn(
                f"asynchronous generator {agen!r} was scheduled after "
                f"loop.shutdown_asyncgens() call",
                ResourceWarning, source=self)

        self._asyncgens.add(agen)
    async def shutdown_asyncgens(self):
        """Shutdown all active asynchronous generators."""
        self._asyncgens_shutdown_called = True

        if not len(self._asyncgens):
            # If Python version is <3.6 or we don't have any asynchronous
            # generators alive.
            return

        closing_agens = list(self._asyncgens)
        self._asyncgens.clear()

        # Close all generators concurrently; return_exceptions=True collects
        # per-generator failures instead of aborting on the first one.
        results = await tasks.gather(
            *[ag.aclose() for ag in closing_agens],
            return_exceptions=True)

        for result, agen in zip(results, closing_agens):
            if isinstance(result, Exception):
                self.call_exception_handler({
                    'message': f'an error occurred during closing of '
                               f'asynchronous generator {agen!r}',
                    'exception': result,
                    'asyncgen': agen
                })
600 async def shutdown_default_executor(self, timeout=None):
601 """Schedule the shutdown of the default executor.
603 The timeout parameter specifies the amount of time the executor will
604 be given to finish joining. The default value is None, which means
605 that the executor will be given an unlimited amount of time.
606 """
607 self._executor_shutdown_called = True
608 if self._default_executor is None:
609 return
610 future = self.create_future()
611 thread = threading.Thread(target=self._do_shutdown, args=(future,))
612 thread.start()
613 try:
614 async with timeouts.timeout(timeout):
615 await future
616 except TimeoutError:
617 warnings.warn("The executor did not finishing joining "
618 f"its threads within {timeout} seconds.",
619 RuntimeWarning, stacklevel=2)
620 self._default_executor.shutdown(wait=False)
621 else:
622 thread.join()
    def _do_shutdown(self, future):
        # Runs in a helper thread (see shutdown_default_executor()): join the
        # default executor, then report completion back onto the loop.
        try:
            self._default_executor.shutdown(wait=True)
            if not self.is_closed():
                self.call_soon_threadsafe(futures._set_result_unless_cancelled,
                                          future, None)
        except Exception as ex:
            if not self.is_closed() and not future.cancelled():
                self.call_soon_threadsafe(future.set_exception, ex)
    def _check_running(self):
        """Raise RuntimeError if this loop — or any other loop on this
        thread — is already running."""
        if self.is_running():
            raise RuntimeError('This event loop is already running')
        if events._get_running_loop() is not None:
            raise RuntimeError(
                'Cannot run the event loop while another loop is running')
    def _run_forever_setup(self):
        """Prepare the run loop to process events.

        This method exists so that custom event loop subclasses (e.g., event loops
        that integrate a GUI event loop with Python's event loop) have access to all the
        loop setup logic.
        """
        self._check_closed()
        self._check_running()
        self._set_coroutine_origin_tracking(self._debug)

        # Save the previous asyncgen hooks so _run_forever_cleanup() can
        # restore them, then install this loop's own hooks.
        self._old_agen_hooks = sys.get_asyncgen_hooks()
        self._thread_id = threading.get_ident()
        sys.set_asyncgen_hooks(
            firstiter=self._asyncgen_firstiter_hook,
            finalizer=self._asyncgen_finalizer_hook
        )

        events._set_running_loop(self)
    def _run_forever_cleanup(self):
        """Clean up after an event loop finishes the looping over events.

        This method exists so that custom event loop subclasses (e.g., event loops
        that integrate a GUI event loop with Python's event loop) have access to all the
        loop cleanup logic.
        """
        self._stopping = False
        self._thread_id = None
        events._set_running_loop(None)
        self._set_coroutine_origin_tracking(False)
        # Restore any pre-existing async generator hooks.
        if self._old_agen_hooks is not None:
            sys.set_asyncgen_hooks(*self._old_agen_hooks)
            self._old_agen_hooks = None
    def run_forever(self):
        """Run until stop() is called."""
        self._run_forever_setup()
        try:
            while True:
                self._run_once()
                # Checked *after* each iteration so stop() always lets the
                # current batch of callbacks complete first.
                if self._stopping:
                    break
        finally:
            self._run_forever_cleanup()
    def run_until_complete(self, future):
        """Run until the Future is done.

        If the argument is a coroutine, it is wrapped in a Task.

        WARNING: It would be disastrous to call run_until_complete()
        with the same coroutine twice -- it would wrap it in two
        different Tasks and that can't be good.

        Return the Future's result, or raise its exception.
        """
        self._check_closed()
        self._check_running()

        new_task = not futures.isfuture(future)
        future = tasks.ensure_future(future, loop=self)
        if new_task:
            # An exception is raised if the future didn't complete, so there
            # is no need to log the "destroy pending task" message
            future._log_destroy_pending = False

        # The callback stops the loop once the future completes.
        future.add_done_callback(_run_until_complete_cb)
        try:
            self.run_forever()
        except:  # deliberately bare: must also handle BaseException
            if new_task and future.done() and not future.cancelled():
                # The coroutine raised a BaseException. Consume the exception
                # to not log a warning, the caller doesn't have access to the
                # local task.
                future.exception()
            raise
        finally:
            future.remove_done_callback(_run_until_complete_cb)
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')

        return future.result()
    def stop(self):
        """Stop running the event loop.

        Every callback already scheduled will still run. This simply informs
        run_forever to stop looping after a complete iteration.
        """
        self._stopping = True
    def close(self):
        """Close the event loop.

        This clears the queues and shuts down the executor,
        but does not wait for the executor to finish.

        The event loop must not be running.
        """
        if self.is_running():
            raise RuntimeError("Cannot close a running event loop")
        if self._closed:
            # Closing twice is a no-op.
            return
        if self._debug:
            logger.debug("Close %r", self)
        self._closed = True
        self._ready.clear()
        self._scheduled.clear()
        self._executor_shutdown_called = True
        executor = self._default_executor
        if executor is not None:
            self._default_executor = None
            # wait=False: do not block the caller on executor threads.
            executor.shutdown(wait=False)
    def is_closed(self):
        """Returns True if the event loop was closed."""
        return self._closed

    def __del__(self, _warn=warnings.warn):
        # _warn is bound at definition time so the warning machinery stays
        # reachable during interpreter shutdown.
        if not self.is_closed():
            _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self)
            if not self.is_running():
                self.close()

    def is_running(self):
        """Returns True if the event loop is running."""
        # _thread_id is set in _run_forever_setup() and cleared in
        # _run_forever_cleanup().
        return (self._thread_id is not None)

    def time(self):
        """Return the time according to the event loop's clock.

        This is a float expressed in seconds since an epoch, but the
        epoch, precision, accuracy and drift are unspecified and may
        differ per event loop.
        """
        # Monotonic clock: scheduled timeouts are immune to wall-clock jumps.
        return time.monotonic()
    def call_later(self, delay, callback, *args, context=None):
        """Arrange for a callback to be called at a given time.

        Return a Handle: an opaque object with a cancel() method that
        can be used to cancel the call.

        The delay can be an int or float, expressed in seconds. It is
        always relative to the current time.

        Each callback will be called exactly once. If two callbacks
        are scheduled for exactly the same time, it is undefined which
        will be called first.

        Any positional arguments after the callback will be passed to
        the callback when it is called.
        """
        if delay is None:
            raise TypeError('delay must not be None')
        timer = self.call_at(self.time() + delay, callback, *args,
                             context=context)
        if timer._source_traceback:
            # Drop the call_later() frame from the debug traceback.
            del timer._source_traceback[-1]
        return timer
    def call_at(self, when, callback, *args, context=None):
        """Like call_later(), but uses an absolute time.

        Absolute time corresponds to the event loop's time() method.
        """
        if when is None:
            raise TypeError("when cannot be None")
        self._check_closed()
        if self._debug:
            self._check_thread()
            self._check_callback(callback, 'call_at')
        timer = events.TimerHandle(when, callback, args, self, context)
        if timer._source_traceback:
            # Drop the call_at() frame from the debug traceback.
            del timer._source_traceback[-1]
        # _scheduled is a min-heap ordered by the timers' scheduled time.
        heapq.heappush(self._scheduled, timer)
        timer._scheduled = True
        return timer
    def call_soon(self, callback, *args, context=None):
        """Arrange for a callback to be called as soon as possible.

        This operates as a FIFO queue: callbacks are called in the
        order in which they are registered. Each callback will be
        called exactly once.

        Any positional arguments after the callback will be passed to
        the callback when it is called.
        """
        self._check_closed()
        if self._debug:
            self._check_thread()
            self._check_callback(callback, 'call_soon')
        handle = self._call_soon(callback, args, context)
        if handle._source_traceback:
            # Drop the call_soon() frame from the debug traceback.
            del handle._source_traceback[-1]
        return handle
    def _check_callback(self, callback, method):
        """Reject objects that cannot be scheduled as callbacks.

        Coroutines must be run as tasks, not passed to call_soon() and
        friends; callers invoke this check only in debug mode.
        """
        if (coroutines.iscoroutine(callback) or
                coroutines._iscoroutinefunction(callback)):
            raise TypeError(
                f"coroutines cannot be used with {method}()")
        if not callable(callback):
            raise TypeError(
                f'a callable object was expected by {method}(), '
                f'got {callback!r}')
    def _call_soon(self, callback, args, context):
        """Wrap *callback* in a Handle and append it to the ready queue."""
        handle = events.Handle(callback, args, self, context)
        if handle._source_traceback:
            # Drop the _call_soon() frame from the debug traceback.
            del handle._source_traceback[-1]
        self._ready.append(handle)
        return handle
858 def _check_thread(self):
859 """Check that the current thread is the thread running the event loop.
861 Non-thread-safe methods of this class make this assumption and will
862 likely behave incorrectly when the assumption is violated.
864 Should only be called when (self._debug == True). The caller is
865 responsible for checking this condition for performance reasons.
866 """
867 if self._thread_id is None:
868 return
869 thread_id = threading.get_ident()
870 if thread_id != self._thread_id:
871 raise RuntimeError(
872 "Non-thread-safe operation invoked on an event loop other "
873 "than the current one")
875 def call_soon_threadsafe(self, callback, *args, context=None):
876 """Like call_soon(), but thread-safe."""
877 self._check_closed()
878 if self._debug:
879 self._check_callback(callback, 'call_soon_threadsafe')
880 handle = events._ThreadSafeHandle(callback, args, self, context)
881 self._ready.append(handle)
882 if handle._source_traceback:
883 del handle._source_traceback[-1]
884 if handle._source_traceback:
885 del handle._source_traceback[-1]
886 self._write_to_self()
887 return handle
    def run_in_executor(self, executor, func, *args):
        """Run func(*args) in *executor*; return an asyncio Future for it."""
        self._check_closed()
        if self._debug:
            self._check_callback(func, 'run_in_executor')
        if executor is None:
            executor = self._default_executor
            # Only check when the default executor is being used
            self._check_default_executor()
            if executor is None:
                # Lazily create the shared default ThreadPoolExecutor.
                executor = concurrent.futures.ThreadPoolExecutor(
                    thread_name_prefix='asyncio'
                )
                self._default_executor = executor
        # Bridge the concurrent.futures.Future to an asyncio Future.
        return futures.wrap_future(
            executor.submit(func, *args), loop=self)
905 def set_default_executor(self, executor):
906 if not isinstance(executor, concurrent.futures.ThreadPoolExecutor):
907 raise TypeError('executor must be ThreadPoolExecutor instance')
908 self._default_executor = executor
    def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
        """socket.getaddrinfo() wrapper that logs the call and its duration.

        Substituted for socket.getaddrinfo() when debug mode is enabled
        (see getaddrinfo()).
        """
        msg = [f"{host}:{port!r}"]
        if family:
            msg.append(f'family={family!r}')
        if type:
            msg.append(f'type={type!r}')
        if proto:
            msg.append(f'proto={proto!r}')
        if flags:
            msg.append(f'flags={flags!r}')
        msg = ', '.join(msg)
        logger.debug('Get address info %s', msg)

        t0 = self.time()
        addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
        dt = self.time() - t0

        msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}'
        if dt >= self.slow_callback_duration:
            # Slow resolution: promote to INFO so it is visible.
            logger.info(msg)
        else:
            logger.debug(msg)
        return addrinfo
    async def getaddrinfo(self, host, port, *,
                          family=0, type=0, proto=0, flags=0):
        """Asynchronous socket.getaddrinfo(), run in the default executor."""
        if self._debug:
            # Route through the timing/logging wrapper in debug mode.
            getaddr_func = self._getaddrinfo_debug
        else:
            getaddr_func = socket.getaddrinfo

        return await self.run_in_executor(
            None, getaddr_func, host, port, family, type, proto, flags)

    async def getnameinfo(self, sockaddr, flags=0):
        """Asynchronous socket.getnameinfo(), run in the default executor."""
        return await self.run_in_executor(
            None, socket.getnameinfo, sockaddr, flags)
    async def sock_sendfile(self, sock, file, offset=0, count=None,
                            *, fallback=True):
        """Send *file* over *sock*, preferring the native sendfile path.

        Falls back to a read/sendall loop when the native implementation is
        unavailable and *fallback* is true.  Returns the number of bytes sent.
        """
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        _check_ssl_socket(sock)
        self._check_sendfile_params(sock, file, offset, count)
        try:
            return await self._sock_sendfile_native(sock, file,
                                                    offset, count)
        except exceptions.SendfileNotAvailableError:
            if not fallback:
                raise
        return await self._sock_sendfile_fallback(sock, file,
                                                  offset, count)
963 async def _sock_sendfile_native(self, sock, file, offset, count):
964 # NB: sendfile syscall is not supported for SSL sockets and
965 # non-mmap files even if sendfile is supported by OS
966 raise exceptions.SendfileNotAvailableError(
967 f"syscall sendfile is not available for socket {sock!r} "
968 f"and file {file!r} combination")
    async def _sock_sendfile_fallback(self, sock, file, offset, count):
        # Pure-Python fallback for sock_sendfile(): read the file in the
        # default executor and push each chunk through sock_sendall().
        if offset:
            file.seek(offset)
        # Read at most SENDFILE_FALLBACK_READBUFFER_SIZE bytes per chunk
        # (capped by the requested byte count, when given).
        blocksize = (
            min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE)
            if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE
        )
        buf = bytearray(blocksize)
        total_sent = 0
        try:
            while True:
                if count:
                    blocksize = min(count - total_sent, blocksize)
                    # Requested byte count fully sent.
                    if blocksize <= 0:
                        break
                # memoryview slice lets file.readinto() refill the single
                # reusable buffer without copying.
                view = memoryview(buf)[:blocksize]
                read = await self.run_in_executor(None, file.readinto, view)
                if not read:
                    break  # EOF
                await self.sock_sendall(sock, view[:read])
                total_sent += read
            return total_sent
        finally:
            # Leave the file position just past the bytes actually sent,
            # even when an error interrupted the transfer.
            if total_sent > 0 and hasattr(file, 'seek'):
                file.seek(offset + total_sent)
996 def _check_sendfile_params(self, sock, file, offset, count):
997 if 'b' not in getattr(file, 'mode', 'b'):
998 raise ValueError("file should be opened in binary mode")
999 if not sock.type == socket.SOCK_STREAM:
1000 raise ValueError("only SOCK_STREAM type sockets are supported")
1001 if count is not None:
1002 if not isinstance(count, int):
1003 raise TypeError(
1004 "count must be a positive integer (got {!r})".format(count))
1005 if count <= 0:
1006 raise ValueError(
1007 "count must be a positive integer (got {!r})".format(count))
1008 if not isinstance(offset, int):
1009 raise TypeError(
1010 "offset must be a non-negative integer (got {!r})".format(
1011 offset))
1012 if offset < 0:
1013 raise ValueError(
1014 "offset must be a non-negative integer (got {!r})".format(
1015 offset))
    async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None):
        """Create, bind and connect one socket.

        Errors raised by this attempt are appended to a fresh sub-list of
        *exceptions* so create_connection() can aggregate failures per
        remote address.
        """
        my_exceptions = []
        exceptions.append(my_exceptions)
        family, type_, proto, _, address = addr_info
        sock = None
        try:
            try:
                sock = socket.socket(family=family, type=type_, proto=proto)
                sock.setblocking(False)
                if local_addr_infos is not None:
                    # Bind to the first local address of the same family.
                    for lfamily, _, _, _, laddr in local_addr_infos:
                        # skip local addresses of different family
                        if lfamily != family:
                            continue
                        try:
                            sock.bind(laddr)
                            break
                        except OSError as exc:
                            msg = (
                                f'error while attempting to bind on '
                                f'address {laddr!r}: {str(exc).lower()}'
                            )
                            exc = OSError(exc.errno, msg)
                            my_exceptions.append(exc)
                    else:  # all bind attempts failed
                        if my_exceptions:
                            raise my_exceptions.pop()
                        else:
                            raise OSError(f"no matching local address with {family=} found")
                await self.sock_connect(sock, address)
                return sock
            except OSError as exc:
                # Record the failure for the caller's aggregate report.
                # NOTE(review): the socket is not closed on this path here;
                # verify that the caller is responsible for cleanup.
                my_exceptions.append(exc)
                raise
            except:
                if sock is not None:
                    try:
                        sock.close()
                    except OSError:
                        # An error when closing a newly created socket is
                        # not important, but it can overwrite more important
                        # non-OSError error. So ignore it.
                        pass
                raise
        finally:
            # Drop this frame's references to the exception lists so the
            # recorded exceptions (whose tracebacks reference this frame)
            # don't create reference cycles keeping everything alive.
            exceptions = my_exceptions = None
    async def create_connection(
            self, protocol_factory, host=None, port=None,
            *, ssl=None, family=0,
            proto=0, flags=0, sock=None,
            local_addr=None, server_hostname=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            happy_eyeballs_delay=None, interleave=None,
            all_errors=False):
        """Connect to a TCP server.

        Create a streaming transport connection to a given internet host and
        port: socket family AF_INET or socket.AF_INET6 depending on host (or
        family if specified), socket type SOCK_STREAM. protocol_factory must be
        a callable returning a protocol instance.

        This method is a coroutine which will try to establish the connection
        in the background. When successful, the coroutine returns a
        (transport, protocol) pair.
        """
        if server_hostname is not None and not ssl:
            raise ValueError('server_hostname is only meaningful with ssl')

        if server_hostname is None and ssl:
            # Use host as default for server_hostname. It is an error
            # if host is empty or not set, e.g. when an
            # already-connected socket was passed or when only a port
            # is given. To avoid this error, you can pass
            # server_hostname='' -- this will bypass the hostname
            # check. (This also means that if host is a numeric
            # IP/IPv6 address, we will attempt to verify that exact
            # address; this will probably fail, but it is possible to
            # create a certificate for a specific IP address, so we
            # don't judge it here.)
            if not host:
                raise ValueError('You must set server_hostname '
                                 'when using ssl without a host')
            server_hostname = host

        if ssl_handshake_timeout is not None and not ssl:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')

        if ssl_shutdown_timeout is not None and not ssl:
            raise ValueError(
                'ssl_shutdown_timeout is only meaningful with ssl')

        if sock is not None:
            _check_ssl_socket(sock)

        if happy_eyeballs_delay is not None and interleave is None:
            # If using happy eyeballs, default to interleave addresses by family
            interleave = 1

        if host is not None or port is not None:
            if sock is not None:
                raise ValueError(
                    'host/port and sock can not be specified at the same time')

            infos = await self._ensure_resolved(
                (host, port), family=family,
                type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
            if not infos:
                raise OSError('getaddrinfo() returned empty list')

            if local_addr is not None:
                laddr_infos = await self._ensure_resolved(
                    local_addr, family=family,
                    type=socket.SOCK_STREAM, proto=proto,
                    flags=flags, loop=self)
                if not laddr_infos:
                    raise OSError('getaddrinfo() returned empty list')
            else:
                laddr_infos = None

            if interleave:
                infos = _interleave_addrinfos(infos, interleave)

            # Each element of 'exceptions' is the sub-list of errors for
            # one attempted address (see _connect_sock).
            exceptions = []
            if happy_eyeballs_delay is None:
                # not using happy eyeballs
                for addrinfo in infos:
                    try:
                        sock = await self._connect_sock(
                            exceptions, addrinfo, laddr_infos)
                        break
                    except OSError:
                        continue
            else:  # using happy eyeballs
                sock = (await staggered.staggered_race(
                    (
                        # can't use functools.partial as it keeps a reference
                        # to exceptions
                        lambda addrinfo=addrinfo: self._connect_sock(
                            exceptions, addrinfo, laddr_infos
                        )
                        for addrinfo in infos
                    ),
                    happy_eyeballs_delay,
                    loop=self,
                ))[0]  # can't use sock, _, _ as it keeps a reference to exceptions

            if sock is None:
                # Every attempt failed: flatten the per-address error
                # lists and raise the most informative error we can.
                exceptions = [exc for sub in exceptions for exc in sub]
                try:
                    if all_errors:
                        raise ExceptionGroup("create_connection failed", exceptions)
                    if len(exceptions) == 1:
                        raise exceptions[0]
                    elif exceptions:
                        # If they all have the same str(), raise one.
                        model = str(exceptions[0])
                        if all(str(exc) == model for exc in exceptions):
                            raise exceptions[0]
                        # Raise a combined exception so the user can see all
                        # the various error messages.
                        raise OSError('Multiple exceptions: {}'.format(
                            ', '.join(str(exc) for exc in exceptions)))
                    else:
                        # No exceptions were collected, raise a timeout error
                        raise TimeoutError('create_connection failed')
                finally:
                    # Break the frame's reference to the collected
                    # exceptions to avoid traceback reference cycles.
                    exceptions = None

        else:
            if sock is None:
                raise ValueError(
                    'host and port was not specified and no sock specified')
            if sock.type != socket.SOCK_STREAM:
                # We allow AF_INET, AF_INET6, AF_UNIX as long as they
                # are SOCK_STREAM.
                # We support passing AF_UNIX sockets even though we have
                # a dedicated API for that: create_unix_connection.
                # Disallowing AF_UNIX in this method, breaks backwards
                # compatibility.
                raise ValueError(
                    f'A Stream Socket was expected, got {sock!r}')

        transport, protocol = await self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout,
            ssl_shutdown_timeout=ssl_shutdown_timeout)
        if self._debug:
            # Get the socket from the transport because SSL transport closes
            # the old socket and creates a new SSL socket
            sock = transport.get_extra_info('socket')
            logger.debug("%r connected to %s:%r: (%r, %r)",
                         sock, host, port, transport, protocol)
        return transport, protocol
    async def _create_connection_transport(
            self, sock, protocol_factory, ssl,
            server_hostname, server_side=False,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None, context=None):
        # Wrap *sock* in a (possibly SSL) transport and wait until the
        # protocol's connection_made() has run before returning.

        sock.setblocking(False)
        context = context if context is not None else contextvars.copy_context()

        protocol = protocol_factory()
        waiter = self.create_future()
        if ssl:
            # ssl=True means "use a default context"; any other truthy
            # value is treated as an SSLContext.
            sslcontext = None if isinstance(ssl, bool) else ssl
            transport = self._make_ssl_transport(
                sock, protocol, sslcontext, waiter,
                server_side=server_side, server_hostname=server_hostname,
                ssl_handshake_timeout=ssl_handshake_timeout,
                ssl_shutdown_timeout=ssl_shutdown_timeout,
                context=context)
        else:
            transport = self._make_socket_transport(sock, protocol, waiter, context=context)

        try:
            await waiter
        except:
            # Setup failed: dispose of the transport before re-raising.
            transport.close()
            raise

        return transport, protocol
    async def sendfile(self, transport, file, offset=0, count=None,
                       *, fallback=True):
        """Send a file to transport.

        Return the total number of bytes which were sent.

        The method uses high-performance os.sendfile if available.

        file must be a regular file object opened in binary mode.

        offset tells from where to start reading the file. If specified,
        count is the total number of bytes to transmit as opposed to
        sending the file until EOF is reached. File position is updated on
        return or also in case of error in which case file.tell()
        can be used to figure out the number of bytes
        which were sent.

        fallback set to True makes asyncio to manually read and send
        the file when the platform does not support the sendfile syscall
        (e.g. Windows or SSL socket on Unix).

        Raise SendfileNotAvailableError if the system does not support
        sendfile syscall and fallback is False.
        """
        if transport.is_closing():
            raise RuntimeError("Transport is closing")
        # The transport advertises which sendfile strategy it can take.
        mode = getattr(transport, '_sendfile_compatible',
                       constants._SendfileMode.UNSUPPORTED)
        if mode is constants._SendfileMode.UNSUPPORTED:
            raise RuntimeError(
                f"sendfile is not supported for transport {transport!r}")
        if mode is constants._SendfileMode.TRY_NATIVE:
            try:
                return await self._sendfile_native(transport, file,
                                                   offset, count)
            except exceptions.SendfileNotAvailableError:
                if not fallback:
                    raise
        # Either the transport only supports the fallback mode, or the
        # native attempt reported that sendfile() is unavailable.
        if not fallback:
            raise RuntimeError(
                f"fallback is disabled and native sendfile is not "
                f"supported for transport {transport!r}")

        return await self._sendfile_fallback(transport, file,
                                             offset, count)
1292 async def _sendfile_native(self, transp, file, offset, count):
1293 raise exceptions.SendfileNotAvailableError(
1294 "sendfile syscall is not supported")
    async def _sendfile_fallback(self, transp, file, offset, count):
        # Pure-Python sendfile() emulation: read the file in the default
        # executor and write the chunks to the transport, using
        # _SendfileFallbackProtocol to await flow control between writes.
        if offset:
            file.seek(offset)
        # 16 KiB chunks, capped by the requested byte count when given.
        blocksize = min(count, 16384) if count else 16384
        buf = bytearray(blocksize)
        total_sent = 0
        # Temporarily swaps itself in as the transport's protocol so that
        # drain() can be awaited; restored in the finally block below.
        proto = _SendfileFallbackProtocol(transp)
        try:
            while True:
                if count:
                    blocksize = min(count - total_sent, blocksize)
                    if blocksize <= 0:
                        return total_sent
                view = memoryview(buf)[:blocksize]
                read = await self.run_in_executor(None, file.readinto, view)
                if not read:
                    return total_sent  # EOF
                transp.write(view[:read])
                await proto.drain()
                total_sent += read
        finally:
            # Leave the file position just past the bytes actually sent
            # and restore the transport's original protocol.
            if total_sent > 0 and hasattr(file, 'seek'):
                file.seek(offset + total_sent)
            await proto.restore()
    async def start_tls(self, transport, protocol, sslcontext, *,
                        server_side=False,
                        server_hostname=None,
                        ssl_handshake_timeout=None,
                        ssl_shutdown_timeout=None):
        """Upgrade transport to TLS.

        Return a new transport that *protocol* should start using
        immediately.
        """
        if ssl is None:
            raise RuntimeError('Python ssl module is not available')

        if not isinstance(sslcontext, ssl.SSLContext):
            raise TypeError(
                f'sslcontext is expected to be an instance of ssl.SSLContext, '
                f'got {sslcontext!r}')

        if not getattr(transport, '_start_tls_compatible', False):
            raise TypeError(
                f'transport {transport!r} is not supported by start_tls()')

        waiter = self.create_future()
        ssl_protocol = sslproto.SSLProtocol(
            self, protocol, sslcontext, waiter,
            server_side, server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout,
            ssl_shutdown_timeout=ssl_shutdown_timeout,
            call_connection_made=False)

        # Pause early so that "ssl_protocol.data_received()" doesn't
        # have a chance to get called before "ssl_protocol.connection_made()".
        transport.pause_reading()

        # gh-142352: move buffered StreamReader data to SSLProtocol
        if server_side:
            from .streams import StreamReaderProtocol
            if isinstance(protocol, StreamReaderProtocol):
                stream_reader = getattr(protocol, '_stream_reader', None)
                if stream_reader is not None:
                    buffer = stream_reader._buffer
                    if buffer:
                        # Bytes already read off the socket belong to the
                        # TLS handshake: hand them to the SSL protocol.
                        ssl_protocol._incoming.write(buffer)
                        buffer.clear()

        transport.set_protocol(ssl_protocol)
        # connection_made() is scheduled before resume_reading() so the
        # SSL protocol is fully set up before any data can arrive.
        conmade_cb = self.call_soon(ssl_protocol.connection_made, transport)
        resume_cb = self.call_soon(transport.resume_reading)

        try:
            await waiter
        except BaseException:
            # Handshake failed: tear down the transport and cancel the
            # callbacks scheduled above before re-raising.
            transport.close()
            conmade_cb.cancel()
            resume_cb.cancel()
            raise

        return ssl_protocol._app_transport
    async def create_datagram_endpoint(self, protocol_factory,
                                       local_addr=None, remote_addr=None, *,
                                       family=0, proto=0, flags=0,
                                       reuse_port=None,
                                       allow_broadcast=None, sock=None):
        """Create datagram connection.

        Either uses an existing datagram *sock* as-is, or creates a new
        socket from the resolved local/remote addresses.  Returns a
        (transport, protocol) pair.
        """
        if sock is not None:
            if sock.type == socket.SOCK_STREAM:
                raise ValueError(
                    f'A datagram socket was expected, got {sock!r}')
            if (local_addr or remote_addr or
                    family or proto or flags or
                    reuse_port or allow_broadcast):
                # show the problematic kwargs in exception msg
                opts = dict(local_addr=local_addr, remote_addr=remote_addr,
                            family=family, proto=proto, flags=flags,
                            reuse_port=reuse_port,
                            allow_broadcast=allow_broadcast)
                problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
                raise ValueError(
                    f'socket modifier keyword arguments can not be used '
                    f'when sock is specified. ({problems})')
            sock.setblocking(False)
            r_addr = None
        else:
            if not (local_addr or remote_addr):
                if family == 0:
                    raise ValueError('unexpected address family')
                addr_pairs_info = (((family, proto), (None, None)),)
            elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
                for addr in (local_addr, remote_addr):
                    if addr is not None and not isinstance(addr, str):
                        raise TypeError('string is expected')

                # Best-effort removal of a stale (non-abstract) UNIX
                # socket file left over from a previous run.
                if local_addr and local_addr[0] not in (0, '\x00'):
                    try:
                        if stat.S_ISSOCK(os.stat(local_addr).st_mode):
                            os.remove(local_addr)
                    except FileNotFoundError:
                        pass
                    except OSError as err:
                        # Directory may have permissions only to create socket.
                        logger.error('Unable to check or remove stale UNIX '
                                     'socket %r: %r',
                                     local_addr, err)

                addr_pairs_info = (((family, proto),
                                    (local_addr, remote_addr)), )
            else:
                # join address by (family, protocol)
                addr_infos = {}  # Using order preserving dict
                for idx, addr in ((0, local_addr), (1, remote_addr)):
                    if addr is not None:
                        if not (isinstance(addr, tuple) and len(addr) == 2):
                            raise TypeError('2-tuple is expected')

                        infos = await self._ensure_resolved(
                            addr, family=family, type=socket.SOCK_DGRAM,
                            proto=proto, flags=flags, loop=self)
                        if not infos:
                            raise OSError('getaddrinfo() returned empty list')

                        for fam, _, pro, _, address in infos:
                            key = (fam, pro)
                            if key not in addr_infos:
                                addr_infos[key] = [None, None]
                            addr_infos[key][idx] = address

                # each addr has to have info for each (family, proto) pair
                addr_pairs_info = [
                    (key, addr_pair) for key, addr_pair in addr_infos.items()
                    if not ((local_addr and addr_pair[0] is None) or
                            (remote_addr and addr_pair[1] is None))]

                if not addr_pairs_info:
                    raise ValueError('can not get address information')

            exceptions = []

            # Try each candidate (family, proto) pair until one socket can
            # be bound/connected; collect failures and raise the first one
            # if no candidate works.
            for ((family, proto),
                 (local_address, remote_address)) in addr_pairs_info:
                sock = None
                r_addr = None
                try:
                    sock = socket.socket(
                        family=family, type=socket.SOCK_DGRAM, proto=proto)
                    if reuse_port:
                        _set_reuseport(sock)
                    if allow_broadcast:
                        sock.setsockopt(
                            socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
                    sock.setblocking(False)

                    if local_addr:
                        sock.bind(local_address)
                    if remote_addr:
                        # A broadcast endpoint must stay unconnected so it
                        # can receive replies from any peer.
                        if not allow_broadcast:
                            await self.sock_connect(sock, remote_address)
                        r_addr = remote_address
                except OSError as exc:
                    if sock is not None:
                        sock.close()
                    exceptions.append(exc)
                except:
                    if sock is not None:
                        sock.close()
                    raise
                else:
                    break
            else:
                raise exceptions[0]

        protocol = protocol_factory()
        waiter = self.create_future()
        transport = self._make_datagram_transport(
            sock, protocol, r_addr, waiter)
        if self._debug:
            if local_addr:
                logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
                            "created: (%r, %r)",
                            local_addr, remote_addr, transport, protocol)
            else:
                logger.debug("Datagram endpoint remote_addr=%r created: "
                             "(%r, %r)",
                             remote_addr, transport, protocol)

        try:
            await waiter
        except:
            # Transport setup failed: dispose of it before re-raising.
            transport.close()
            raise

        return transport, protocol
1514 async def _ensure_resolved(self, address, *,
1515 family=0, type=socket.SOCK_STREAM,
1516 proto=0, flags=0, loop):
1517 host, port = address[:2]
1518 info = _ipaddr_info(host, port, family, type, proto, *address[2:])
1519 if info is not None:
1520 # "host" is already a resolved IP.
1521 return [info]
1522 else:
1523 return await loop.getaddrinfo(host, port, family=family, type=type,
1524 proto=proto, flags=flags)
1526 async def _create_server_getaddrinfo(self, host, port, family, flags):
1527 infos = await self._ensure_resolved((host, port), family=family,
1528 type=socket.SOCK_STREAM,
1529 flags=flags, loop=self)
1530 if not infos:
1531 raise OSError(f'getaddrinfo({host!r}) returned empty list')
1532 return infos
    async def create_server(
            self, protocol_factory, host=None, port=None,
            *,
            family=socket.AF_UNSPEC,
            flags=socket.AI_PASSIVE,
            sock=None,
            backlog=100,
            ssl=None,
            reuse_address=None,
            reuse_port=None,
            keep_alive=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            start_serving=True):
        """Create a TCP server.

        The host parameter can be a string, in that case the TCP server is
        bound to host and port.

        The host parameter can also be a sequence of strings and in that case
        the TCP server is bound to all hosts of the sequence. If a host
        appears multiple times (possibly indirectly e.g. when hostnames
        resolve to the same IP address), the server is only bound once to that
        host.

        Return a Server object which can be used to stop the service.

        This method is a coroutine.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if ssl_handshake_timeout is not None and ssl is None:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')

        if ssl_shutdown_timeout is not None and ssl is None:
            raise ValueError(
                'ssl_shutdown_timeout is only meaningful with ssl')

        if sock is not None:
            _check_ssl_socket(sock)

        if host is not None or port is not None:
            if sock is not None:
                raise ValueError(
                    'host/port and sock can not be specified at the same time')

            if reuse_address is None:
                reuse_address = os.name == "posix" and sys.platform != "cygwin"
            sockets = []
            if host == '':
                hosts = [None]
            elif (isinstance(host, str) or
                  not isinstance(host, collections.abc.Iterable)):
                hosts = [host]
            else:
                hosts = host

            # Resolve every host concurrently; de-duplicate the results so
            # each address is only bound once.
            fs = [self._create_server_getaddrinfo(host, port, family=family,
                                                  flags=flags)
                  for host in hosts]
            infos = await tasks.gather(*fs)
            infos = set(itertools.chain.from_iterable(infos))

            completed = False
            try:
                for res in infos:
                    af, socktype, proto, canonname, sa = res
                    try:
                        sock = socket.socket(af, socktype, proto)
                    except socket.error:
                        # Assume it's a bad family/type/protocol combination.
                        if self._debug:
                            logger.warning('create_server() failed to create '
                                           'socket.socket(%r, %r, %r)',
                                           af, socktype, proto, exc_info=True)
                        continue
                    sockets.append(sock)
                    if reuse_address:
                        sock.setsockopt(
                            socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
                    # Since Linux 6.12.9, SO_REUSEPORT is not allowed
                    # on other address families than AF_INET/AF_INET6.
                    if reuse_port and af in (socket.AF_INET, socket.AF_INET6):
                        _set_reuseport(sock)
                    if keep_alive:
                        sock.setsockopt(
                            socket.SOL_SOCKET, socket.SO_KEEPALIVE, True)
                    # Disable IPv4/IPv6 dual stack support (enabled by
                    # default on Linux) which makes a single socket
                    # listen on both address families.
                    if (_HAS_IPv6 and
                            af == socket.AF_INET6 and
                            hasattr(socket, 'IPPROTO_IPV6')):
                        sock.setsockopt(socket.IPPROTO_IPV6,
                                        socket.IPV6_V6ONLY,
                                        True)
                    try:
                        sock.bind(sa)
                    except OSError as err:
                        msg = ('error while attempting '
                               'to bind on address %r: %s'
                               % (sa, str(err).lower()))
                        if err.errno == errno.EADDRNOTAVAIL:
                            # Assume the family is not enabled (bpo-30945)
                            sockets.pop()
                            sock.close()
                            if self._debug:
                                logger.warning(msg)
                            continue
                        raise OSError(err.errno, msg) from None

                if not sockets:
                    raise OSError('could not bind on any address out of %r'
                                  % ([info[4] for info in infos],))

                completed = True
            finally:
                # On any failure above, close every socket created so far.
                if not completed:
                    for sock in sockets:
                        sock.close()
        else:
            if sock is None:
                raise ValueError('Neither host/port nor sock were specified')
            if sock.type != socket.SOCK_STREAM:
                raise ValueError(f'A Stream Socket was expected, got {sock!r}')
            sockets = [sock]

        for sock in sockets:
            sock.setblocking(False)

        server = Server(self, sockets, protocol_factory,
                        ssl, backlog, ssl_handshake_timeout,
                        ssl_shutdown_timeout)
        if start_serving:
            server._start_serving()
            # Skip one loop iteration so that all 'loop.add_reader'
            # go through.
            await tasks.sleep(0)

        if self._debug:
            logger.info("%r is serving", server)
        return server
    async def connect_accepted_socket(
            self, protocol_factory, sock,
            *, ssl=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None):
        """Handle an already-accepted connection *sock*.

        Wraps the socket in a transport (server-side TLS when *ssl* is
        given) and returns the (transport, protocol) pair.
        """
        if sock.type != socket.SOCK_STREAM:
            raise ValueError(f'A Stream Socket was expected, got {sock!r}')

        if ssl_handshake_timeout is not None and not ssl:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')

        if ssl_shutdown_timeout is not None and not ssl:
            raise ValueError(
                'ssl_shutdown_timeout is only meaningful with ssl')

        _check_ssl_socket(sock)

        transport, protocol = await self._create_connection_transport(
            sock, protocol_factory, ssl, '', server_side=True,
            ssl_handshake_timeout=ssl_handshake_timeout,
            ssl_shutdown_timeout=ssl_shutdown_timeout)
        if self._debug:
            # Get the socket from the transport because SSL transport closes
            # the old socket and creates a new SSL socket
            sock = transport.get_extra_info('socket')
            logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
        return transport, protocol
1708 async def connect_read_pipe(self, protocol_factory, pipe):
1709 protocol = protocol_factory()
1710 waiter = self.create_future()
1711 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
1713 try:
1714 await waiter
1715 except:
1716 transport.close()
1717 raise
1719 if self._debug: 1719 ↛ 1720line 1719 didn't jump to line 1720 because the condition on line 1719 was never true
1720 logger.debug('Read pipe %r connected: (%r, %r)',
1721 pipe.fileno(), transport, protocol)
1722 return transport, protocol
1724 async def connect_write_pipe(self, protocol_factory, pipe):
1725 protocol = protocol_factory()
1726 waiter = self.create_future()
1727 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
1729 try:
1730 await waiter
1731 except:
1732 transport.close()
1733 raise
1735 if self._debug: 1735 ↛ 1736line 1735 didn't jump to line 1736 because the condition on line 1735 was never true
1736 logger.debug('Write pipe %r connected: (%r, %r)',
1737 pipe.fileno(), transport, protocol)
1738 return transport, protocol
1740 def _log_subprocess(self, msg, stdin, stdout, stderr):
1741 info = [msg]
1742 if stdin is not None:
1743 info.append(f'stdin={_format_pipe(stdin)}')
1744 if stdout is not None and stderr == subprocess.STDOUT:
1745 info.append(f'stdout=stderr={_format_pipe(stdout)}')
1746 else:
1747 if stdout is not None:
1748 info.append(f'stdout={_format_pipe(stdout)}')
1749 if stderr is not None:
1750 info.append(f'stderr={_format_pipe(stderr)}')
1751 logger.debug(' '.join(info))
1753 async def subprocess_shell(self, protocol_factory, cmd, *,
1754 stdin=subprocess.PIPE,
1755 stdout=subprocess.PIPE,
1756 stderr=subprocess.PIPE,
1757 universal_newlines=False,
1758 shell=True, bufsize=0,
1759 encoding=None, errors=None, text=None,
1760 **kwargs):
1761 if not isinstance(cmd, (bytes, str)):
1762 raise ValueError("cmd must be a string")
1763 if universal_newlines:
1764 raise ValueError("universal_newlines must be False")
1765 if not shell:
1766 raise ValueError("shell must be True")
1767 if bufsize != 0:
1768 raise ValueError("bufsize must be 0")
1769 if text:
1770 raise ValueError("text must be False")
1771 if encoding is not None:
1772 raise ValueError("encoding must be None")
1773 if errors is not None:
1774 raise ValueError("errors must be None")
1776 protocol = protocol_factory()
1777 debug_log = None
1778 if self._debug: 1778 ↛ 1781line 1778 didn't jump to line 1781 because the condition on line 1778 was never true
1779 # don't log parameters: they may contain sensitive information
1780 # (password) and may be too long
1781 debug_log = 'run shell command %r' % cmd
1782 self._log_subprocess(debug_log, stdin, stdout, stderr)
1783 transport = await self._make_subprocess_transport(
1784 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
1785 if self._debug and debug_log is not None: 1785 ↛ 1786line 1785 didn't jump to line 1786 because the condition on line 1785 was never true
1786 logger.info('%s: %r', debug_log, transport)
1787 return transport, protocol
1789 async def subprocess_exec(self, protocol_factory, program, *args,
1790 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1791 stderr=subprocess.PIPE, universal_newlines=False,
1792 shell=False, bufsize=0,
1793 encoding=None, errors=None, text=None,
1794 **kwargs):
1795 if universal_newlines:
1796 raise ValueError("universal_newlines must be False")
1797 if shell:
1798 raise ValueError("shell must be False")
1799 if bufsize != 0:
1800 raise ValueError("bufsize must be 0")
1801 if text:
1802 raise ValueError("text must be False")
1803 if encoding is not None:
1804 raise ValueError("encoding must be None")
1805 if errors is not None:
1806 raise ValueError("errors must be None")
1808 popen_args = (program,) + args
1809 protocol = protocol_factory()
1810 debug_log = None
1811 if self._debug: 1811 ↛ 1814line 1811 didn't jump to line 1814 because the condition on line 1811 was never true
1812 # don't log parameters: they may contain sensitive information
1813 # (password) and may be too long
1814 debug_log = f'execute program {program!r}'
1815 self._log_subprocess(debug_log, stdin, stdout, stderr)
1816 transport = await self._make_subprocess_transport(
1817 protocol, popen_args, False, stdin, stdout, stderr,
1818 bufsize, **kwargs)
1819 if self._debug and debug_log is not None: 1819 ↛ 1820line 1819 didn't jump to line 1820 because the condition on line 1819 was never true
1820 logger.info('%s: %r', debug_log, transport)
1821 return transport, protocol
    def get_exception_handler(self):
        """Return the current exception handler, or None if the default
        handler is in use (see set_exception_handler())."""
        return self._exception_handler
1828 def set_exception_handler(self, handler):
1829 """Set handler as the new event loop exception handler.
1831 If handler is None, the default exception handler will
1832 be set.
1834 If handler is a callable object, it should have a
1835 signature matching '(loop, context)', where 'loop'
1836 will be a reference to the active event loop, 'context'
1837 will be a dict object (see `call_exception_handler()`
1838 documentation for details about context).
1839 """
1840 if handler is not None and not callable(handler):
1841 raise TypeError(f'A callable object or None is expected, '
1842 f'got {handler!r}')
1843 self._exception_handler = handler
1845 def default_exception_handler(self, context):
1846 """Default exception handler.
1848 This is called when an exception occurs and no exception
1849 handler is set, and can be called by a custom exception
1850 handler that wants to defer to the default behavior.
1852 This default handler logs the error message and other
1853 context-dependent information. In debug mode, a truncated
1854 stack trace is also appended showing where the given object
1855 (e.g. a handle or future or task) was created, if any.
1857 The context parameter has the same meaning as in
1858 `call_exception_handler()`.
1859 """
1860 message = context.get('message')
1861 if not message: 1861 ↛ 1862line 1861 didn't jump to line 1862 because the condition on line 1861 was never true
1862 message = 'Unhandled exception in event loop'
1864 exception = context.get('exception')
1865 if exception is not None:
1866 exc_info = (type(exception), exception, exception.__traceback__)
1867 else:
1868 exc_info = False
1870 if ('source_traceback' not in context and 1870 ↛ 1873line 1870 didn't jump to line 1873 because the condition on line 1870 was never true
1871 self._current_handle is not None and
1872 self._current_handle._source_traceback):
1873 context['handle_traceback'] = \
1874 self._current_handle._source_traceback
1876 log_lines = [message]
1877 for key in sorted(context):
1878 if key in {'message', 'exception'}:
1879 continue
1880 value = context[key]
1881 if key == 'source_traceback':
1882 tb = ''.join(traceback.format_list(value))
1883 value = 'Object created at (most recent call last):\n'
1884 value += tb.rstrip()
1885 elif key == 'handle_traceback': 1885 ↛ 1886line 1885 didn't jump to line 1886 because the condition on line 1885 was never true
1886 tb = ''.join(traceback.format_list(value))
1887 value = 'Handle created at (most recent call last):\n'
1888 value += tb.rstrip()
1889 else:
1890 value = repr(value)
1891 log_lines.append(f'{key}: {value}')
1893 logger.error('\n'.join(log_lines), exc_info=exc_info)
    def call_exception_handler(self, context):
        """Call the current event loop's exception handler.

        The context argument is a dict containing the following keys:

        - 'message': Error message;
        - 'exception' (optional): Exception object;
        - 'future' (optional): Future instance;
        - 'task' (optional): Task instance;
        - 'handle' (optional): Handle instance;
        - 'protocol' (optional): Protocol instance;
        - 'transport' (optional): Transport instance;
        - 'socket' (optional): Socket instance;
        - 'source_traceback' (optional): Traceback of the source;
        - 'handle_traceback' (optional): Traceback of the handle;
        - 'asyncgen' (optional): Asynchronous generator that caused
          the exception.

        New keys may be introduced in the future.

        Note: do not overload this method in an event loop subclass.
        For custom exception handling, use the
        `set_exception_handler()` method.
        """
        if self._exception_handler is None:
            try:
                self.default_exception_handler(context)
            except (SystemExit, KeyboardInterrupt):
                # Never swallow process-exit requests.
                raise
            except BaseException:
                # Second protection layer for unexpected errors
                # in the default implementation, as well as for subclassed
                # event loops with overloaded "default_exception_handler".
                logger.error('Exception in default exception handler',
                             exc_info=True)
        else:
            try:
                # Run the custom handler inside the contextvars context
                # of the object that triggered the error (task, future
                # or handle), when one can be found.
                ctx = None
                thing = context.get("task")
                if thing is None:
                    # Even though Futures don't have a context,
                    # Task is a subclass of Future,
                    # and sometimes the 'future' key holds a Task.
                    thing = context.get("future")
                if thing is None:
                    # Handles also have a context.
                    thing = context.get("handle")
                if thing is not None and hasattr(thing, "get_context"):
                    ctx = thing.get_context()
                if ctx is not None and hasattr(ctx, "run"):
                    ctx.run(self._exception_handler, self, context)
                else:
                    # No usable context; call the handler directly.
                    self._exception_handler(self, context)
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                # Exception in the user set custom exception handler.
                try:
                    # Let's try default handler.
                    self.default_exception_handler({
                        'message': 'Unhandled error in exception handler',
                        'exception': exc,
                        'context': context,
                    })
                except (SystemExit, KeyboardInterrupt):
                    raise
                except BaseException:
                    # Guard 'default_exception_handler' in case it is
                    # overloaded.
                    logger.error('Exception in default exception handler '
                                 'while handling an unexpected error '
                                 'in custom exception handler',
                                 exc_info=True)
1969 def _add_callback(self, handle):
1970 """Add a Handle to _ready."""
1971 if not handle._cancelled:
1972 self._ready.append(handle)
1974 def _add_callback_signalsafe(self, handle):
1975 """Like _add_callback() but called from a signal handler."""
1976 self._add_callback(handle)
1977 self._write_to_self()
1979 def _timer_handle_cancelled(self, handle):
1980 """Notification that a TimerHandle has been cancelled."""
1981 if handle._scheduled:
1982 self._timer_cancelled_count += 1
    def _run_once(self):
        """Run one full iteration of the event loop.

        This calls all currently ready callbacks, polls for I/O,
        schedules the resulting callbacks, and finally schedules
        'call_later' callbacks.
        """

        # Purge cancelled timers.  When many of the scheduled handles
        # (> 50% of at least 100) are cancelled, rebuild the whole heap
        # in one O(n) pass; otherwise just pop cancelled handles off
        # the head, which is all an ordered heap allows cheaply.
        sched_count = len(self._scheduled)
        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
            self._timer_cancelled_count / sched_count >
                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
            # Remove delayed calls that were cancelled if their number
            # is too high
            new_scheduled = []
            for handle in self._scheduled:
                if handle._cancelled:
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)

            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            self._timer_cancelled_count = 0
        else:
            # Remove delayed calls that were cancelled from head of queue.
            while self._scheduled and self._scheduled[0]._cancelled:
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False

        # Choose the selector timeout: 0 when there is immediate work
        # (ready callbacks or a stop request), the delay until the
        # earliest timer otherwise, or None (block forever) when idle.
        timeout = None
        if self._ready or self._stopping:
            timeout = 0
        elif self._scheduled:
            # Compute the desired timeout.
            timeout = self._scheduled[0]._when - self.time()
            if timeout > MAXIMUM_SELECT_TIMEOUT:
                # Clamp to avoid overflowing the selector implementation.
                timeout = MAXIMUM_SELECT_TIMEOUT
            elif timeout < 0:
                timeout = 0

        event_list = self._selector.select(timeout)
        self._process_events(event_list)
        # Needed to break cycles when an exception occurs.
        event_list = None

        # Handle 'later' callbacks that are ready.
        now = self.time()
        # Ensure that `end_time` is strictly increasing
        # when the clock resolution is too small.
        end_time = now + max(self._clock_resolution, math.ulp(now))
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when >= end_time:
                break
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)

        # This is the only place where callbacks are actually *called*.
        # All other places just add them to ready.
        # Note: We run all currently scheduled callbacks, but not any
        # callbacks scheduled by callbacks run this time around --
        # they will be run the next time (after another I/O poll).
        # Use an idiom that is thread-safe without using locks.
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            if self._debug:
                # In debug mode, track the currently running handle and
                # warn about callbacks that block the loop for too long.
                try:
                    self._current_handle = handle
                    t0 = self.time()
                    handle._run()
                    dt = self.time() - t0
                    if dt >= self.slow_callback_duration:
                        logger.warning('Executing %s took %.3f seconds',
                                       _format_handle(handle), dt)
                finally:
                    self._current_handle = None
            else:
                handle._run()
        handle = None  # Needed to break cycles when an exception occurs.
2070 def _set_coroutine_origin_tracking(self, enabled):
2071 if bool(enabled) == bool(self._coroutine_origin_tracking_enabled):
2072 return
2074 if enabled:
2075 self._coroutine_origin_tracking_saved_depth = (
2076 sys.get_coroutine_origin_tracking_depth())
2077 sys.set_coroutine_origin_tracking_depth(
2078 constants.DEBUG_STACK_DEPTH)
2079 else:
2080 sys.set_coroutine_origin_tracking_depth(
2081 self._coroutine_origin_tracking_saved_depth)
2083 self._coroutine_origin_tracking_enabled = enabled
2085 def get_debug(self):
2086 return self._debug
2088 def set_debug(self, enabled):
2089 self._debug = enabled
2091 if self.is_running():
2092 self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)