Coverage for Lib/asyncio/base_events.py: 88%
1168 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-24 03:28 +0000
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-24 03:28 +0000
1"""Base implementation of event loop.
3The event loop can be broken up into a multiplexer (the part
4responsible for notifying us of I/O events) and the event loop proper,
5which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
16import collections
17import contextvars
18import collections.abc
19import concurrent.futures
20import errno
21import heapq
22import itertools
23import math
24import os
25import socket
26import stat
27import subprocess
28import sys
29import threading
30import time
31import traceback
32import warnings
33import weakref
34import inspect
36try:
37 import ssl
38except ImportError: # pragma: no cover
39 ssl = None
41from . import constants
42from . import coroutines
43from . import events
44from . import exceptions
45from . import futures
46from . import protocols
47from . import sslproto
48from . import staggered
49from . import tasks
50from . import timeouts
51from . import transports
52from . import trsock
53from .log import logger
56__all__ = 'BaseEventLoop','Server',
59# Minimum number of _scheduled timer handles before cleanup of
60# cancelled handles is performed.
61_MIN_SCHEDULED_TIMER_HANDLES = 100
63# Minimum fraction of _scheduled timer handles that are cancelled
64# before cleanup of cancelled handles is performed.
65_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
68_HAS_IPv6 = hasattr(socket, 'AF_INET6')
70# Maximum timeout passed to select to avoid OS limitations
71MAXIMUM_SELECT_TIMEOUT = 24 * 3600
74def _format_handle(handle):
75 cb = handle._callback
76 if isinstance(getattr(cb, '__self__', None), tasks.Task):
77 # format the task
78 return repr(cb.__self__)
79 else:
80 return str(handle)
83def _format_pipe(fd):
84 if fd == subprocess.PIPE:
85 return '<pipe>'
86 elif fd == subprocess.STDOUT:
87 return '<stdout>'
88 else:
89 return repr(fd)
92def _set_reuseport(sock):
93 if not hasattr(socket, 'SO_REUSEPORT'):
94 raise ValueError('reuse_port not supported by socket module')
95 else:
96 try:
97 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
98 except OSError:
99 raise ValueError('reuse_port not supported by socket module, '
100 'SO_REUSEPORT defined but not implemented.')
103def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0):
104 # Try to skip getaddrinfo if "host" is already an IP. Users might have
105 # handled name resolution in their own code and pass in resolved IPs.
106 if not hasattr(socket, 'inet_pton'):
107 return
109 if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
110 host is None:
111 return None
113 if type == socket.SOCK_STREAM:
114 proto = socket.IPPROTO_TCP
115 elif type == socket.SOCK_DGRAM:
116 proto = socket.IPPROTO_UDP
117 else:
118 return None
120 if port is None:
121 port = 0
122 elif isinstance(port, bytes) and port == b'':
123 port = 0
124 elif isinstance(port, str) and port == '':
125 port = 0
126 else:
127 # If port's a service name like "http", don't skip getaddrinfo.
128 try:
129 port = int(port)
130 except (TypeError, ValueError):
131 return None
133 if family == socket.AF_UNSPEC:
134 afs = [socket.AF_INET]
135 if _HAS_IPv6: 135 ↛ 140line 135 didn't jump to line 140 because the condition on line 135 was always true
136 afs.append(socket.AF_INET6)
137 else:
138 afs = [family]
140 if isinstance(host, bytes):
141 host = host.decode('idna')
142 if '%' in host:
143 # Linux's inet_pton doesn't accept an IPv6 zone index after host,
144 # like '::1%lo0'.
145 return None
147 for af in afs:
148 try:
149 socket.inet_pton(af, host)
150 # The host has already been resolved.
151 if _HAS_IPv6 and af == socket.AF_INET6:
152 return af, type, proto, '', (host, port, flowinfo, scopeid)
153 else:
154 return af, type, proto, '', (host, port)
155 except OSError:
156 pass
158 # "host" is not an IP address.
159 return None
162def _interleave_addrinfos(addrinfos, first_address_family_count=1):
163 """Interleave list of addrinfo tuples by family."""
164 # Group addresses by family
165 addrinfos_by_family = collections.OrderedDict()
166 for addr in addrinfos:
167 family = addr[0]
168 if family not in addrinfos_by_family:
169 addrinfos_by_family[family] = []
170 addrinfos_by_family[family].append(addr)
171 addrinfos_lists = list(addrinfos_by_family.values())
173 reordered = []
174 if first_address_family_count > 1:
175 reordered.extend(addrinfos_lists[0][:first_address_family_count - 1])
176 del addrinfos_lists[0][:first_address_family_count - 1]
177 reordered.extend(
178 a for a in itertools.chain.from_iterable(
179 itertools.zip_longest(*addrinfos_lists)
180 ) if a is not None)
181 return reordered
184def _run_until_complete_cb(fut):
185 if not fut.cancelled():
186 exc = fut.exception()
187 if isinstance(exc, (SystemExit, KeyboardInterrupt)):
188 # Issue #22429: run_forever() already finished, no need to
189 # stop it.
190 return
191 futures._get_loop(fut).stop()
194if hasattr(socket, 'TCP_NODELAY'): 194 ↛ 201line 194 didn't jump to line 201 because the condition on line 194 was always true
195 def _set_nodelay(sock):
196 if (sock.family in {socket.AF_INET, socket.AF_INET6} and
197 sock.type == socket.SOCK_STREAM and
198 sock.proto == socket.IPPROTO_TCP):
199 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
200else:
201 def _set_nodelay(sock):
202 pass
205def _check_ssl_socket(sock):
206 if ssl is not None and isinstance(sock, ssl.SSLSocket): 206 ↛ 207line 206 didn't jump to line 207 because the condition on line 206 was never true
207 raise TypeError("Socket cannot be of type SSLSocket")
210class _SendfileFallbackProtocol(protocols.Protocol):
211 def __init__(self, transp):
212 if not isinstance(transp, transports._FlowControlMixin): 212 ↛ 213line 212 didn't jump to line 213 because the condition on line 212 was never true
213 raise TypeError("transport should be _FlowControlMixin instance")
214 self._transport = transp
215 self._proto = transp.get_protocol()
216 self._should_resume_reading = transp.is_reading()
217 self._should_resume_writing = transp._protocol_paused
218 transp.pause_reading()
219 transp.set_protocol(self)
220 if self._should_resume_writing: 220 ↛ 221line 220 didn't jump to line 221 because the condition on line 220 was never true
221 self._write_ready_fut = self._transport._loop.create_future()
222 else:
223 self._write_ready_fut = None
225 async def drain(self):
226 if self._transport.is_closing(): 226 ↛ 227line 226 didn't jump to line 227 because the condition on line 226 was never true
227 raise ConnectionError("Connection closed by peer")
228 fut = self._write_ready_fut
229 if fut is None:
230 return
231 await fut
233 def connection_made(self, transport):
234 raise RuntimeError("Invalid state: "
235 "connection should have been established already.")
237 def connection_lost(self, exc):
238 if self._write_ready_fut is not None: 238 ↛ 246line 238 didn't jump to line 246 because the condition on line 238 was always true
239 # Never happens if peer disconnects after sending the whole content
240 # Thus disconnection is always an exception from user perspective
241 if exc is None: 241 ↛ 242line 241 didn't jump to line 242 because the condition on line 241 was never true
242 self._write_ready_fut.set_exception(
243 ConnectionError("Connection is closed by peer"))
244 else:
245 self._write_ready_fut.set_exception(exc)
246 self._proto.connection_lost(exc)
248 def pause_writing(self):
249 if self._write_ready_fut is not None: 249 ↛ 250line 249 didn't jump to line 250 because the condition on line 249 was never true
250 return
251 self._write_ready_fut = self._transport._loop.create_future()
253 def resume_writing(self):
254 if self._write_ready_fut is None: 254 ↛ 255line 254 didn't jump to line 255 because the condition on line 254 was never true
255 return
256 self._write_ready_fut.set_result(False)
257 self._write_ready_fut = None
259 def data_received(self, data):
260 raise RuntimeError("Invalid state: reading should be paused")
262 def eof_received(self):
263 raise RuntimeError("Invalid state: reading should be paused")
265 async def restore(self):
266 self._transport.set_protocol(self._proto)
267 if self._should_resume_reading: 267 ↛ 269line 267 didn't jump to line 269 because the condition on line 267 was always true
268 self._transport.resume_reading()
269 if self._write_ready_fut is not None:
270 # Cancel the future.
271 # Basically it has no effect because protocol is switched back,
272 # no code should wait for it anymore.
273 self._write_ready_fut.cancel()
274 if self._should_resume_writing: 274 ↛ 275line 274 didn't jump to line 275 because the condition on line 274 was never true
275 self._proto.resume_writing()
278class Server(events.AbstractServer):
280 def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
281 ssl_handshake_timeout, ssl_shutdown_timeout=None):
282 self._loop = loop
283 self._sockets = sockets
284 # Weak references so we don't break Transport's ability to
285 # detect abandoned transports
286 self._clients = weakref.WeakSet()
287 self._waiters = []
288 self._protocol_factory = protocol_factory
289 self._backlog = backlog
290 self._ssl_context = ssl_context
291 self._ssl_handshake_timeout = ssl_handshake_timeout
292 self._ssl_shutdown_timeout = ssl_shutdown_timeout
293 self._serving = False
294 self._serving_forever_fut = None
295 self._context = contextvars.copy_context()
297 def __repr__(self):
298 return f'<{self.__class__.__name__} sockets={self.sockets!r}>'
300 def _attach(self, transport):
301 assert self._sockets is not None
302 self._clients.add(transport)
304 def _detach(self, transport):
305 self._clients.discard(transport)
306 if len(self._clients) == 0 and self._sockets is None:
307 self._wakeup()
309 def _wakeup(self):
310 waiters = self._waiters
311 self._waiters = None
312 for waiter in waiters:
313 if not waiter.done(): 313 ↛ 312line 313 didn't jump to line 312 because the condition on line 313 was always true
314 waiter.set_result(None)
316 def _start_serving(self):
317 if self._serving:
318 return
319 self._serving = True
320 for sock in self._sockets:
321 sock.listen(self._backlog)
322 self._loop._start_serving(
323 self._protocol_factory, sock, self._ssl_context,
324 self, self._backlog, self._ssl_handshake_timeout,
325 self._ssl_shutdown_timeout, context=self._context)
327 def get_loop(self):
328 return self._loop
330 def is_serving(self):
331 return self._serving
333 @property
334 def sockets(self):
335 if self._sockets is None:
336 return ()
337 return tuple(trsock.TransportSocket(s) for s in self._sockets)
339 def close(self):
340 sockets = self._sockets
341 if sockets is None:
342 return
343 self._sockets = None
345 for sock in sockets:
346 self._loop._stop_serving(sock)
348 self._serving = False
350 if (self._serving_forever_fut is not None and 350 ↛ 352line 350 didn't jump to line 352 because the condition on line 350 was never true
351 not self._serving_forever_fut.done()):
352 self._serving_forever_fut.cancel()
353 self._serving_forever_fut = None
355 if len(self._clients) == 0:
356 self._wakeup()
358 def close_clients(self):
359 for transport in self._clients.copy():
360 transport.close()
362 def abort_clients(self):
363 for transport in self._clients.copy():
364 transport.abort()
366 async def start_serving(self):
367 self._start_serving()
368 # Skip one loop iteration so that all 'loop.add_reader'
369 # go through.
370 await tasks.sleep(0)
372 async def serve_forever(self):
373 if self._serving_forever_fut is not None: 373 ↛ 374line 373 didn't jump to line 374 because the condition on line 373 was never true
374 raise RuntimeError(
375 f'server {self!r} is already being awaited on serve_forever()')
376 if self._sockets is None:
377 raise RuntimeError(f'server {self!r} is closed')
379 self._start_serving()
380 self._serving_forever_fut = self._loop.create_future()
382 try:
383 await self._serving_forever_fut
384 except exceptions.CancelledError:
385 try:
386 self.close()
387 self.close_clients()
388 await self.wait_closed()
389 finally:
390 raise
391 finally:
392 self._serving_forever_fut = None
394 async def wait_closed(self):
395 """Wait until server is closed and all connections are dropped.
397 - If the server is not closed, wait.
398 - If it is closed, but there are still active connections, wait.
400 Anyone waiting here will be unblocked once both conditions
401 (server is closed and all connections have been dropped)
402 have become true, in either order.
404 Historical note: In 3.11 and before, this was broken, returning
405 immediately if the server was already closed, even if there
406 were still active connections. An attempted fix in 3.12.0 was
407 still broken, returning immediately if the server was still
408 open and there were no active connections. Hopefully in 3.12.1
409 we have it right.
410 """
411 # Waiters are unblocked by self._wakeup(), which is called
412 # from two places: self.close() and self._detach(), but only
413 # when both conditions have become true. To signal that this
414 # has happened, self._wakeup() sets self._waiters to None.
415 if self._waiters is None:
416 return
417 waiter = self._loop.create_future()
418 self._waiters.append(waiter)
419 await waiter
422class BaseEventLoop(events.AbstractEventLoop):
424 def __init__(self):
425 self._timer_cancelled_count = 0
426 self._closed = False
427 self._stopping = False
428 self._ready = collections.deque()
429 self._scheduled = []
430 self._default_executor = None
431 self._internal_fds = 0
432 # Identifier of the thread running the event loop, or None if the
433 # event loop is not running
434 self._thread_id = None
435 self._clock_resolution = time.get_clock_info('monotonic').resolution
436 self._exception_handler = None
437 self.set_debug(coroutines._is_debug_mode())
438 # The preserved state of async generator hooks.
439 self._old_agen_hooks = None
440 # In debug mode, if the execution of a callback or a step of a task
441 # exceed this duration in seconds, the slow callback/task is logged.
442 self.slow_callback_duration = 0.1
443 self._current_handle = None
444 self._task_factory = None
445 self._coroutine_origin_tracking_enabled = False
446 self._coroutine_origin_tracking_saved_depth = None
448 # A weak set of all asynchronous generators that are
449 # being iterated by the loop.
450 self._asyncgens = weakref.WeakSet()
451 # Set to True when `loop.shutdown_asyncgens` is called.
452 self._asyncgens_shutdown_called = False
453 # Set to True when `loop.shutdown_default_executor` is called.
454 self._executor_shutdown_called = False
456 def __repr__(self):
457 return (
458 f'<{self.__class__.__name__} running={self.is_running()} '
459 f'closed={self.is_closed()} debug={self.get_debug()}>'
460 )
462 def create_future(self):
463 """Create a Future object attached to the loop."""
464 return futures.Future(loop=self)
466 def create_task(self, coro, **kwargs):
467 """Schedule or begin executing a coroutine object.
469 Return a task object.
470 """
471 self._check_closed()
472 if self._task_factory is not None:
473 return self._task_factory(self, coro, **kwargs)
475 task = tasks.Task(coro, loop=self, **kwargs)
476 if task._source_traceback:
477 del task._source_traceback[-1]
478 try:
479 return task
480 finally:
481 # gh-128552: prevent a refcycle of
482 # task.exception().__traceback__->BaseEventLoop.create_task->task
483 del task
485 def set_task_factory(self, factory):
486 """Set a task factory that will be used by loop.create_task().
488 If factory is None the default task factory will be set.
490 If factory is a callable, it should have a signature matching
491 '(loop, coro, **kwargs)', where 'loop' will be a reference to the
492 active event loop, 'coro' will be a coroutine object, and **kwargs
493 will be arbitrary keyword arguments that should be passed on to
494 Task. The callable must return a Task.
495 """
496 if factory is not None and not callable(factory):
497 raise TypeError('task factory must be a callable or None')
498 self._task_factory = factory
500 def get_task_factory(self):
501 """Return a task factory, or None if the default one is in use."""
502 return self._task_factory
504 def _make_socket_transport(self, sock, protocol, waiter=None, *,
505 extra=None, server=None):
506 """Create socket transport."""
507 raise NotImplementedError
509 def _make_ssl_transport(
510 self, rawsock, protocol, sslcontext, waiter=None,
511 *, server_side=False, server_hostname=None,
512 extra=None, server=None,
513 ssl_handshake_timeout=None,
514 ssl_shutdown_timeout=None,
515 call_connection_made=True,
516 context=None):
517 """Create SSL transport."""
518 raise NotImplementedError
520 def _make_datagram_transport(self, sock, protocol,
521 address=None, waiter=None, extra=None):
522 """Create datagram transport."""
523 raise NotImplementedError
525 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
526 extra=None):
527 """Create read pipe transport."""
528 raise NotImplementedError
530 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
531 extra=None):
532 """Create write pipe transport."""
533 raise NotImplementedError
535 async def _make_subprocess_transport(self, protocol, args, shell,
536 stdin, stdout, stderr, bufsize,
537 extra=None, **kwargs):
538 """Create subprocess transport."""
539 raise NotImplementedError
541 def _write_to_self(self):
542 """Write a byte to self-pipe, to wake up the event loop.
544 This may be called from a different thread.
546 The subclass is responsible for implementing the self-pipe.
547 """
548 raise NotImplementedError
550 def _process_events(self, event_list):
551 """Process selector events."""
552 raise NotImplementedError
554 def _check_closed(self):
555 if self._closed:
556 raise RuntimeError('Event loop is closed')
558 def _check_default_executor(self):
559 if self._executor_shutdown_called: 559 ↛ 560line 559 didn't jump to line 560 because the condition on line 559 was never true
560 raise RuntimeError('Executor shutdown has been called')
562 def _asyncgen_finalizer_hook(self, agen):
563 self._asyncgens.discard(agen)
564 if not self.is_closed(): 564 ↛ exitline 564 didn't return from function '_asyncgen_finalizer_hook' because the condition on line 564 was always true
565 self.call_soon_threadsafe(self.create_task, agen.aclose())
567 def _asyncgen_firstiter_hook(self, agen):
568 if self._asyncgens_shutdown_called: 568 ↛ 569line 568 didn't jump to line 569 because the condition on line 568 was never true
569 warnings.warn(
570 f"asynchronous generator {agen!r} was scheduled after "
571 f"loop.shutdown_asyncgens() call",
572 ResourceWarning, source=self)
574 self._asyncgens.add(agen)
576 async def shutdown_asyncgens(self):
577 """Shutdown all active asynchronous generators."""
578 self._asyncgens_shutdown_called = True
580 if not len(self._asyncgens): 580 ↛ 585line 580 didn't jump to line 585 because the condition on line 580 was always true
581 # If Python version is <3.6 or we don't have any asynchronous
582 # generators alive.
583 return
585 closing_agens = list(self._asyncgens)
586 self._asyncgens.clear()
588 results = await tasks.gather(
589 *[ag.aclose() for ag in closing_agens],
590 return_exceptions=True)
592 for result, agen in zip(results, closing_agens):
593 if isinstance(result, Exception):
594 self.call_exception_handler({
595 'message': f'an error occurred during closing of '
596 f'asynchronous generator {agen!r}',
597 'exception': result,
598 'asyncgen': agen
599 })
601 async def shutdown_default_executor(self, timeout=None):
602 """Schedule the shutdown of the default executor.
604 The timeout parameter specifies the amount of time the executor will
605 be given to finish joining. The default value is None, which means
606 that the executor will be given an unlimited amount of time.
607 """
608 self._executor_shutdown_called = True
609 if self._default_executor is None:
610 return
611 future = self.create_future()
612 thread = threading.Thread(target=self._do_shutdown, args=(future,))
613 thread.start()
614 try:
615 async with timeouts.timeout(timeout):
616 await future
617 except TimeoutError:
618 warnings.warn("The executor did not finishing joining "
619 f"its threads within {timeout} seconds.",
620 RuntimeWarning, stacklevel=2)
621 self._default_executor.shutdown(wait=False)
622 else:
623 thread.join()
625 def _do_shutdown(self, future):
626 try:
627 self._default_executor.shutdown(wait=True)
628 if not self.is_closed(): 628 ↛ exitline 628 didn't return from function '_do_shutdown' because the condition on line 628 was always true
629 self.call_soon_threadsafe(futures._set_result_unless_cancelled,
630 future, None)
631 except Exception as ex:
632 if not self.is_closed() and not future.cancelled():
633 self.call_soon_threadsafe(future.set_exception, ex)
635 def _check_running(self):
636 if self.is_running():
637 raise RuntimeError('This event loop is already running')
638 if events._get_running_loop() is not None:
639 raise RuntimeError(
640 'Cannot run the event loop while another loop is running')
642 def _run_forever_setup(self):
643 """Prepare the run loop to process events.
645 This method exists so that custom event loop subclasses (e.g., event loops
646 that integrate a GUI event loop with Python's event loop) have access to all the
647 loop setup logic.
648 """
649 self._check_closed()
650 self._check_running()
651 self._set_coroutine_origin_tracking(self._debug)
653 self._old_agen_hooks = sys.get_asyncgen_hooks()
654 self._thread_id = threading.get_ident()
655 sys.set_asyncgen_hooks(
656 firstiter=self._asyncgen_firstiter_hook,
657 finalizer=self._asyncgen_finalizer_hook
658 )
660 events._set_running_loop(self)
662 def _run_forever_cleanup(self):
663 """Clean up after an event loop finishes the looping over events.
665 This method exists so that custom event loop subclasses (e.g., event loops
666 that integrate a GUI event loop with Python's event loop) have access to all the
667 loop cleanup logic.
668 """
669 self._stopping = False
670 self._thread_id = None
671 events._set_running_loop(None)
672 self._set_coroutine_origin_tracking(False)
673 # Restore any pre-existing async generator hooks.
674 if self._old_agen_hooks is not None: 674 ↛ exitline 674 didn't return from function '_run_forever_cleanup' because the condition on line 674 was always true
675 sys.set_asyncgen_hooks(*self._old_agen_hooks)
676 self._old_agen_hooks = None
678 def run_forever(self):
679 """Run until stop() is called."""
680 self._run_forever_setup()
681 try:
682 while True:
683 self._run_once()
684 if self._stopping:
685 break
686 finally:
687 self._run_forever_cleanup()
689 def run_until_complete(self, future):
690 """Run until the Future is done.
692 If the argument is a coroutine, it is wrapped in a Task.
694 WARNING: It would be disastrous to call run_until_complete()
695 with the same coroutine twice -- it would wrap it in two
696 different Tasks and that can't be good.
698 Return the Future's result, or raise its exception.
699 """
700 self._check_closed()
701 self._check_running()
703 new_task = not futures.isfuture(future)
704 future = tasks.ensure_future(future, loop=self)
705 if new_task:
706 # An exception is raised if the future didn't complete, so there
707 # is no need to log the "destroy pending task" message
708 future._log_destroy_pending = False
710 future.add_done_callback(_run_until_complete_cb)
711 try:
712 self.run_forever()
713 except:
714 if new_task and future.done() and not future.cancelled():
715 # The coroutine raised a BaseException. Consume the exception
716 # to not log a warning, the caller doesn't have access to the
717 # local task.
718 future.exception()
719 raise
720 finally:
721 future.remove_done_callback(_run_until_complete_cb)
722 if not future.done():
723 raise RuntimeError('Event loop stopped before Future completed.')
725 return future.result()
727 def stop(self):
728 """Stop running the event loop.
730 Every callback already scheduled will still run. This simply
731 informs run_forever to stop looping after a complete iteration.
732 """
733 self._stopping = True
735 def close(self):
736 """Close the event loop.
738 This clears the queues and shuts down the executor,
739 but does not wait for the executor to finish.
741 The event loop must not be running.
742 """
743 if self.is_running(): 743 ↛ 744line 743 didn't jump to line 744 because the condition on line 743 was never true
744 raise RuntimeError("Cannot close a running event loop")
745 if self._closed:
746 return
747 if self._debug:
748 logger.debug("Close %r", self)
749 self._closed = True
750 self._ready.clear()
751 self._scheduled.clear()
752 self._executor_shutdown_called = True
753 executor = self._default_executor
754 if executor is not None:
755 self._default_executor = None
756 executor.shutdown(wait=False)
758 def is_closed(self):
759 """Returns True if the event loop was closed."""
760 return self._closed
762 def __del__(self, _warn=warnings.warn):
763 if not self.is_closed(): 763 ↛ 764line 763 didn't jump to line 764 because the condition on line 763 was never true
764 _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self)
765 if not self.is_running():
766 self.close()
768 def is_running(self):
769 """Returns True if the event loop is running."""
770 return (self._thread_id is not None)
772 def time(self):
773 """Return the time according to the event loop's clock.
775 This is a float expressed in seconds since an epoch, but the
776 epoch, precision, accuracy and drift are unspecified and may
777 differ per event loop.
778 """
779 return time.monotonic()
781 def call_later(self, delay, callback, *args, context=None):
782 """Arrange for a callback to be called at a given time.
784 Return a Handle: an opaque object with a cancel() method that
785 can be used to cancel the call.
787 The delay can be an int or float, expressed in seconds. It is
788 always relative to the current time.
790 Each callback will be called exactly once. If two callbacks
791 are scheduled for exactly the same time, it is undefined which
792 will be called first.
794 Any positional arguments after the callback will be passed to
795 the callback when it is called.
796 """
797 if delay is None:
798 raise TypeError('delay must not be None')
799 timer = self.call_at(self.time() + delay, callback, *args,
800 context=context)
801 if timer._source_traceback:
802 del timer._source_traceback[-1]
803 return timer
805 def call_at(self, when, callback, *args, context=None):
806 """Like call_later(), but uses an absolute time.
808 Absolute time corresponds to the event loop's time() method.
809 """
810 if when is None:
811 raise TypeError("when cannot be None")
812 self._check_closed()
813 if self._debug:
814 self._check_thread()
815 self._check_callback(callback, 'call_at')
816 timer = events.TimerHandle(when, callback, args, self, context)
817 if timer._source_traceback:
818 del timer._source_traceback[-1]
819 heapq.heappush(self._scheduled, timer)
820 timer._scheduled = True
821 return timer
823 def call_soon(self, callback, *args, context=None):
824 """Arrange for a callback to be called as soon as possible.
826 This operates as a FIFO queue: callbacks are called in the
827 order in which they are registered. Each callback will be
828 called exactly once.
830 Any positional arguments after the callback will be passed to
831 the callback when it is called.
832 """
833 self._check_closed()
834 if self._debug:
835 self._check_thread()
836 self._check_callback(callback, 'call_soon')
837 handle = self._call_soon(callback, args, context)
838 if handle._source_traceback:
839 del handle._source_traceback[-1]
840 return handle
842 def _check_callback(self, callback, method):
843 if (coroutines.iscoroutine(callback) or
844 inspect.iscoroutinefunction(callback)):
845 raise TypeError(
846 f"coroutines cannot be used with {method}()")
847 if not callable(callback):
848 raise TypeError(
849 f'a callable object was expected by {method}(), '
850 f'got {callback!r}')
852 def _call_soon(self, callback, args, context):
853 handle = events.Handle(callback, args, self, context)
854 if handle._source_traceback:
855 del handle._source_traceback[-1]
856 self._ready.append(handle)
857 return handle
859 def _check_thread(self):
860 """Check that the current thread is the thread running the event loop.
862 Non-thread-safe methods of this class make this assumption and will
863 likely behave incorrectly when the assumption is violated.
865 Should only be called when (self._debug == True). The caller is
866 responsible for checking this condition for performance reasons.
867 """
868 if self._thread_id is None:
869 return
870 thread_id = threading.get_ident()
871 if thread_id != self._thread_id:
872 raise RuntimeError(
873 "Non-thread-safe operation invoked on an event loop other "
874 "than the current one")
876 def call_soon_threadsafe(self, callback, *args, context=None):
877 """Like call_soon(), but thread-safe."""
878 self._check_closed()
879 if self._debug:
880 self._check_callback(callback, 'call_soon_threadsafe')
881 handle = events._ThreadSafeHandle(callback, args, self, context)
882 self._ready.append(handle)
883 if handle._source_traceback:
884 del handle._source_traceback[-1]
885 if handle._source_traceback:
886 del handle._source_traceback[-1]
887 self._write_to_self()
888 return handle
890 def run_in_executor(self, executor, func, *args):
891 self._check_closed()
892 if self._debug:
893 self._check_callback(func, 'run_in_executor')
894 if executor is None:
895 executor = self._default_executor
896 # Only check when the default executor is being used
897 self._check_default_executor()
898 if executor is None:
899 executor = concurrent.futures.ThreadPoolExecutor(
900 thread_name_prefix='asyncio'
901 )
902 self._default_executor = executor
903 return futures.wrap_future(
904 executor.submit(func, *args), loop=self)
906 def set_default_executor(self, executor):
907 if not isinstance(executor, concurrent.futures.ThreadPoolExecutor):
908 raise TypeError('executor must be ThreadPoolExecutor instance')
909 self._default_executor = executor
911 def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
912 msg = [f"{host}:{port!r}"]
913 if family:
914 msg.append(f'family={family!r}')
915 if type:
916 msg.append(f'type={type!r}')
917 if proto:
918 msg.append(f'proto={proto!r}')
919 if flags:
920 msg.append(f'flags={flags!r}')
921 msg = ', '.join(msg)
922 logger.debug('Get address info %s', msg)
924 t0 = self.time()
925 addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
926 dt = self.time() - t0
928 msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}'
929 if dt >= self.slow_callback_duration:
930 logger.info(msg)
931 else:
932 logger.debug(msg)
933 return addrinfo
935 async def getaddrinfo(self, host, port, *,
936 family=0, type=0, proto=0, flags=0):
937 if self._debug: 937 ↛ 938line 937 didn't jump to line 938 because the condition on line 937 was never true
938 getaddr_func = self._getaddrinfo_debug
939 else:
940 getaddr_func = socket.getaddrinfo
942 return await self.run_in_executor(
943 None, getaddr_func, host, port, family, type, proto, flags)
945 async def getnameinfo(self, sockaddr, flags=0):
946 return await self.run_in_executor(
947 None, socket.getnameinfo, sockaddr, flags)
949 async def sock_sendfile(self, sock, file, offset=0, count=None,
950 *, fallback=True):
951 if self._debug and sock.gettimeout() != 0:
952 raise ValueError("the socket must be non-blocking")
953 _check_ssl_socket(sock)
954 self._check_sendfile_params(sock, file, offset, count)
955 try:
956 return await self._sock_sendfile_native(sock, file,
957 offset, count)
958 except exceptions.SendfileNotAvailableError:
959 if not fallback:
960 raise
961 return await self._sock_sendfile_fallback(sock, file,
962 offset, count)
964 async def _sock_sendfile_native(self, sock, file, offset, count):
965 # NB: sendfile syscall is not supported for SSL sockets and
966 # non-mmap files even if sendfile is supported by OS
967 raise exceptions.SendfileNotAvailableError(
968 f"syscall sendfile is not available for socket {sock!r} "
969 f"and file {file!r} combination")
971 async def _sock_sendfile_fallback(self, sock, file, offset, count):
972 if hasattr(file, 'seek'): 972 ↛ 974line 972 didn't jump to line 974 because the condition on line 972 was always true
973 file.seek(offset)
974 blocksize = (
975 min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE)
976 if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE
977 )
978 buf = bytearray(blocksize)
979 total_sent = 0
980 try:
981 while True:
982 if count:
983 blocksize = min(count - total_sent, blocksize)
984 if blocksize <= 0:
985 break
986 view = memoryview(buf)[:blocksize]
987 read = await self.run_in_executor(None, file.readinto, view)
988 if not read:
989 break # EOF
990 await self.sock_sendall(sock, view[:read])
991 total_sent += read
992 return total_sent
993 finally:
994 if total_sent > 0 and hasattr(file, 'seek'):
995 file.seek(offset + total_sent)
997 def _check_sendfile_params(self, sock, file, offset, count):
998 if 'b' not in getattr(file, 'mode', 'b'):
999 raise ValueError("file should be opened in binary mode")
1000 if not sock.type == socket.SOCK_STREAM:
1001 raise ValueError("only SOCK_STREAM type sockets are supported")
1002 if count is not None:
1003 if not isinstance(count, int):
1004 raise TypeError(
1005 "count must be a positive integer (got {!r})".format(count))
1006 if count <= 0:
1007 raise ValueError(
1008 "count must be a positive integer (got {!r})".format(count))
1009 if not isinstance(offset, int):
1010 raise TypeError(
1011 "offset must be a non-negative integer (got {!r})".format(
1012 offset))
1013 if offset < 0:
1014 raise ValueError(
1015 "offset must be a non-negative integer (got {!r})".format(
1016 offset))
1018 async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None):
1019 """Create, bind and connect one socket."""
1020 my_exceptions = []
1021 exceptions.append(my_exceptions)
1022 family, type_, proto, _, address = addr_info
1023 sock = None
1024 try:
1025 try:
1026 sock = socket.socket(family=family, type=type_, proto=proto)
1027 sock.setblocking(False)
1028 if local_addr_infos is not None:
1029 for lfamily, _, _, _, laddr in local_addr_infos:
1030 # skip local addresses of different family
1031 if lfamily != family:
1032 continue
1033 try:
1034 sock.bind(laddr)
1035 break
1036 except OSError as exc:
1037 msg = (
1038 f'error while attempting to bind on '
1039 f'address {laddr!r}: {str(exc).lower()}'
1040 )
1041 exc = OSError(exc.errno, msg)
1042 my_exceptions.append(exc)
1043 else: # all bind attempts failed
1044 if my_exceptions:
1045 raise my_exceptions.pop()
1046 else:
1047 raise OSError(f"no matching local address with {family=} found")
1048 await self.sock_connect(sock, address)
1049 return sock
1050 except OSError as exc:
1051 my_exceptions.append(exc)
1052 raise
1053 except:
1054 if sock is not None:
1055 try:
1056 sock.close()
1057 except OSError:
1058 # An error when closing a newly created socket is
1059 # not important, but it can overwrite more important
1060 # non-OSError error. So ignore it.
1061 pass
1062 raise
1063 finally:
1064 exceptions = my_exceptions = None
1066 async def create_connection(
1067 self, protocol_factory, host=None, port=None,
1068 *, ssl=None, family=0,
1069 proto=0, flags=0, sock=None,
1070 local_addr=None, server_hostname=None,
1071 ssl_handshake_timeout=None,
1072 ssl_shutdown_timeout=None,
1073 happy_eyeballs_delay=None, interleave=None,
1074 all_errors=False):
1075 """Connect to a TCP server.
1077 Create a streaming transport connection to a given internet host and
1078 port: socket family AF_INET or socket.AF_INET6 depending on host (or
1079 family if specified), socket type SOCK_STREAM. protocol_factory must
1080 be a callable returning a protocol instance.
1082 This method is a coroutine which will try to establish the
1083 connection in the background. When successful, the coroutine
1084 returns a (transport, protocol) pair.
1085 """
1086 if server_hostname is not None and not ssl:
1087 raise ValueError('server_hostname is only meaningful with ssl')
1089 if server_hostname is None and ssl:
1090 # Use host as default for server_hostname. It is an error
1091 # if host is empty or not set, e.g. when an
1092 # already-connected socket was passed or when only a port
1093 # is given. To avoid this error, you can pass
1094 # server_hostname='' -- this will bypass the hostname
1095 # check. (This also means that if host is a numeric
1096 # IP/IPv6 address, we will attempt to verify that exact
1097 # address; this will probably fail, but it is possible to
1098 # create a certificate for a specific IP address, so we
1099 # don't judge it here.)
1100 if not host:
1101 raise ValueError('You must set server_hostname '
1102 'when using ssl without a host')
1103 server_hostname = host
1105 if ssl_handshake_timeout is not None and not ssl:
1106 raise ValueError(
1107 'ssl_handshake_timeout is only meaningful with ssl')
1109 if ssl_shutdown_timeout is not None and not ssl: 1109 ↛ 1110line 1109 didn't jump to line 1110 because the condition on line 1109 was never true
1110 raise ValueError(
1111 'ssl_shutdown_timeout is only meaningful with ssl')
1113 if sock is not None:
1114 _check_ssl_socket(sock)
1116 if happy_eyeballs_delay is not None and interleave is None:
1117 # If using happy eyeballs, default to interleave addresses by family
1118 interleave = 1
1120 if host is not None or port is not None:
1121 if sock is not None:
1122 raise ValueError(
1123 'host/port and sock can not be specified at the same time')
1125 infos = await self._ensure_resolved(
1126 (host, port), family=family,
1127 type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
1128 if not infos:
1129 raise OSError('getaddrinfo() returned empty list')
1131 if local_addr is not None:
1132 laddr_infos = await self._ensure_resolved(
1133 local_addr, family=family,
1134 type=socket.SOCK_STREAM, proto=proto,
1135 flags=flags, loop=self)
1136 if not laddr_infos:
1137 raise OSError('getaddrinfo() returned empty list')
1138 else:
1139 laddr_infos = None
1141 if interleave:
1142 infos = _interleave_addrinfos(infos, interleave)
1144 exceptions = []
1145 if happy_eyeballs_delay is None:
1146 # not using happy eyeballs
1147 for addrinfo in infos:
1148 try:
1149 sock = await self._connect_sock(
1150 exceptions, addrinfo, laddr_infos)
1151 break
1152 except OSError:
1153 continue
1154 else: # using happy eyeballs
1155 sock = (await staggered.staggered_race(
1156 (
1157 # can't use functools.partial as it keeps a reference
1158 # to exceptions
1159 lambda addrinfo=addrinfo: self._connect_sock(
1160 exceptions, addrinfo, laddr_infos
1161 )
1162 for addrinfo in infos
1163 ),
1164 happy_eyeballs_delay,
1165 loop=self,
1166 ))[0] # can't use sock, _, _ as it keeks a reference to exceptions
1168 if sock is None:
1169 exceptions = [exc for sub in exceptions for exc in sub]
1170 try:
1171 if all_errors:
1172 raise ExceptionGroup("create_connection failed", exceptions)
1173 if len(exceptions) == 1:
1174 raise exceptions[0]
1175 elif exceptions:
1176 # If they all have the same str(), raise one.
1177 model = str(exceptions[0])
1178 if all(str(exc) == model for exc in exceptions):
1179 raise exceptions[0]
1180 # Raise a combined exception so the user can see all
1181 # the various error messages.
1182 raise OSError('Multiple exceptions: {}'.format(
1183 ', '.join(str(exc) for exc in exceptions)))
1184 else:
1185 # No exceptions were collected, raise a timeout error
1186 raise TimeoutError('create_connection failed')
1187 finally:
1188 exceptions = None
1190 else:
1191 if sock is None:
1192 raise ValueError(
1193 'host and port was not specified and no sock specified')
1194 if sock.type != socket.SOCK_STREAM:
1195 # We allow AF_INET, AF_INET6, AF_UNIX as long as they
1196 # are SOCK_STREAM.
1197 # We support passing AF_UNIX sockets even though we have
1198 # a dedicated API for that: create_unix_connection.
1199 # Disallowing AF_UNIX in this method, breaks backwards
1200 # compatibility.
1201 raise ValueError(
1202 f'A Stream Socket was expected, got {sock!r}')
1204 transport, protocol = await self._create_connection_transport(
1205 sock, protocol_factory, ssl, server_hostname,
1206 ssl_handshake_timeout=ssl_handshake_timeout,
1207 ssl_shutdown_timeout=ssl_shutdown_timeout)
1208 if self._debug:
1209 # Get the socket from the transport because SSL transport closes
1210 # the old socket and creates a new SSL socket
1211 sock = transport.get_extra_info('socket')
1212 logger.debug("%r connected to %s:%r: (%r, %r)",
1213 sock, host, port, transport, protocol)
1214 return transport, protocol
1216 async def _create_connection_transport(
1217 self, sock, protocol_factory, ssl,
1218 server_hostname, server_side=False,
1219 ssl_handshake_timeout=None,
1220 ssl_shutdown_timeout=None, context=None):
1222 sock.setblocking(False)
1223 context = context if context is not None else contextvars.copy_context()
1225 protocol = protocol_factory()
1226 waiter = self.create_future()
1227 if ssl:
1228 sslcontext = None if isinstance(ssl, bool) else ssl
1229 transport = self._make_ssl_transport(
1230 sock, protocol, sslcontext, waiter,
1231 server_side=server_side, server_hostname=server_hostname,
1232 ssl_handshake_timeout=ssl_handshake_timeout,
1233 ssl_shutdown_timeout=ssl_shutdown_timeout,
1234 context=context)
1235 else:
1236 transport = self._make_socket_transport(sock, protocol, waiter, context=context)
1238 try:
1239 await waiter
1240 except:
1241 transport.close()
1242 raise
1244 return transport, protocol
1246 async def sendfile(self, transport, file, offset=0, count=None,
1247 *, fallback=True):
1248 """Send a file to transport.
1250 Return the total number of bytes which were sent.
1252 The method uses high-performance os.sendfile if available.
1254 file must be a regular file object opened in binary mode.
1256 offset tells from where to start reading the file. If specified,
1257 count is the total number of bytes to transmit as opposed to
1258 sending the file until EOF is reached. File position is updated on
1259 return or also in case of error in which case file.tell()
1260 can be used to figure out the number of bytes
1261 which were sent.
1263 fallback set to True makes asyncio to manually read and send
1264 the file when the platform does not support the sendfile syscall
1265 (e.g. Windows or SSL socket on Unix).
1267 Raise SendfileNotAvailableError if the system does not support
1268 sendfile syscall and fallback is False.
1269 """
1270 if transport.is_closing():
1271 raise RuntimeError("Transport is closing")
1272 mode = getattr(transport, '_sendfile_compatible',
1273 constants._SendfileMode.UNSUPPORTED)
1274 if mode is constants._SendfileMode.UNSUPPORTED:
1275 raise RuntimeError(
1276 f"sendfile is not supported for transport {transport!r}")
1277 if mode is constants._SendfileMode.TRY_NATIVE:
1278 try:
1279 return await self._sendfile_native(transport, file,
1280 offset, count)
1281 except exceptions.SendfileNotAvailableError:
1282 if not fallback:
1283 raise
1285 if not fallback:
1286 raise RuntimeError(
1287 f"fallback is disabled and native sendfile is not "
1288 f"supported for transport {transport!r}")
1289 return await self._sendfile_fallback(transport, file,
1290 offset, count)
1292 async def _sendfile_native(self, transp, file, offset, count):
1293 raise exceptions.SendfileNotAvailableError(
1294 "sendfile syscall is not supported")
1296 async def _sendfile_fallback(self, transp, file, offset, count):
1297 if hasattr(file, 'seek'): 1297 ↛ 1299line 1297 didn't jump to line 1299 because the condition on line 1297 was always true
1298 file.seek(offset)
1299 blocksize = min(count, 16384) if count else 16384
1300 buf = bytearray(blocksize)
1301 total_sent = 0
1302 proto = _SendfileFallbackProtocol(transp)
1303 try:
1304 while True:
1305 if count:
1306 blocksize = min(count - total_sent, blocksize)
1307 if blocksize <= 0:
1308 return total_sent
1309 view = memoryview(buf)[:blocksize]
1310 read = await self.run_in_executor(None, file.readinto, view)
1311 if not read:
1312 return total_sent # EOF
1313 transp.write(view[:read])
1314 await proto.drain()
1315 total_sent += read
1316 finally:
1317 if total_sent > 0 and hasattr(file, 'seek'):
1318 file.seek(offset + total_sent)
1319 await proto.restore()
1321 async def start_tls(self, transport, protocol, sslcontext, *,
1322 server_side=False,
1323 server_hostname=None,
1324 ssl_handshake_timeout=None,
1325 ssl_shutdown_timeout=None):
1326 """Upgrade transport to TLS.
1328 Return a new transport that *protocol* should start using
1329 immediately.
1330 """
1331 if ssl is None: 1331 ↛ 1332line 1331 didn't jump to line 1332 because the condition on line 1331 was never true
1332 raise RuntimeError('Python ssl module is not available')
1334 if not isinstance(sslcontext, ssl.SSLContext):
1335 raise TypeError(
1336 f'sslcontext is expected to be an instance of ssl.SSLContext, '
1337 f'got {sslcontext!r}')
1339 if not getattr(transport, '_start_tls_compatible', False):
1340 raise TypeError(
1341 f'transport {transport!r} is not supported by start_tls()')
1343 waiter = self.create_future()
1344 ssl_protocol = sslproto.SSLProtocol(
1345 self, protocol, sslcontext, waiter,
1346 server_side, server_hostname,
1347 ssl_handshake_timeout=ssl_handshake_timeout,
1348 ssl_shutdown_timeout=ssl_shutdown_timeout,
1349 call_connection_made=False)
1351 # Pause early so that "ssl_protocol.data_received()" doesn't
1352 # have a chance to get called before "ssl_protocol.connection_made()".
1353 transport.pause_reading()
1355 # gh-142352: move buffered StreamReader data to SSLProtocol
1356 if server_side:
1357 from .streams import StreamReaderProtocol
1358 if isinstance(protocol, StreamReaderProtocol):
1359 stream_reader = getattr(protocol, '_stream_reader', None)
1360 if stream_reader is not None: 1360 ↛ 1366line 1360 didn't jump to line 1366 because the condition on line 1360 was always true
1361 buffer = stream_reader._buffer
1362 if buffer:
1363 ssl_protocol._incoming.write(buffer)
1364 buffer.clear()
1366 transport.set_protocol(ssl_protocol)
1367 conmade_cb = self.call_soon(ssl_protocol.connection_made, transport)
1368 resume_cb = self.call_soon(transport.resume_reading)
1370 try:
1371 await waiter
1372 except BaseException:
1373 transport.close()
1374 conmade_cb.cancel()
1375 resume_cb.cancel()
1376 raise
1378 return ssl_protocol._app_transport
1380 async def create_datagram_endpoint(self, protocol_factory,
1381 local_addr=None, remote_addr=None, *,
1382 family=0, proto=0, flags=0,
1383 reuse_port=None,
1384 allow_broadcast=None, sock=None):
1385 """Create datagram connection."""
1386 if sock is not None:
1387 if sock.type == socket.SOCK_STREAM:
1388 raise ValueError(
1389 f'A datagram socket was expected, got {sock!r}')
1390 if (local_addr or remote_addr or
1391 family or proto or flags or
1392 reuse_port or allow_broadcast):
1393 # show the problematic kwargs in exception msg
1394 opts = dict(local_addr=local_addr, remote_addr=remote_addr,
1395 family=family, proto=proto, flags=flags,
1396 reuse_port=reuse_port,
1397 allow_broadcast=allow_broadcast)
1398 problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
1399 raise ValueError(
1400 f'socket modifier keyword arguments can not be used '
1401 f'when sock is specified. ({problems})')
1402 sock.setblocking(False)
1403 r_addr = None
1404 else:
1405 if not (local_addr or remote_addr):
1406 if family == 0:
1407 raise ValueError('unexpected address family')
1408 addr_pairs_info = (((family, proto), (None, None)),)
1409 elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
1410 for addr in (local_addr, remote_addr):
1411 if addr is not None and not isinstance(addr, str): 1411 ↛ 1412line 1411 didn't jump to line 1412 because the condition on line 1411 was never true
1412 raise TypeError('string is expected')
1414 if local_addr and local_addr[0] not in (0, '\x00'): 1414 ↛ 1426line 1414 didn't jump to line 1426 because the condition on line 1414 was always true
1415 try:
1416 if stat.S_ISSOCK(os.stat(local_addr).st_mode): 1416 ↛ 1426line 1416 didn't jump to line 1426 because the condition on line 1416 was always true
1417 os.remove(local_addr)
1418 except FileNotFoundError:
1419 pass
1420 except OSError as err:
1421 # Directory may have permissions only to create socket.
1422 logger.error('Unable to check or remove stale UNIX '
1423 'socket %r: %r',
1424 local_addr, err)
1426 addr_pairs_info = (((family, proto),
1427 (local_addr, remote_addr)), )
1428 else:
1429 # join address by (family, protocol)
1430 addr_infos = {} # Using order preserving dict
1431 for idx, addr in ((0, local_addr), (1, remote_addr)):
1432 if addr is not None:
1433 if not (isinstance(addr, tuple) and len(addr) == 2):
1434 raise TypeError('2-tuple is expected')
1436 infos = await self._ensure_resolved(
1437 addr, family=family, type=socket.SOCK_DGRAM,
1438 proto=proto, flags=flags, loop=self)
1439 if not infos:
1440 raise OSError('getaddrinfo() returned empty list')
1442 for fam, _, pro, _, address in infos:
1443 key = (fam, pro)
1444 if key not in addr_infos: 1444 ↛ 1446line 1444 didn't jump to line 1446 because the condition on line 1444 was always true
1445 addr_infos[key] = [None, None]
1446 addr_infos[key][idx] = address
1448 # each addr has to have info for each (family, proto) pair
1449 addr_pairs_info = [
1450 (key, addr_pair) for key, addr_pair in addr_infos.items()
1451 if not ((local_addr and addr_pair[0] is None) or
1452 (remote_addr and addr_pair[1] is None))]
1454 if not addr_pairs_info:
1455 raise ValueError('can not get address information')
1457 exceptions = []
1459 for ((family, proto),
1460 (local_address, remote_address)) in addr_pairs_info:
1461 sock = None
1462 r_addr = None
1463 try:
1464 sock = socket.socket(
1465 family=family, type=socket.SOCK_DGRAM, proto=proto)
1466 if reuse_port:
1467 _set_reuseport(sock)
1468 if allow_broadcast:
1469 sock.setsockopt(
1470 socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
1471 sock.setblocking(False)
1473 if local_addr:
1474 sock.bind(local_address)
1475 if remote_addr:
1476 if not allow_broadcast:
1477 await self.sock_connect(sock, remote_address)
1478 r_addr = remote_address
1479 except OSError as exc:
1480 if sock is not None:
1481 sock.close()
1482 exceptions.append(exc)
1483 except:
1484 if sock is not None: 1484 ↛ 1486line 1484 didn't jump to line 1486 because the condition on line 1484 was always true
1485 sock.close()
1486 raise
1487 else:
1488 break
1489 else:
1490 raise exceptions[0]
1492 protocol = protocol_factory()
1493 waiter = self.create_future()
1494 transport = self._make_datagram_transport(
1495 sock, protocol, r_addr, waiter)
1496 if self._debug: 1496 ↛ 1497line 1496 didn't jump to line 1497 because the condition on line 1496 was never true
1497 if local_addr:
1498 logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
1499 "created: (%r, %r)",
1500 local_addr, remote_addr, transport, protocol)
1501 else:
1502 logger.debug("Datagram endpoint remote_addr=%r created: "
1503 "(%r, %r)",
1504 remote_addr, transport, protocol)
1506 try:
1507 await waiter
1508 except:
1509 transport.close()
1510 raise
1512 return transport, protocol
1514 async def _ensure_resolved(self, address, *,
1515 family=0, type=socket.SOCK_STREAM,
1516 proto=0, flags=0, loop):
1517 host, port = address[:2]
1518 info = _ipaddr_info(host, port, family, type, proto, *address[2:])
1519 if info is not None:
1520 # "host" is already a resolved IP.
1521 return [info]
1522 else:
1523 return await loop.getaddrinfo(host, port, family=family, type=type,
1524 proto=proto, flags=flags)
1526 async def _create_server_getaddrinfo(self, host, port, family, flags):
1527 infos = await self._ensure_resolved((host, port), family=family,
1528 type=socket.SOCK_STREAM,
1529 flags=flags, loop=self)
1530 if not infos:
1531 raise OSError(f'getaddrinfo({host!r}) returned empty list')
1532 return infos
1534 async def create_server(
1535 self, protocol_factory, host=None, port=None,
1536 *,
1537 family=socket.AF_UNSPEC,
1538 flags=socket.AI_PASSIVE,
1539 sock=None,
1540 backlog=100,
1541 ssl=None,
1542 reuse_address=None,
1543 reuse_port=None,
1544 keep_alive=None,
1545 ssl_handshake_timeout=None,
1546 ssl_shutdown_timeout=None,
1547 start_serving=True):
1548 """Create a TCP server.
1550 The host parameter can be a string, in that case the TCP server is
1551 bound to host and port.
1553 The host parameter can also be a sequence of strings and in that
1554 case the TCP server is bound to all hosts of the sequence. If
1555 a host appears multiple times (possibly indirectly e.g. when
1556 hostnames resolve to the same IP address), the server is only bound
1557 once to that host.
1559 Return a Server object which can be used to stop the service.
1561 This method is a coroutine.
1562 """
1563 if isinstance(ssl, bool): 1563 ↛ 1564line 1563 didn't jump to line 1564 because the condition on line 1563 was never true
1564 raise TypeError('ssl argument must be an SSLContext or None')
1566 if ssl_handshake_timeout is not None and ssl is None:
1567 raise ValueError(
1568 'ssl_handshake_timeout is only meaningful with ssl')
1570 if ssl_shutdown_timeout is not None and ssl is None: 1570 ↛ 1571line 1570 didn't jump to line 1571 because the condition on line 1570 was never true
1571 raise ValueError(
1572 'ssl_shutdown_timeout is only meaningful with ssl')
1574 if sock is not None:
1575 _check_ssl_socket(sock)
1577 if host is not None or port is not None:
1578 if sock is not None:
1579 raise ValueError(
1580 'host/port and sock can not be specified at the same time')
1582 if reuse_address is None: 1582 ↛ 1584line 1582 didn't jump to line 1584 because the condition on line 1582 was always true
1583 reuse_address = os.name == "posix" and sys.platform != "cygwin"
1584 sockets = []
1585 if host == '':
1586 hosts = [None]
1587 elif (isinstance(host, str) or
1588 not isinstance(host, collections.abc.Iterable)):
1589 hosts = [host]
1590 else:
1591 hosts = host
1593 fs = [self._create_server_getaddrinfo(host, port, family=family,
1594 flags=flags)
1595 for host in hosts]
1596 infos = await tasks.gather(*fs)
1597 infos = set(itertools.chain.from_iterable(infos))
1599 completed = False
1600 try:
1601 for res in infos:
1602 af, socktype, proto, canonname, sa = res
1603 try:
1604 sock = socket.socket(af, socktype, proto)
1605 except socket.error:
1606 # Assume it's a bad family/type/protocol combination.
1607 if self._debug:
1608 logger.warning('create_server() failed to create '
1609 'socket.socket(%r, %r, %r)',
1610 af, socktype, proto, exc_info=True)
1611 continue
1612 sockets.append(sock)
1613 if reuse_address: 1613 ↛ 1618line 1613 didn't jump to line 1618 because the condition on line 1613 was always true
1614 sock.setsockopt(
1615 socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
1616 # Since Linux 6.12.9, SO_REUSEPORT is not allowed
1617 # on other address families than AF_INET/AF_INET6.
1618 if reuse_port and af in (socket.AF_INET, socket.AF_INET6):
1619 _set_reuseport(sock)
1620 if keep_alive: 1620 ↛ 1621line 1620 didn't jump to line 1621 because the condition on line 1620 was never true
1621 sock.setsockopt(
1622 socket.SOL_SOCKET, socket.SO_KEEPALIVE, True)
1623 # Disable IPv4/IPv6 dual stack support (enabled by
1624 # default on Linux) which makes a single socket
1625 # listen on both address families.
1626 if (_HAS_IPv6 and
1627 af == socket.AF_INET6 and
1628 hasattr(socket, 'IPPROTO_IPV6')):
1629 sock.setsockopt(socket.IPPROTO_IPV6,
1630 socket.IPV6_V6ONLY,
1631 True)
1632 try:
1633 sock.bind(sa)
1634 except OSError as err:
1635 msg = ('error while attempting '
1636 'to bind on address %r: %s'
1637 % (sa, str(err).lower()))
1638 if err.errno == errno.EADDRNOTAVAIL: 1638 ↛ 1640line 1638 didn't jump to line 1640 because the condition on line 1638 was never true
1639 # Assume the family is not enabled (bpo-30945)
1640 sockets.pop()
1641 sock.close()
1642 if self._debug:
1643 logger.warning(msg)
1644 continue
1645 raise OSError(err.errno, msg) from None
1647 if not sockets: 1647 ↛ 1648line 1647 didn't jump to line 1648 because the condition on line 1647 was never true
1648 raise OSError('could not bind on any address out of %r'
1649 % ([info[4] for info in infos],))
1651 completed = True
1652 finally:
1653 if not completed:
1654 for sock in sockets: 1654 ↛ 1663line 1654 didn't jump to line 1663 because the loop on line 1654 didn't complete
1655 sock.close()
1656 else:
1657 if sock is None:
1658 raise ValueError('Neither host/port nor sock were specified')
1659 if sock.type != socket.SOCK_STREAM:
1660 raise ValueError(f'A Stream Socket was expected, got {sock!r}')
1661 sockets = [sock]
1663 for sock in sockets:
1664 sock.setblocking(False)
1666 server = Server(self, sockets, protocol_factory,
1667 ssl, backlog, ssl_handshake_timeout,
1668 ssl_shutdown_timeout)
1669 if start_serving:
1670 server._start_serving()
1671 # Skip one loop iteration so that all 'loop.add_reader'
1672 # go through.
1673 await tasks.sleep(0)
1675 if self._debug:
1676 logger.info("%r is serving", server)
1677 return server
1679 async def connect_accepted_socket(
1680 self, protocol_factory, sock,
1681 *, ssl=None,
1682 ssl_handshake_timeout=None,
1683 ssl_shutdown_timeout=None):
1684 if sock.type != socket.SOCK_STREAM: 1684 ↛ 1685line 1684 didn't jump to line 1685 because the condition on line 1684 was never true
1685 raise ValueError(f'A Stream Socket was expected, got {sock!r}')
1687 if ssl_handshake_timeout is not None and not ssl:
1688 raise ValueError(
1689 'ssl_handshake_timeout is only meaningful with ssl')
1691 if ssl_shutdown_timeout is not None and not ssl: 1691 ↛ 1692line 1691 didn't jump to line 1692 because the condition on line 1691 was never true
1692 raise ValueError(
1693 'ssl_shutdown_timeout is only meaningful with ssl')
1695 _check_ssl_socket(sock)
1697 transport, protocol = await self._create_connection_transport(
1698 sock, protocol_factory, ssl, '', server_side=True,
1699 ssl_handshake_timeout=ssl_handshake_timeout,
1700 ssl_shutdown_timeout=ssl_shutdown_timeout)
1701 if self._debug: 1701 ↛ 1704line 1701 didn't jump to line 1704 because the condition on line 1701 was never true
1702 # Get the socket from the transport because SSL transport closes
1703 # the old socket and creates a new SSL socket
1704 sock = transport.get_extra_info('socket')
1705 logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
1706 return transport, protocol
1708 async def connect_read_pipe(self, protocol_factory, pipe):
1709 protocol = protocol_factory()
1710 waiter = self.create_future()
1711 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
1713 try:
1714 await waiter
1715 except:
1716 transport.close()
1717 raise
1719 if self._debug: 1719 ↛ 1720line 1719 didn't jump to line 1720 because the condition on line 1719 was never true
1720 logger.debug('Read pipe %r connected: (%r, %r)',
1721 pipe.fileno(), transport, protocol)
1722 return transport, protocol
1724 async def connect_write_pipe(self, protocol_factory, pipe):
1725 protocol = protocol_factory()
1726 waiter = self.create_future()
1727 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
1729 try:
1730 await waiter
1731 except:
1732 transport.close()
1733 raise
1735 if self._debug: 1735 ↛ 1736line 1735 didn't jump to line 1736 because the condition on line 1735 was never true
1736 logger.debug('Write pipe %r connected: (%r, %r)',
1737 pipe.fileno(), transport, protocol)
1738 return transport, protocol
1740 def _log_subprocess(self, msg, stdin, stdout, stderr):
1741 info = [msg]
1742 if stdin is not None:
1743 info.append(f'stdin={_format_pipe(stdin)}')
1744 if stdout is not None and stderr == subprocess.STDOUT:
1745 info.append(f'stdout=stderr={_format_pipe(stdout)}')
1746 else:
1747 if stdout is not None:
1748 info.append(f'stdout={_format_pipe(stdout)}')
1749 if stderr is not None:
1750 info.append(f'stderr={_format_pipe(stderr)}')
1751 logger.debug(' '.join(info))
1753 async def subprocess_shell(self, protocol_factory, cmd, *,
1754 stdin=subprocess.PIPE,
1755 stdout=subprocess.PIPE,
1756 stderr=subprocess.PIPE,
1757 universal_newlines=False,
1758 shell=True, bufsize=0,
1759 encoding=None, errors=None, text=None,
1760 **kwargs):
1761 if not isinstance(cmd, (bytes, str)):
1762 raise ValueError("cmd must be a string")
1763 if universal_newlines:
1764 raise ValueError("universal_newlines must be False")
1765 if not shell:
1766 raise ValueError("shell must be True")
1767 if bufsize != 0:
1768 raise ValueError("bufsize must be 0")
1769 if text:
1770 raise ValueError("text must be False")
1771 if encoding is not None:
1772 raise ValueError("encoding must be None")
1773 if errors is not None:
1774 raise ValueError("errors must be None")
1776 protocol = protocol_factory()
1777 debug_log = None
1778 if self._debug: 1778 ↛ 1781line 1778 didn't jump to line 1781 because the condition on line 1778 was never true
1779 # don't log parameters: they may contain sensitive information
1780 # (password) and may be too long
1781 debug_log = 'run shell command %r' % cmd
1782 self._log_subprocess(debug_log, stdin, stdout, stderr)
1783 transport = await self._make_subprocess_transport(
1784 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
1785 if self._debug and debug_log is not None: 1785 ↛ 1786line 1785 didn't jump to line 1786 because the condition on line 1785 was never true
1786 logger.info('%s: %r', debug_log, transport)
1787 return transport, protocol
1789 async def subprocess_exec(self, protocol_factory, program, *args,
1790 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1791 stderr=subprocess.PIPE, universal_newlines=False,
1792 shell=False, bufsize=0,
1793 encoding=None, errors=None, text=None,
1794 **kwargs):
1795 if universal_newlines:
1796 raise ValueError("universal_newlines must be False")
1797 if shell:
1798 raise ValueError("shell must be False")
1799 if bufsize != 0:
1800 raise ValueError("bufsize must be 0")
1801 if text:
1802 raise ValueError("text must be False")
1803 if encoding is not None:
1804 raise ValueError("encoding must be None")
1805 if errors is not None:
1806 raise ValueError("errors must be None")
1808 popen_args = (program,) + args
1809 protocol = protocol_factory()
1810 debug_log = None
1811 if self._debug: 1811 ↛ 1814line 1811 didn't jump to line 1814 because the condition on line 1811 was never true
1812 # don't log parameters: they may contain sensitive information
1813 # (password) and may be too long
1814 debug_log = f'execute program {program!r}'
1815 self._log_subprocess(debug_log, stdin, stdout, stderr)
1816 transport = await self._make_subprocess_transport(
1817 protocol, popen_args, False, stdin, stdout, stderr,
1818 bufsize, **kwargs)
1819 if self._debug and debug_log is not None: 1819 ↛ 1820line 1819 didn't jump to line 1820 because the condition on line 1819 was never true
1820 logger.info('%s: %r', debug_log, transport)
1821 return transport, protocol
1823 def get_exception_handler(self):
1824 """Return an exception handler, or None if the default one is in use.
1825 """
1826 return self._exception_handler
1828 def set_exception_handler(self, handler):
1829 """Set handler as the new event loop exception handler.
1831 If handler is None, the default exception handler will
1832 be set.
1834 If handler is a callable object, it should have a
1835 signature matching '(loop, context)', where 'loop'
1836 will be a reference to the active event loop, 'context'
1837 will be a dict object (see `call_exception_handler()`
1838 documentation for details about context).
1839 """
1840 if handler is not None and not callable(handler):
1841 raise TypeError(f'A callable object or None is expected, '
1842 f'got {handler!r}')
1843 self._exception_handler = handler
1845 def default_exception_handler(self, context):
1846 """Default exception handler.
1848 This is called when an exception occurs and no exception
1849 handler is set, and can be called by a custom exception
1850 handler that wants to defer to the default behavior.
1852 This default handler logs the error message and other
1853 context-dependent information. In debug mode, a truncated
1854 stack trace is also appended showing where the given object
1855 (e.g. a handle or future or task) was created, if any.
1857 The context parameter has the same meaning as in
1858 `call_exception_handler()`.
1859 """
1860 message = context.get('message')
1861 if not message: 1861 ↛ 1862line 1861 didn't jump to line 1862 because the condition on line 1861 was never true
1862 message = 'Unhandled exception in event loop'
1864 exception = context.get('exception')
1865 if exception is not None:
1866 exc_info = (type(exception), exception, exception.__traceback__)
1867 else:
1868 exc_info = False
1870 if ('source_traceback' not in context and 1870 ↛ 1873line 1870 didn't jump to line 1873 because the condition on line 1870 was never true
1871 self._current_handle is not None and
1872 self._current_handle._source_traceback):
1873 context['handle_traceback'] = \
1874 self._current_handle._source_traceback
1876 log_lines = [message]
1877 for key in sorted(context):
1878 if key in {'message', 'exception'}:
1879 continue
1880 value = context[key]
1881 if key == 'source_traceback':
1882 tb = ''.join(traceback.format_list(value))
1883 value = 'Object created at (most recent call last):\n'
1884 value += tb.rstrip()
1885 elif key == 'handle_traceback': 1885 ↛ 1886line 1885 didn't jump to line 1886 because the condition on line 1885 was never true
1886 tb = ''.join(traceback.format_list(value))
1887 value = 'Handle created at (most recent call last):\n'
1888 value += tb.rstrip()
1889 else:
1890 value = repr(value)
1891 log_lines.append(f'{key}: {value}')
1893 logger.error('\n'.join(log_lines), exc_info=exc_info)
1895 def call_exception_handler(self, context):
1896 """Call the current event loop's exception handler.
1898 The context argument is a dict containing the following keys:
1900 - 'message': Error message;
1901 - 'exception' (optional): Exception object;
1902 - 'future' (optional): Future instance;
1903 - 'task' (optional): Task instance;
1904 - 'handle' (optional): Handle instance;
1905 - 'protocol' (optional): Protocol instance;
1906 - 'transport' (optional): Transport instance;
1907 - 'socket' (optional): Socket instance;
1908 - 'source_traceback' (optional): Traceback of the source;
1909 - 'handle_traceback' (optional): Traceback of the handle;
1910 - 'asyncgen' (optional): Asynchronous generator that caused
1911 the exception.
1913 New keys maybe introduced in the future.
1915 Note: do not overload this method in an event loop subclass.
1916 For custom exception handling, use the
1917 `set_exception_handler()` method.
1918 """
1919 if self._exception_handler is None:
1920 try:
1921 self.default_exception_handler(context)
1922 except (SystemExit, KeyboardInterrupt):
1923 raise
1924 except BaseException:
1925 # Second protection layer for unexpected errors
1926 # in the default implementation, as well as for subclassed
1927 # event loops with overloaded "default_exception_handler".
1928 logger.error('Exception in default exception handler',
1929 exc_info=True)
1930 else:
1931 try:
1932 ctx = None
1933 thing = context.get("task")
1934 if thing is None:
1935 # Even though Futures don't have a context,
1936 # Task is a subclass of Future,
1937 # and sometimes the 'future' key holds a Task.
1938 thing = context.get("future")
1939 if thing is None:
1940 # Handles also have a context.
1941 thing = context.get("handle")
1942 if thing is not None and hasattr(thing, "get_context"):
1943 ctx = thing.get_context()
1944 if ctx is not None and hasattr(ctx, "run"):
1945 ctx.run(self._exception_handler, self, context)
1946 else:
1947 self._exception_handler(self, context)
1948 except (SystemExit, KeyboardInterrupt):
1949 raise
1950 except BaseException as exc:
1951 # Exception in the user set custom exception handler.
1952 try:
1953 # Let's try default handler.
1954 self.default_exception_handler({
1955 'message': 'Unhandled error in exception handler',
1956 'exception': exc,
1957 'context': context,
1958 })
1959 except (SystemExit, KeyboardInterrupt):
1960 raise
1961 except BaseException:
1962 # Guard 'default_exception_handler' in case it is
1963 # overloaded.
1964 logger.error('Exception in default exception handler '
1965 'while handling an unexpected error '
1966 'in custom exception handler',
1967 exc_info=True)
1969 def _add_callback(self, handle):
1970 """Add a Handle to _ready."""
1971 if not handle._cancelled:
1972 self._ready.append(handle)
1974 def _add_callback_signalsafe(self, handle):
1975 """Like _add_callback() but called from a signal handler."""
1976 self._add_callback(handle)
1977 self._write_to_self()
1979 def _timer_handle_cancelled(self, handle):
1980 """Notification that a TimerHandle has been cancelled."""
1981 if handle._scheduled:
1982 self._timer_cancelled_count += 1
1984 def _run_once(self):
1985 """Run one full iteration of the event loop.
1987 This calls all currently ready callbacks, polls for I/O,
1988 schedules the resulting callbacks, and finally schedules
1989 'call_later' callbacks.
1990 """
1992 sched_count = len(self._scheduled)
1993 if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
1994 self._timer_cancelled_count / sched_count >
1995 _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
1996 # Remove delayed calls that were cancelled if their number
1997 # is too high
1998 new_scheduled = []
1999 for handle in self._scheduled:
2000 if handle._cancelled:
2001 handle._scheduled = False
2002 else:
2003 new_scheduled.append(handle)
2005 heapq.heapify(new_scheduled)
2006 self._scheduled = new_scheduled
2007 self._timer_cancelled_count = 0
2008 else:
2009 # Remove delayed calls that were cancelled from head of queue.
2010 while self._scheduled and self._scheduled[0]._cancelled:
2011 self._timer_cancelled_count -= 1
2012 handle = heapq.heappop(self._scheduled)
2013 handle._scheduled = False
2015 timeout = None
2016 if self._ready or self._stopping:
2017 timeout = 0
2018 elif self._scheduled:
2019 # Compute the desired timeout.
2020 timeout = self._scheduled[0]._when - self.time()
2021 if timeout > MAXIMUM_SELECT_TIMEOUT: 2021 ↛ 2022line 2021 didn't jump to line 2022 because the condition on line 2021 was never true
2022 timeout = MAXIMUM_SELECT_TIMEOUT
2023 elif timeout < 0:
2024 timeout = 0
2026 event_list = self._selector.select(timeout)
2027 self._process_events(event_list)
2028 # Needed to break cycles when an exception occurs.
2029 event_list = None
2031 # Handle 'later' callbacks that are ready.
2032 now = self.time()
2033 # Ensure that `end_time` is strictly increasing
2034 # when the clock resolution is too small.
2035 end_time = now + max(self._clock_resolution, math.ulp(now))
2036 while self._scheduled:
2037 handle = self._scheduled[0]
2038 if handle._when >= end_time:
2039 break
2040 handle = heapq.heappop(self._scheduled)
2041 handle._scheduled = False
2042 self._ready.append(handle)
2044 # This is the only place where callbacks are actually *called*.
2045 # All other places just add them to ready.
2046 # Note: We run all currently scheduled callbacks, but not any
2047 # callbacks scheduled by callbacks run this time around --
2048 # they will be run the next time (after another I/O poll).
2049 # Use an idiom that is thread-safe without using locks.
2050 ntodo = len(self._ready)
2051 for i in range(ntodo):
2052 handle = self._ready.popleft()
2053 if handle._cancelled:
2054 continue
2055 if self._debug:
2056 try:
2057 self._current_handle = handle
2058 t0 = self.time()
2059 handle._run()
2060 dt = self.time() - t0
2061 if dt >= self.slow_callback_duration:
2062 logger.warning('Executing %s took %.3f seconds',
2063 _format_handle(handle), dt)
2064 finally:
2065 self._current_handle = None
2066 else:
2067 handle._run()
2068 handle = None # Needed to break cycles when an exception occurs.
2070 def _set_coroutine_origin_tracking(self, enabled):
2071 if bool(enabled) == bool(self._coroutine_origin_tracking_enabled):
2072 return
2074 if enabled:
2075 self._coroutine_origin_tracking_saved_depth = (
2076 sys.get_coroutine_origin_tracking_depth())
2077 sys.set_coroutine_origin_tracking_depth(
2078 constants.DEBUG_STACK_DEPTH)
2079 else:
2080 sys.set_coroutine_origin_tracking_depth(
2081 self._coroutine_origin_tracking_saved_depth)
2083 self._coroutine_origin_tracking_enabled = enabled
2085 def get_debug(self):
2086 return self._debug
2088 def set_debug(self, enabled):
2089 self._debug = enabled
2091 if self.is_running():
2092 self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)