Coverage for Lib/asyncio/base_events.py: 88%
1152 statements
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-23 01:21 +0000
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-23 01:21 +0000
1"""Base implementation of event loop.
3The event loop can be broken up into a multiplexer (the part
4responsible for notifying us of I/O events) and the event loop proper,
5which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
16import collections
17import collections.abc
18import concurrent.futures
19import errno
20import heapq
21import itertools
22import os
23import socket
24import stat
25import subprocess
26import threading
27import time
28import traceback
29import sys
30import warnings
31import weakref
33try:
34 import ssl
35except ImportError: # pragma: no cover
36 ssl = None
38from . import constants
39from . import coroutines
40from . import events
41from . import exceptions
42from . import futures
43from . import protocols
44from . import sslproto
45from . import staggered
46from . import tasks
47from . import timeouts
48from . import transports
49from . import trsock
50from .log import logger
53__all__ = 'BaseEventLoop','Server',
# Cancelled timer handles are only cleaned up once at least this many
# handles are scheduled.
_MIN_SCHEDULED_TIMER_HANDLES = 100

# ...and only once at least this fraction of the scheduled handles has
# been cancelled.
_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5

# True when the socket module exposes AF_INET6.
_HAS_IPv6 = hasattr(socket, 'AF_INET6')

# Upper bound (one day, in seconds) for the timeout passed to select,
# to avoid OS limitations.
MAXIMUM_SELECT_TIMEOUT = 3600 * 24
def _format_handle(handle):
    """Return a printable description of *handle*.

    If the handle's callback is bound to a Task, the repr of that task is
    returned; otherwise the handle's own str() is used.
    """
    owner = getattr(handle._callback, '__self__', None)
    if not isinstance(owner, tasks.Task):
        return str(handle)
    # The callback belongs to a task: describe the task itself.
    return repr(owner)
80def _format_pipe(fd):
81 if fd == subprocess.PIPE:
82 return '<pipe>'
83 elif fd == subprocess.STDOUT:
84 return '<stdout>'
85 else:
86 return repr(fd)
89def _set_reuseport(sock):
90 if not hasattr(socket, 'SO_REUSEPORT'):
91 raise ValueError('reuse_port not supported by socket module')
92 else:
93 try:
94 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
95 except OSError:
96 raise ValueError('reuse_port not supported by socket module, '
97 'SO_REUSEPORT defined but not implemented.')
100def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0):
101 # Try to skip getaddrinfo if "host" is already an IP. Users might have
102 # handled name resolution in their own code and pass in resolved IPs.
103 if not hasattr(socket, 'inet_pton'):
104 return
106 if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
107 host is None:
108 return None
110 if type == socket.SOCK_STREAM:
111 proto = socket.IPPROTO_TCP
112 elif type == socket.SOCK_DGRAM:
113 proto = socket.IPPROTO_UDP
114 else:
115 return None
117 if port is None:
118 port = 0
119 elif isinstance(port, bytes) and port == b'':
120 port = 0
121 elif isinstance(port, str) and port == '':
122 port = 0
123 else:
124 # If port's a service name like "http", don't skip getaddrinfo.
125 try:
126 port = int(port)
127 except (TypeError, ValueError):
128 return None
130 if family == socket.AF_UNSPEC:
131 afs = [socket.AF_INET]
132 if _HAS_IPv6: 132 ↛ 137line 132 didn't jump to line 137 because the condition on line 132 was always true
133 afs.append(socket.AF_INET6)
134 else:
135 afs = [family]
137 if isinstance(host, bytes):
138 host = host.decode('idna')
139 if '%' in host:
140 # Linux's inet_pton doesn't accept an IPv6 zone index after host,
141 # like '::1%lo0'.
142 return None
144 for af in afs:
145 try:
146 socket.inet_pton(af, host)
147 # The host has already been resolved.
148 if _HAS_IPv6 and af == socket.AF_INET6:
149 return af, type, proto, '', (host, port, flowinfo, scopeid)
150 else:
151 return af, type, proto, '', (host, port)
152 except OSError:
153 pass
155 # "host" is not an IP address.
156 return None
159def _interleave_addrinfos(addrinfos, first_address_family_count=1):
160 """Interleave list of addrinfo tuples by family."""
161 # Group addresses by family
162 addrinfos_by_family = collections.OrderedDict()
163 for addr in addrinfos:
164 family = addr[0]
165 if family not in addrinfos_by_family:
166 addrinfos_by_family[family] = []
167 addrinfos_by_family[family].append(addr)
168 addrinfos_lists = list(addrinfos_by_family.values())
170 reordered = []
171 if first_address_family_count > 1:
172 reordered.extend(addrinfos_lists[0][:first_address_family_count - 1])
173 del addrinfos_lists[0][:first_address_family_count - 1]
174 reordered.extend(
175 a for a in itertools.chain.from_iterable(
176 itertools.zip_longest(*addrinfos_lists)
177 ) if a is not None)
178 return reordered
181def _run_until_complete_cb(fut):
182 if not fut.cancelled():
183 exc = fut.exception()
184 if isinstance(exc, (SystemExit, KeyboardInterrupt)):
185 # Issue #22429: run_forever() already finished, no need to
186 # stop it.
187 return
188 futures._get_loop(fut).stop()
191if hasattr(socket, 'TCP_NODELAY'): 191 ↛ 198line 191 didn't jump to line 198 because the condition on line 191 was always true
192 def _set_nodelay(sock):
193 if (sock.family in {socket.AF_INET, socket.AF_INET6} and
194 sock.type == socket.SOCK_STREAM and
195 sock.proto == socket.IPPROTO_TCP):
196 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
197else:
198 def _set_nodelay(sock):
199 pass
202def _check_ssl_socket(sock):
203 if ssl is not None and isinstance(sock, ssl.SSLSocket): 203 ↛ 204line 203 didn't jump to line 204 because the condition on line 203 was never true
204 raise TypeError("Socket cannot be of type SSLSocket")
class _SendfileFallbackProtocol(protocols.Protocol):
    """Stand-in protocol installed on a transport during a sendfile fallback.

    On construction it records the transport's current protocol and its
    reading/writing state, pauses reading and installs itself as the
    protocol.  restore() puts the original protocol back.  While installed,
    write flow control is tracked through a future that drain() awaits.
    """

    def __init__(self, transp):
        if not isinstance(transp, transports._FlowControlMixin):
            raise TypeError("transport should be _FlowControlMixin instance")
        self._transport = transp
        self._proto = transp.get_protocol()
        self._should_resume_reading = transp.is_reading()
        self._should_resume_writing = transp._protocol_paused
        transp.pause_reading()
        transp.set_protocol(self)
        # If writing is already paused, start with a pending future so that
        # drain() waits for resume_writing().
        self._write_ready_fut = (
            self._transport._loop.create_future()
            if self._should_resume_writing else None)

    async def drain(self):
        """Wait until writing is allowed; raise ConnectionError if closing."""
        if self._transport.is_closing():
            raise ConnectionError("Connection closed by peer")
        pending = self._write_ready_fut
        if pending is not None:
            await pending

    def connection_made(self, transport):
        raise RuntimeError("Invalid state: "
                           "connection should have been established already.")

    def connection_lost(self, exc):
        """Fail any pending write-ready future, then notify the real protocol."""
        waiter = self._write_ready_fut
        if waiter is not None:
            # Never happens if peer disconnects after sending the whole content
            # Thus disconnection is always an exception from user perspective
            error = exc
            if error is None:
                error = ConnectionError("Connection is closed by peer")
            waiter.set_exception(error)
        self._proto.connection_lost(exc)

    def pause_writing(self):
        """Create the write-ready future unless one is already pending."""
        if self._write_ready_fut is None:
            self._write_ready_fut = self._transport._loop.create_future()

    def resume_writing(self):
        """Resolve and drop the write-ready future, if there is one."""
        waiter = self._write_ready_fut
        if waiter is None:
            return
        waiter.set_result(False)
        self._write_ready_fut = None

    def data_received(self, data):
        raise RuntimeError("Invalid state: reading should be paused")

    def eof_received(self):
        raise RuntimeError("Invalid state: reading should be paused")

    async def restore(self):
        """Reinstall the original protocol and its reading/writing state."""
        self._transport.set_protocol(self._proto)
        if self._should_resume_reading:
            self._transport.resume_reading()
        leftover = self._write_ready_fut
        if leftover is not None:
            # Cancel the future.
            # Basically it has no effect because protocol is switched back,
            # no code should wait for it anymore.
            leftover.cancel()
        if self._should_resume_writing:
            self._proto.resume_writing()
class Server(events.AbstractServer):
    """Server object tracking its listening sockets and client transports.

    ``_sockets`` is the list of listening sockets, or None once close() has
    been called.  ``_clients`` weakly references attached transports.
    ``_waiters`` holds the futures of pending wait_closed() calls and is set
    to None by _wakeup() once the server is closed and has no clients.
    """

    def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
                 ssl_handshake_timeout, ssl_shutdown_timeout=None):
        self._loop = loop
        self._sockets = sockets
        # Weak references so we don't break Transport's ability to
        # detect abandoned transports
        self._clients = weakref.WeakSet()
        self._waiters = []
        self._protocol_factory = protocol_factory
        self._backlog = backlog
        self._ssl_context = ssl_context
        self._ssl_handshake_timeout = ssl_handshake_timeout
        self._ssl_shutdown_timeout = ssl_shutdown_timeout
        self._serving = False
        self._serving_forever_fut = None

    def __repr__(self):
        return f'<{self.__class__.__name__} sockets={self.sockets!r}>'

    def _attach(self, transport):
        """Register *transport* as a client; the server must still be open."""
        assert self._sockets is not None
        self._clients.add(transport)

    def _detach(self, transport):
        """Forget *transport*; wake waiters if closed and no clients remain."""
        self._clients.discard(transport)
        if len(self._clients) == 0 and self._sockets is None:
            self._wakeup()

    def _wakeup(self):
        """Resolve all pending wait_closed() futures (idempotent)."""
        waiters = self._waiters
        if waiters is None:
            # Already woken up.  Both close() and _detach() call this method,
            # and _detach() can satisfy its condition again after close()
            # already ran it; a second call must not iterate None.
            return
        self._waiters = None
        for waiter in waiters:
            if not waiter.done():
                waiter.set_result(None)

    def _start_serving(self):
        """Start listening on every socket; a no-op if already serving."""
        if self._serving:
            return
        self._serving = True
        for sock in self._sockets:
            sock.listen(self._backlog)
            self._loop._start_serving(
                self._protocol_factory, sock, self._ssl_context,
                self, self._backlog, self._ssl_handshake_timeout,
                self._ssl_shutdown_timeout)

    def get_loop(self):
        return self._loop

    def is_serving(self):
        return self._serving

    @property
    def sockets(self):
        """Tuple of TransportSocket wrappers; empty once the server is closed."""
        if self._sockets is None:
            return ()
        return tuple(trsock.TransportSocket(s) for s in self._sockets)

    def close(self):
        """Stop serving on all sockets; a no-op if already closed.

        Cancels a pending serve_forever() future and wakes wait_closed()
        callers immediately when no clients are attached.
        """
        sockets = self._sockets
        if sockets is None:
            return
        self._sockets = None

        for sock in sockets:
            self._loop._stop_serving(sock)

        self._serving = False

        if (self._serving_forever_fut is not None and
                not self._serving_forever_fut.done()):
            self._serving_forever_fut.cancel()
            self._serving_forever_fut = None

        if len(self._clients) == 0:
            self._wakeup()

    def close_clients(self):
        """Call close() on every attached transport."""
        # Iterate a copy: closing may detach transports from the set.
        for transport in self._clients.copy():
            transport.close()

    def abort_clients(self):
        """Call abort() on every attached transport."""
        for transport in self._clients.copy():
            transport.abort()

    async def start_serving(self):
        self._start_serving()
        # Skip one loop iteration so that all 'loop.add_reader'
        # go through.
        await tasks.sleep(0)

    async def serve_forever(self):
        """Serve until the awaiting task is cancelled.

        Raises RuntimeError if already being awaited or if the server is
        closed.  On cancellation the server is closed and waited for before
        the CancelledError is re-raised.
        """
        if self._serving_forever_fut is not None:
            raise RuntimeError(
                f'server {self!r} is already being awaited on serve_forever()')
        if self._sockets is None:
            raise RuntimeError(f'server {self!r} is closed')

        self._start_serving()
        self._serving_forever_fut = self._loop.create_future()

        try:
            await self._serving_forever_fut
        except exceptions.CancelledError:
            try:
                self.close()
                await self.wait_closed()
            finally:
                raise
        finally:
            self._serving_forever_fut = None

    async def wait_closed(self):
        """Wait until server is closed and all connections are dropped.

        - If the server is not closed, wait.
        - If it is closed, but there are still active connections, wait.

        Anyone waiting here will be unblocked once both conditions
        (server is closed and all connections have been dropped)
        have become true, in either order.

        Historical note: In 3.11 and before, this was broken, returning
        immediately if the server was already closed, even if there
        were still active connections.  An attempted fix in 3.12.0 was
        still broken, returning immediately if the server was still
        open and there were no active connections.  Hopefully in 3.12.1
        we have it right.
        """
        # Waiters are unblocked by self._wakeup(), which is called
        # from two places: self.close() and self._detach(), but only
        # when both conditions have become true.  To signal that this
        # has happened, self._wakeup() sets self._waiters to None.
        if self._waiters is None:
            return
        waiter = self._loop.create_future()
        self._waiters.append(waiter)
        await waiter
417class BaseEventLoop(events.AbstractEventLoop):
def __init__(self):
    """Set up the loop's bookkeeping: not running, not closed, no work."""
    # Lifecycle flags.
    self._closed = False
    self._stopping = False
    # Identifier of the thread running the event loop, or None if the
    # event loop is not running
    self._thread_id = None

    # Pending work: ready callbacks (deque) and scheduled timers (list).
    self._ready = collections.deque()
    self._scheduled = []
    self._timer_cancelled_count = 0

    self._default_executor = None
    self._internal_fds = 0
    self._clock_resolution = time.get_clock_info('monotonic').resolution
    self._exception_handler = None
    self.set_debug(coroutines._is_debug_mode())

    # The preserved state of async generator hooks.
    self._old_agen_hooks = None
    # In debug mode, if the execution of a callback or a step of a task
    # exceed this duration in seconds, the slow callback/task is logged.
    self.slow_callback_duration = 0.1
    self._current_handle = None
    self._task_factory = None
    self._coroutine_origin_tracking_enabled = False
    self._coroutine_origin_tracking_saved_depth = None

    # A weak set of all asynchronous generators that are
    # being iterated by the loop.
    self._asyncgens = weakref.WeakSet()
    # Set to True when `loop.shutdown_asyncgens` is called.
    self._asyncgens_shutdown_called = False
    # Set to True when `loop.shutdown_default_executor` is called.
    self._executor_shutdown_called = False
451 def __repr__(self):
452 return (
453 f'<{self.__class__.__name__} running={self.is_running()} '
454 f'closed={self.is_closed()} debug={self.get_debug()}>'
455 )
def create_future(self):
    """Return a new futures.Future bound to this loop."""
    new_future = futures.Future(loop=self)
    return new_future
def create_task(self, coro, **kwargs):
    """Wrap *coro* in a task attached to this loop and return the task.

    When a task factory is installed it is called as
    ``factory(loop, coro, **kwargs)``; otherwise a tasks.Task is created.
    Raises RuntimeError (via _check_closed) if the loop is closed.
    """
    self._check_closed()
    factory = self._task_factory
    if factory is not None:
        return factory(self, coro, **kwargs)

    task = tasks.Task(coro, loop=self, **kwargs)
    if task._source_traceback:
        # Drop this frame from the recorded creation traceback.
        del task._source_traceback[-1]
    try:
        return task
    finally:
        # gh-128552: prevent a refcycle of
        # task.exception().__traceback__->BaseEventLoop.create_task->task
        del task
def set_task_factory(self, factory):
    """Install the factory used by loop.create_task().

    Passing None selects the default task factory.

    A callable factory should accept '(loop, coro, **kwargs)': 'loop' is a
    reference to the active event loop, 'coro' is a coroutine object, and
    **kwargs are arbitrary keyword arguments to pass on to Task.  The
    callable must return a Task.

    Raises TypeError if *factory* is neither None nor callable.
    """
    if not (factory is None or callable(factory)):
        raise TypeError('task factory must be a callable or None')
    self._task_factory = factory
def get_task_factory(self):
    """Return the installed task factory; None means the default is used."""
    factory = self._task_factory
    return factory
499 def _make_socket_transport(self, sock, protocol, waiter=None, *,
500 extra=None, server=None):
501 """Create socket transport."""
502 raise NotImplementedError
504 def _make_ssl_transport(
505 self, rawsock, protocol, sslcontext, waiter=None,
506 *, server_side=False, server_hostname=None,
507 extra=None, server=None,
508 ssl_handshake_timeout=None,
509 ssl_shutdown_timeout=None,
510 call_connection_made=True):
511 """Create SSL transport."""
512 raise NotImplementedError
514 def _make_datagram_transport(self, sock, protocol,
515 address=None, waiter=None, extra=None):
516 """Create datagram transport."""
517 raise NotImplementedError
519 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
520 extra=None):
521 """Create read pipe transport."""
522 raise NotImplementedError
524 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
525 extra=None):
526 """Create write pipe transport."""
527 raise NotImplementedError
529 async def _make_subprocess_transport(self, protocol, args, shell,
530 stdin, stdout, stderr, bufsize,
531 extra=None, **kwargs):
532 """Create subprocess transport."""
533 raise NotImplementedError
535 def _write_to_self(self):
536 """Write a byte to self-pipe, to wake up the event loop.
538 This may be called from a different thread.
540 The subclass is responsible for implementing the self-pipe.
541 """
542 raise NotImplementedError
544 def _process_events(self, event_list):
545 """Process selector events."""
546 raise NotImplementedError
548 def _check_closed(self):
549 if self._closed:
550 raise RuntimeError('Event loop is closed')
552 def _check_default_executor(self):
553 if self._executor_shutdown_called: 553 ↛ 554line 553 didn't jump to line 554 because the condition on line 553 was never true
554 raise RuntimeError('Executor shutdown has been called')
556 def _asyncgen_finalizer_hook(self, agen):
557 self._asyncgens.discard(agen)
558 if not self.is_closed(): 558 ↛ exitline 558 didn't return from function '_asyncgen_finalizer_hook' because the condition on line 558 was always true
559 self.call_soon_threadsafe(self.create_task, agen.aclose())
561 def _asyncgen_firstiter_hook(self, agen):
562 if self._asyncgens_shutdown_called: 562 ↛ 563line 562 didn't jump to line 563 because the condition on line 562 was never true
563 warnings.warn(
564 f"asynchronous generator {agen!r} was scheduled after "
565 f"loop.shutdown_asyncgens() call",
566 ResourceWarning, source=self)
568 self._asyncgens.add(agen)
async def shutdown_asyncgens(self):
    """Shutdown all active asynchronous generators.

    Marks shutdown as requested, then closes every tracked generator
    concurrently.  Exceptions raised while closing are reported through
    call_exception_handler() rather than propagated.
    """
    self._asyncgens_shutdown_called = True

    if len(self._asyncgens) == 0:
        # Nothing is being tracked.
        return

    closing_agens = list(self._asyncgens)
    self._asyncgens.clear()

    outcomes = await tasks.gather(
        *(ag.aclose() for ag in closing_agens),
        return_exceptions=True)

    for outcome, agen in zip(outcomes, closing_agens):
        if not isinstance(outcome, Exception):
            continue
        self.call_exception_handler({
            'message': f'an error occurred during closing of '
                       f'asynchronous generator {agen!r}',
            'exception': outcome,
            'asyncgen': agen
        })
async def shutdown_default_executor(self, timeout=None):
    """Schedule the shutdown of the default executor.

    *timeout* is how long the executor is given to finish joining; the
    default None means no limit.  On timeout a RuntimeWarning is emitted
    and the executor is shut down without waiting.
    """
    self._executor_shutdown_called = True
    if self._default_executor is None:
        return
    done = self.create_future()
    # The blocking shutdown runs in a helper thread that resolves `done`.
    worker = threading.Thread(target=self._do_shutdown, args=(done,))
    worker.start()
    try:
        async with timeouts.timeout(timeout):
            await done
    except TimeoutError:
        warnings.warn("The executor did not finishing joining "
                      f"its threads within {timeout} seconds.",
                      RuntimeWarning, stacklevel=2)
        self._default_executor.shutdown(wait=False)
    else:
        worker.join()
619 def _do_shutdown(self, future):
620 try:
621 self._default_executor.shutdown(wait=True)
622 if not self.is_closed(): 622 ↛ exitline 622 didn't return from function '_do_shutdown' because the condition on line 622 was always true
623 self.call_soon_threadsafe(futures._set_result_unless_cancelled,
624 future, None)
625 except Exception as ex:
626 if not self.is_closed() and not future.cancelled():
627 self.call_soon_threadsafe(future.set_exception, ex)
629 def _check_running(self):
630 if self.is_running():
631 raise RuntimeError('This event loop is already running')
632 if events._get_running_loop() is not None:
633 raise RuntimeError(
634 'Cannot run the event loop while another loop is running')
def _run_forever_setup(self):
    """Prepare the run loop to process events.

    Exposed as a separate method so that custom event loop subclasses
    (e.g. loops integrating a GUI event loop) can reuse the setup logic.
    """
    self._check_closed()
    self._check_running()
    self._set_coroutine_origin_tracking(self._debug)

    # Remember the current async generator hooks so that they can be
    # restored later, then install this loop's own hooks.
    self._old_agen_hooks = sys.get_asyncgen_hooks()
    self._thread_id = threading.get_ident()
    sys.set_asyncgen_hooks(
        firstiter=self._asyncgen_firstiter_hook,
        finalizer=self._asyncgen_finalizer_hook,
    )

    events._set_running_loop(self)
def _run_forever_cleanup(self):
    """Clean up after an event loop finishes the looping over events.

    Exposed as a separate method so that custom event loop subclasses
    (e.g. loops integrating a GUI event loop) can reuse the cleanup logic.
    """
    self._stopping = False
    self._thread_id = None
    events._set_running_loop(None)
    self._set_coroutine_origin_tracking(False)
    # Restore any pre-existing async generator hooks.
    saved_hooks = self._old_agen_hooks
    if saved_hooks is None:
        return
    sys.set_asyncgen_hooks(*saved_hooks)
    self._old_agen_hooks = None
def run_forever(self):
    """Run until stop() is called.

    At least one iteration is always executed; cleanup runs even when an
    iteration raises.
    """
    self._run_forever_setup()
    try:
        keep_going = True
        while keep_going:
            self._run_once()
            keep_going = not self._stopping
    finally:
        self._run_forever_cleanup()
def run_until_complete(self, future):
    """Run until the Future is done.

    A coroutine argument is wrapped in a Task first.

    WARNING: It would be disastrous to call run_until_complete()
    with the same coroutine twice -- it would wrap it in two
    different Tasks and that can't be good.

    Return the Future's result, or raise its exception.  Raises
    RuntimeError if the loop stopped before the future completed.
    """
    self._check_closed()
    self._check_running()

    wrapped = not futures.isfuture(future)
    future = tasks.ensure_future(future, loop=self)
    if wrapped:
        # An exception is raised if the future didn't complete, so there
        # is no need to log the "destroy pending task" message
        future._log_destroy_pending = False

    future.add_done_callback(_run_until_complete_cb)
    try:
        self.run_forever()
    except BaseException:
        if wrapped and future.done() and not future.cancelled():
            # The coroutine raised a BaseException. Consume the exception
            # to not log a warning, the caller doesn't have access to the
            # local task.
            future.exception()
        raise
    finally:
        future.remove_done_callback(_run_until_complete_cb)

    if future.done():
        return future.result()
    raise RuntimeError('Event loop stopped before Future completed.')
def stop(self):
    """Ask the running loop to stop.

    Only a flag is set here: run_forever() checks it after each complete
    iteration, so callbacks that are already scheduled still run.
    """
    self._stopping = True
def close(self):
    """Close the event loop.

    Clears the ready and scheduled queues and shuts the default executor
    down without waiting for it to finish.  Closing an already closed loop
    is a no-op.

    The event loop must not be running (RuntimeError otherwise).
    """
    if self.is_running():
        raise RuntimeError("Cannot close a running event loop")
    if self._closed:
        return
    if self._debug:
        logger.debug("Close %r", self)
    self._closed = True
    self._ready.clear()
    self._scheduled.clear()
    self._executor_shutdown_called = True
    executor = self._default_executor
    if executor is None:
        return
    self._default_executor = None
    executor.shutdown(wait=False)
def is_closed(self):
    """Returns True if the event loop was closed."""
    closed = self._closed
    return closed
756 def __del__(self, _warn=warnings.warn):
757 if not self.is_closed(): 757 ↛ 758line 757 didn't jump to line 758 because the condition on line 757 was never true
758 _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self)
759 if not self.is_running():
760 self.close()
def is_running(self):
    """Returns True if the event loop is running."""
    # A thread identifier is recorded only while the loop runs.
    return self._thread_id is not None
def time(self):
    """Return the current time according to the event loop's clock.

    The value is a float in seconds taken from time.monotonic(); the
    epoch, precision, accuracy and drift are unspecified and may differ
    per event loop.
    """
    now = time.monotonic()
    return now
def call_later(self, delay, callback, *args, context=None):
    """Arrange for *callback* to be called after *delay* seconds.

    Return a Handle: an opaque object with a cancel() method that
    can be used to cancel the call.

    The delay can be an int or float, expressed in seconds, and is always
    relative to the current loop time.

    Each callback will be called exactly once.  If two callbacks
    are scheduled for exactly the same time, it is undefined which
    will be called first.

    Positional arguments after the callback are passed to the callback
    when it is called.  Raises TypeError if *delay* is None.
    """
    if delay is None:
        raise TypeError('delay must not be None')
    deadline = self.time() + delay
    timer = self.call_at(deadline, callback, *args, context=context)
    if timer._source_traceback:
        # Hide this frame from the recorded creation traceback.
        del timer._source_traceback[-1]
    return timer
def call_at(self, when, callback, *args, context=None):
    """Like call_later(), but uses an absolute time.

    Absolute time corresponds to the event loop's time() method.
    Raises TypeError if *when* is None.
    """
    if when is None:
        raise TypeError("when cannot be None")
    self._check_closed()
    if self._debug:
        self._check_thread()
        self._check_callback(callback, 'call_at')
    handle = events.TimerHandle(when, callback, args, self, context)
    if handle._source_traceback:
        del handle._source_traceback[-1]
    # _scheduled is kept as a heap of timer handles.
    heapq.heappush(self._scheduled, handle)
    handle._scheduled = True
    return handle
def call_soon(self, callback, *args, context=None):
    """Arrange for a callback to be called as soon as possible.

    This operates as a FIFO queue: callbacks are called in the
    order in which they are registered.  Each callback will be
    called exactly once.

    Positional arguments after the callback are passed to the callback
    when it is called.
    """
    self._check_closed()
    if self._debug:
        # Extra validation is only performed in debug mode.
        self._check_thread()
        self._check_callback(callback, 'call_soon')
    queued = self._call_soon(callback, args, context)
    if queued._source_traceback:
        del queued._source_traceback[-1]
    return queued
def _check_callback(self, callback, method):
    """Validate *callback* for the scheduling API named *method*.

    Raises TypeError for coroutine objects, coroutine functions and
    non-callables.
    """
    is_coro = (coroutines.iscoroutine(callback) or
               coroutines._iscoroutinefunction(callback))
    if is_coro:
        raise TypeError(
            f"coroutines cannot be used with {method}()")
    if callable(callback):
        return
    raise TypeError(
        f'a callable object was expected by {method}(), '
        f'got {callback!r}')
def _call_soon(self, callback, args, context):
    """Create a Handle for *callback* and append it to the ready queue."""
    new_handle = events.Handle(callback, args, self, context)
    if new_handle._source_traceback:
        del new_handle._source_traceback[-1]
    self._ready.append(new_handle)
    return new_handle
853 def _check_thread(self):
854 """Check that the current thread is the thread running the event loop.
856 Non-thread-safe methods of this class make this assumption and will
857 likely behave incorrectly when the assumption is violated.
859 Should only be called when (self._debug == True). The caller is
860 responsible for checking this condition for performance reasons.
861 """
862 if self._thread_id is None:
863 return
864 thread_id = threading.get_ident()
865 if thread_id != self._thread_id:
866 raise RuntimeError(
867 "Non-thread-safe operation invoked on an event loop other "
868 "than the current one")
def call_soon_threadsafe(self, callback, *args, context=None):
    """Like call_soon(), but thread-safe.

    The handle is appended to the ready queue and the loop is woken up
    through _write_to_self().
    """
    self._check_closed()
    if self._debug:
        self._check_callback(callback, 'call_soon_threadsafe')
    handle = events._ThreadSafeHandle(callback, args, self, context)
    self._ready.append(handle)
    # Two trailing traceback entries are trimmed, each only while the list
    # is non-empty.  NOTE(review): presumably one per intermediate frame
    # (handle construction and this method) -- confirm against events.
    for _ in range(2):
        if handle._source_traceback:
            del handle._source_traceback[-1]
    self._write_to_self()
    return handle
def run_in_executor(self, executor, func, *args):
    """Submit ``func(*args)`` to *executor* and return a loop-bound future.

    With executor=None the default executor is used; it is created lazily
    as a ThreadPoolExecutor, and RuntimeError is raised if its shutdown has
    already been requested.
    """
    self._check_closed()
    if self._debug:
        self._check_callback(func, 'run_in_executor')
    if executor is None:
        executor = self._default_executor
        # Only check when the default executor is being used
        self._check_default_executor()
        if executor is None:
            executor = concurrent.futures.ThreadPoolExecutor(
                thread_name_prefix='asyncio'
            )
            self._default_executor = executor
    job = executor.submit(func, *args)
    return futures.wrap_future(job, loop=self)
def set_default_executor(self, executor):
    """Set the default executor; it must be a ThreadPoolExecutor instance.

    Raises TypeError for any other object.
    """
    if isinstance(executor, concurrent.futures.ThreadPoolExecutor):
        self._default_executor = executor
        return
    raise TypeError('executor must be ThreadPoolExecutor instance')
def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
    """Call socket.getaddrinfo() and log the query and how long it took.

    The result is logged at INFO level when the call took at least
    slow_callback_duration seconds, at DEBUG level otherwise.
    """
    # Only truthy (non-default) parameters are included in the description.
    parts = [f"{host}:{port!r}"]
    if family:
        parts.append(f'family={family!r}')
    if type:
        parts.append(f'type={type!r}')
    if proto:
        parts.append(f'proto={proto!r}')
    if flags:
        parts.append(f'flags={flags!r}')
    query = ', '.join(parts)
    logger.debug('Get address info %s', query)

    started = self.time()
    addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
    elapsed = self.time() - started

    report = (f'Getting address info {query} took {elapsed * 1e3:.3f}ms: '
              f'{addrinfo!r}')
    log = logger.info if elapsed >= self.slow_callback_duration \
        else logger.debug
    log(report)
    return addrinfo
async def getaddrinfo(self, host, port, *,
                      family=0, type=0, proto=0, flags=0):
    """Resolve *host*/*port* by running getaddrinfo in the default executor.

    In debug mode the logging wrapper _getaddrinfo_debug is used instead
    of socket.getaddrinfo.
    """
    resolver = self._getaddrinfo_debug if self._debug \
        else socket.getaddrinfo
    return await self.run_in_executor(
        None, resolver, host, port, family, type, proto, flags)
async def getnameinfo(self, sockaddr, flags=0):
    """Run socket.getnameinfo(sockaddr, flags) in the default executor."""
    lookup = self.run_in_executor(
        None, socket.getnameinfo, sockaddr, flags)
    return await lookup
async def sock_sendfile(self, sock, file, offset=0, count=None,
                        *, fallback=True):
    """Send *file* over *sock*, preferring the native sendfile path.

    If the native implementation reports SendfileNotAvailableError and
    *fallback* is true, the read-and-send fallback is used; otherwise the
    error propagates.  In debug mode a blocking socket raises ValueError.
    """
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")
    _check_ssl_socket(sock)
    self._check_sendfile_params(sock, file, offset, count)
    try:
        return await self._sock_sendfile_native(sock, file,
                                                offset, count)
    except exceptions.SendfileNotAvailableError:
        if not fallback:
            raise
    return await self._sock_sendfile_fallback(sock, file,
                                              offset, count)
async def _sock_sendfile_native(self, sock, file, offset, count):
    """Always raise SendfileNotAvailableError for the given socket/file."""
    # NB: sendfile syscall is not supported for SSL sockets and
    # non-mmap files even if sendfile is supported by OS
    reason = (f"syscall sendfile is not available for socket {sock!r} "
              f"and file {file!r} combination")
    raise exceptions.SendfileNotAvailableError(reason)
async def _sock_sendfile_fallback(self, sock, file, offset, count):
    """Copy *file* to *sock* by reading chunks and sending them.

    Chunks are read with file.readinto() via run_in_executor() and
    written with self.sock_sendall().  A falsy *count* means "send until
    readinto() reports EOF".  Return the number of bytes sent.
    """
    if offset:
        file.seek(offset)
    # One reusable buffer, capped by both the configured size and count.
    blocksize = (
        min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE)
        if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE
    )
    buf = bytearray(blocksize)
    total_sent = 0
    try:
        while True:
            if count:
                # Never read past the requested number of bytes.
                blocksize = min(count - total_sent, blocksize)
                if blocksize <= 0:
                    break
            view = memoryview(buf)[:blocksize]
            read = await self.run_in_executor(None, file.readinto, view)
            if not read:
                break  # EOF
            await self.sock_sendall(sock, view[:read])
            total_sent += read
        return total_sent
    finally:
        # Runs on success and on error: leave the file positioned just
        # after the last byte counted in total_sent.
        if total_sent > 0 and hasattr(file, 'seek'):
            file.seek(offset + total_sent)
def _check_sendfile_params(self, sock, file, offset, count):
    """Validate the arguments of a sendfile-style call.

    Raise ValueError if *file* reports a text mode, if *sock* is not a
    SOCK_STREAM socket, if *count* is an integer <= 0, or if *offset* is
    a negative integer.  Raise TypeError if *count* (when not None) or
    *offset* is not an int.  Return None when everything is acceptable.
    """
    mode = getattr(file, 'mode', 'b')
    # Only a string mode can say "this is a text file".  A file-like
    # object may expose a non-string mode (e.g. an integer constant);
    # treat that like a missing attribute instead of letting the ``in``
    # test fail with an unrelated TypeError.
    if isinstance(mode, str) and 'b' not in mode:
        raise ValueError("file should be opened in binary mode")
    if not sock.type == socket.SOCK_STREAM:
        raise ValueError("only SOCK_STREAM type sockets are supported")
    if count is not None:
        if not isinstance(count, int):
            raise TypeError(
                f"count must be a positive integer (got {count!r})")
        if count <= 0:
            raise ValueError(
                f"count must be a positive integer (got {count!r})")
    if not isinstance(offset, int):
        raise TypeError(
            f"offset must be a non-negative integer (got {offset!r})")
    if offset < 0:
        raise ValueError(
            f"offset must be a non-negative integer (got {offset!r})")
async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None):
    """Create, bind and connect one socket.

    *addr_info* is a 5-item sequence unpacked as
    (family, type, proto, <ignored>, address).  A fresh per-attempt list
    is appended to *exceptions*; OSErrors from binding and connecting
    are recorded in that list.  When *local_addr_infos* is given, the
    socket is bound to the first entry of the same family that binds
    successfully.  Return the connected socket; on any exception the
    socket is closed and the exception is re-raised.
    """
    my_exceptions = []
    exceptions.append(my_exceptions)
    family, type_, proto, _, address = addr_info
    sock = None
    try:
        try:
            sock = socket.socket(family=family, type=type_, proto=proto)
            sock.setblocking(False)
            if local_addr_infos is not None:
                for lfamily, _, _, _, laddr in local_addr_infos:
                    # skip local addresses of different family
                    if lfamily != family:
                        continue
                    try:
                        sock.bind(laddr)
                        break
                    except OSError as exc:
                        # Re-wrap so the message names the address that
                        # failed to bind; the errno is preserved.
                        msg = (
                            f'error while attempting to bind on '
                            f'address {laddr!r}: {str(exc).lower()}'
                        )
                        exc = OSError(exc.errno, msg)
                        my_exceptions.append(exc)
                else:  # all bind attempts failed
                    if my_exceptions:
                        raise my_exceptions.pop()
                    else:
                        # No candidate had a matching family at all.
                        raise OSError(f"no matching local address with {family=} found")
            await self.sock_connect(sock, address)
            return sock
        except OSError as exc:
            # Record the failure for the caller, then let the outer
            # handler close the socket.
            my_exceptions.append(exc)
            raise
    except:
        if sock is not None:
            try:
                sock.close()
            except OSError:
                # An error when closing a newly created socket is
                # not important, but it can overwrite more important
                # non-OSError error. So ignore it.
                pass
        raise
    finally:
        # Drop the local references to the exception lists
        # (presumably to avoid reference cycles through tracebacks).
        exceptions = my_exceptions = None
async def create_connection(
        self, protocol_factory, host=None, port=None,
        *, ssl=None, family=0,
        proto=0, flags=0, sock=None,
        local_addr=None, server_hostname=None,
        ssl_handshake_timeout=None,
        ssl_shutdown_timeout=None,
        happy_eyeballs_delay=None, interleave=None,
        all_errors=False):
    """Connect to a TCP server.

    Create a streaming transport connection to a given internet host and
    port: socket family AF_INET or socket.AF_INET6 depending on host (or
    family if specified), socket type SOCK_STREAM. protocol_factory must be
    a callable returning a protocol instance.

    Either host/port or an existing *sock* must be given, never both.
    With *happy_eyeballs_delay* set, connection attempts are run through
    staggered.staggered_race() and *interleave* defaults to 1.  When
    every attempt fails and *all_errors* is true, the collected errors
    are raised as one ExceptionGroup.

    This method is a coroutine which will try to establish the connection
    in the background.  When successful, the coroutine returns a
    (transport, protocol) pair.
    """
    # NOTE: the *ssl* parameter shadows the module-level ``ssl`` name
    # inside this function.
    if server_hostname is not None and not ssl:
        raise ValueError('server_hostname is only meaningful with ssl')

    if server_hostname is None and ssl:
        # Use host as default for server_hostname.  It is an error
        # if host is empty or not set, e.g. when an
        # already-connected socket was passed or when only a port
        # is given.  To avoid this error, you can pass
        # server_hostname='' -- this will bypass the hostname
        # check.  (This also means that if host is a numeric
        # IP/IPv6 address, we will attempt to verify that exact
        # address; this will probably fail, but it is possible to
        # create a certificate for a specific IP address, so we
        # don't judge it here.)
        if not host:
            raise ValueError('You must set server_hostname '
                             'when using ssl without a host')
        server_hostname = host

    if ssl_handshake_timeout is not None and not ssl:
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    if ssl_shutdown_timeout is not None and not ssl:
        raise ValueError(
            'ssl_shutdown_timeout is only meaningful with ssl')

    if sock is not None:
        _check_ssl_socket(sock)

    if happy_eyeballs_delay is not None and interleave is None:
        # If using happy eyeballs, default to interleave addresses by family
        interleave = 1

    if host is not None or port is not None:
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        infos = await self._ensure_resolved(
            (host, port), family=family,
            type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
        if not infos:
            raise OSError('getaddrinfo() returned empty list')

        if local_addr is not None:
            laddr_infos = await self._ensure_resolved(
                local_addr, family=family,
                type=socket.SOCK_STREAM, proto=proto,
                flags=flags, loop=self)
            if not laddr_infos:
                raise OSError('getaddrinfo() returned empty list')
        else:
            laddr_infos = None

        if interleave:
            infos = _interleave_addrinfos(infos, interleave)

        # Each _connect_sock() call appends its own list of errors here,
        # so this is a list of lists until it is flattened below.
        exceptions = []
        if happy_eyeballs_delay is None:
            # not using happy eyeballs
            for addrinfo in infos:
                try:
                    sock = await self._connect_sock(
                        exceptions, addrinfo, laddr_infos)
                    break
                except OSError:
                    continue
        else:  # using happy eyeballs
            sock = (await staggered.staggered_race(
                (
                    # can't use functools.partial as it keeps a reference
                    # to exceptions
                    lambda addrinfo=addrinfo: self._connect_sock(
                        exceptions, addrinfo, laddr_infos
                    )
                    for addrinfo in infos
                ),
                happy_eyeballs_delay,
                loop=self,
            ))[0]  # can't use sock, _, _ as it keeps a reference to exceptions

        if sock is None:
            # No attempt produced a socket: flatten the per-attempt
            # lists and decide which error to surface.
            exceptions = [exc for sub in exceptions for exc in sub]
            try:
                if all_errors:
                    raise ExceptionGroup("create_connection failed", exceptions)
                if len(exceptions) == 1:
                    raise exceptions[0]
                elif exceptions:
                    # If they all have the same str(), raise one.
                    model = str(exceptions[0])
                    if all(str(exc) == model for exc in exceptions):
                        raise exceptions[0]
                    # Raise a combined exception so the user can see all
                    # the various error messages.
                    raise OSError('Multiple exceptions: {}'.format(
                        ', '.join(str(exc) for exc in exceptions)))
                else:
                    # No exceptions were collected, raise a timeout error
                    raise TimeoutError('create_connection failed')
            finally:
                # Drop the local reference to the collected errors.
                exceptions = None

    else:
        if sock is None:
            raise ValueError(
                'host and port was not specified and no sock specified')
        if sock.type != socket.SOCK_STREAM:
            # We allow AF_INET, AF_INET6, AF_UNIX as long as they
            # are SOCK_STREAM.
            # We support passing AF_UNIX sockets even though we have
            # a dedicated API for that: create_unix_connection.
            # Disallowing AF_UNIX in this method, breaks backwards
            # compatibility.
            raise ValueError(
                f'A Stream Socket was expected, got {sock!r}')

    transport, protocol = await self._create_connection_transport(
        sock, protocol_factory, ssl, server_hostname,
        ssl_handshake_timeout=ssl_handshake_timeout,
        ssl_shutdown_timeout=ssl_shutdown_timeout)
    if self._debug:
        # Get the socket from the transport because SSL transport closes
        # the old socket and creates a new SSL socket
        sock = transport.get_extra_info('socket')
        logger.debug("%r connected to %s:%r: (%r, %r)",
                     sock, host, port, transport, protocol)
    return transport, protocol
async def _create_connection_transport(
        self, sock, protocol_factory, ssl,
        server_hostname, server_side=False,
        ssl_handshake_timeout=None,
        ssl_shutdown_timeout=None):
    """Wrap *sock* in a transport and wait until it is ready.

    *sock* is switched to non-blocking mode and a protocol is obtained
    from *protocol_factory*.  A truthy *ssl* selects
    self._make_ssl_transport() (a bool *ssl* is passed on as a None
    context); otherwise self._make_socket_transport() is used.  If
    waiting for the transport fails, the transport is closed and the
    exception re-raised.  Return a (transport, protocol) pair.
    """
    sock.setblocking(False)

    protocol = protocol_factory()
    ready = self.create_future()
    if not ssl:
        transport = self._make_socket_transport(sock, protocol, ready)
    else:
        if isinstance(ssl, bool):
            sslcontext = None
        else:
            sslcontext = ssl
        transport = self._make_ssl_transport(
            sock, protocol, sslcontext, ready,
            server_side=server_side, server_hostname=server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout,
            ssl_shutdown_timeout=ssl_shutdown_timeout)

    try:
        await ready
    except BaseException:
        transport.close()
        raise

    return transport, protocol
async def sendfile(self, transport, file, offset=0, count=None,
                   *, fallback=True):
    """Send a file through *transport* and return the number of bytes sent.

    The high-performance os.sendfile is used when available.

    *file* must be a regular file object opened in binary mode.

    *offset* says where to start reading the file.  When *count* is
    given it is the total number of bytes to transmit; otherwise the
    file is sent until EOF.  The file position is updated on return,
    and also when an error occurs, in which case file.tell() can be
    used to find out how many bytes were sent.

    With *fallback* set to True asyncio reads and sends the file
    manually when the platform does not support the sendfile syscall
    (e.g. Windows or SSL socket on Unix).

    Raise SendfileNotAvailableError if the system does not support the
    sendfile syscall and *fallback* is False.
    """
    if transport.is_closing():
        raise RuntimeError("Transport is closing")
    modes = constants._SendfileMode
    send_mode = getattr(transport, '_sendfile_compatible',
                        modes.UNSUPPORTED)
    if send_mode is modes.UNSUPPORTED:
        raise RuntimeError(
            f"sendfile is not supported for transport {transport!r}")
    if send_mode is modes.TRY_NATIVE:
        try:
            return await self._sendfile_native(transport, file,
                                               offset, count)
        except exceptions.SendfileNotAvailableError:
            # Native path unavailable: give up unless the caller
            # allowed the manual fallback below.
            if not fallback:
                raise

    if not fallback:
        raise RuntimeError(
            f"fallback is disabled and native sendfile is not "
            f"supported for transport {transport!r}")

    return await self._sendfile_fallback(transport, file,
                                         offset, count)
async def _sendfile_native(self, transp, file, offset, count):
    """Always raise SendfileNotAvailableError."""
    reason = "sendfile syscall is not supported"
    raise exceptions.SendfileNotAvailableError(reason)
async def _sendfile_fallback(self, transp, file, offset, count):
    """Send *file* through transport *transp* by reading and writing chunks.

    Chunks of at most 16384 bytes are read with file.readinto() via
    run_in_executor() and written with transp.write().  After each
    write, proto.drain() is awaited.  A falsy *count* means "send until
    EOF".  Return the number of bytes sent.
    """
    if offset:
        file.seek(offset)
    blocksize = min(count, 16384) if count else 16384
    buf = bytearray(blocksize)
    total_sent = 0
    # Helper protocol defined elsewhere in this file; it provides the
    # drain() and restore() coroutines used below.
    proto = _SendfileFallbackProtocol(transp)
    try:
        while True:
            if count:
                # Never read past the requested number of bytes.
                blocksize = min(count - total_sent, blocksize)
                if blocksize <= 0:
                    return total_sent
            view = memoryview(buf)[:blocksize]
            read = await self.run_in_executor(None, file.readinto, view)
            if not read:
                return total_sent  # EOF
            transp.write(view[:read])
            await proto.drain()
            total_sent += read
    finally:
        # Runs on success and on error: reposition the file just after
        # the last byte counted in total_sent, then call restore().
        if total_sent > 0 and hasattr(file, 'seek'):
            file.seek(offset + total_sent)
        await proto.restore()
async def start_tls(self, transport, protocol, sslcontext, *,
                    server_side=False,
                    server_hostname=None,
                    ssl_handshake_timeout=None,
                    ssl_shutdown_timeout=None):
    """Upgrade transport to TLS.

    Return a new transport that *protocol* should start using
    immediately.

    Raise RuntimeError if the ssl module could not be imported, and
    TypeError if *sslcontext* is not an ssl.SSLContext or *transport*
    does not have a true ``_start_tls_compatible`` attribute.
    """
    # ``ssl`` is set to None at import time when the module is missing.
    if ssl is None:
        raise RuntimeError('Python ssl module is not available')

    if not isinstance(sslcontext, ssl.SSLContext):
        raise TypeError(
            f'sslcontext is expected to be an instance of ssl.SSLContext, '
            f'got {sslcontext!r}')

    if not getattr(transport, '_start_tls_compatible', False):
        raise TypeError(
            f'transport {transport!r} is not supported by start_tls()')

    waiter = self.create_future()
    ssl_protocol = sslproto.SSLProtocol(
        self, protocol, sslcontext, waiter,
        server_side, server_hostname,
        ssl_handshake_timeout=ssl_handshake_timeout,
        ssl_shutdown_timeout=ssl_shutdown_timeout,
        call_connection_made=False)

    # Pause early so that "ssl_protocol.data_received()" doesn't
    # have a chance to get called before "ssl_protocol.connection_made()".
    transport.pause_reading()

    transport.set_protocol(ssl_protocol)
    # connection_made() is scheduled before resume_reading().
    conmade_cb = self.call_soon(ssl_protocol.connection_made, transport)
    resume_cb = self.call_soon(transport.resume_reading)

    try:
        await waiter
    except BaseException:
        # On failure, close the transport and cancel both scheduled
        # callbacks in case they have not run yet.
        transport.close()
        conmade_cb.cancel()
        resume_cb.cancel()
        raise

    return ssl_protocol._app_transport
async def create_datagram_endpoint(self, protocol_factory,
                                   local_addr=None, remote_addr=None, *,
                                   family=0, proto=0, flags=0,
                                   reuse_port=None,
                                   allow_broadcast=None, sock=None):
    """Create datagram connection.

    Either pass an existing non-stream *sock* (no other socket option
    may then be given), or let this method create a SOCK_DGRAM socket
    from *local_addr* / *remote_addr* / *family*.  Return a
    (transport, protocol) pair.
    """
    if sock is not None:
        if sock.type == socket.SOCK_STREAM:
            raise ValueError(
                f'A datagram socket was expected, got {sock!r}')
        if (local_addr or remote_addr or
                family or proto or flags or
                reuse_port or allow_broadcast):
            # show the problematic kwargs in exception msg
            opts = dict(local_addr=local_addr, remote_addr=remote_addr,
                        family=family, proto=proto, flags=flags,
                        reuse_port=reuse_port,
                        allow_broadcast=allow_broadcast)
            problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
            raise ValueError(
                f'socket modifier keyword arguments can not be used '
                f'when sock is specified. ({problems})')
        sock.setblocking(False)
        r_addr = None
    else:
        # Build addr_pairs_info: a sequence of
        # ((family, proto), (local_address, remote_address)) candidates.
        if not (local_addr or remote_addr):
            if family == 0:
                raise ValueError('unexpected address family')
            addr_pairs_info = (((family, proto), (None, None)),)
        elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
            for addr in (local_addr, remote_addr):
                if addr is not None and not isinstance(addr, str):
                    raise TypeError('string is expected')

            # Skip the stale-socket cleanup when the path starts with
            # a NUL character.
            if local_addr and local_addr[0] not in (0, '\x00'):
                try:
                    if stat.S_ISSOCK(os.stat(local_addr).st_mode):
                        os.remove(local_addr)
                except FileNotFoundError:
                    pass
                except OSError as err:
                    # Directory may have permissions only to create socket.
                    logger.error('Unable to check or remove stale UNIX '
                                 'socket %r: %r',
                                 local_addr, err)

            addr_pairs_info = (((family, proto),
                                (local_addr, remote_addr)), )
        else:
            # join address by (family, protocol)
            addr_infos = {}  # Using order preserving dict
            # idx 0 is the local slot, idx 1 the remote slot.
            for idx, addr in ((0, local_addr), (1, remote_addr)):
                if addr is not None:
                    if not (isinstance(addr, tuple) and len(addr) == 2):
                        raise TypeError('2-tuple is expected')

                    infos = await self._ensure_resolved(
                        addr, family=family, type=socket.SOCK_DGRAM,
                        proto=proto, flags=flags, loop=self)
                    if not infos:
                        raise OSError('getaddrinfo() returned empty list')

                    for fam, _, pro, _, address in infos:
                        key = (fam, pro)
                        if key not in addr_infos:
                            addr_infos[key] = [None, None]
                        addr_infos[key][idx] = address

            # each addr has to have info for each (family, proto) pair
            addr_pairs_info = [
                (key, addr_pair) for key, addr_pair in addr_infos.items()
                if not ((local_addr and addr_pair[0] is None) or
                        (remote_addr and addr_pair[1] is None))]

            if not addr_pairs_info:
                raise ValueError('can not get address information')

        exceptions = []

        # Try each candidate in order; the first one that sets up
        # without an OSError wins.
        for ((family, proto),
             (local_address, remote_address)) in addr_pairs_info:
            sock = None
            r_addr = None
            try:
                sock = socket.socket(
                    family=family, type=socket.SOCK_DGRAM, proto=proto)
                if reuse_port:
                    _set_reuseport(sock)
                if allow_broadcast:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
                sock.setblocking(False)

                if local_addr:
                    sock.bind(local_address)
                if remote_addr:
                    # With allow_broadcast the socket is not connected,
                    # but the remote address is still recorded.
                    if not allow_broadcast:
                        await self.sock_connect(sock, remote_address)
                    r_addr = remote_address
            except OSError as exc:
                if sock is not None:
                    sock.close()
                exceptions.append(exc)
            except:
                if sock is not None:
                    sock.close()
                raise
            else:
                break
        else:
            # Every candidate failed: surface the first recorded error.
            raise exceptions[0]

    protocol = protocol_factory()
    waiter = self.create_future()
    transport = self._make_datagram_transport(
        sock, protocol, r_addr, waiter)
    if self._debug:
        if local_addr:
            logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
                        "created: (%r, %r)",
                        local_addr, remote_addr, transport, protocol)
        else:
            logger.debug("Datagram endpoint remote_addr=%r created: "
                         "(%r, %r)",
                         remote_addr, transport, protocol)

    try:
        await waiter
    except:
        transport.close()
        raise

    return transport, protocol
async def _ensure_resolved(self, address, *,
                           family=0, type=socket.SOCK_STREAM,
                           proto=0, flags=0, loop):
    """Return a list of address infos for *address*.

    The first two items of *address* are the host and port; any further
    items are forwarded to _ipaddr_info().  When _ipaddr_info() returns
    a result, it is returned as a one-item list; otherwise the lookup
    is delegated to loop.getaddrinfo().
    """
    host, port = address[:2]
    extra = address[2:]
    resolved = _ipaddr_info(host, port, family, type, proto, *extra)
    if resolved is None:
        return await loop.getaddrinfo(host, port, family=family, type=type,
                                      proto=proto, flags=flags)
    # "host" is already a resolved IP.
    return [resolved]
async def _create_server_getaddrinfo(self, host, port, family, flags):
    """Resolve (host, port) for a SOCK_STREAM server socket.

    Return the non-empty result of self._ensure_resolved(); raise
    OSError when the result is empty.
    """
    resolved = await self._ensure_resolved(
        (host, port), family=family, type=socket.SOCK_STREAM,
        flags=flags, loop=self)
    if resolved:
        return resolved
    raise OSError(f'getaddrinfo({host!r}) returned empty list')
async def create_server(
        self, protocol_factory, host=None, port=None,
        *,
        family=socket.AF_UNSPEC,
        flags=socket.AI_PASSIVE,
        sock=None,
        backlog=100,
        ssl=None,
        reuse_address=None,
        reuse_port=None,
        keep_alive=None,
        ssl_handshake_timeout=None,
        ssl_shutdown_timeout=None,
        start_serving=True):
    """Create a TCP server.

    The host parameter can be a string, in that case the TCP server is
    bound to host and port.

    The host parameter can also be a sequence of strings and in that case
    the TCP server is bound to all hosts of the sequence. If a host
    appears multiple times (possibly indirectly e.g. when hostnames
    resolve to the same IP address), the server is only bound once to that
    host.

    Either host/port or an existing SOCK_STREAM *sock* must be given,
    never both.

    Return a Server object which can be used to stop the service.

    This method is a coroutine.
    """
    # NOTE: the *ssl* parameter shadows the module-level ``ssl`` name
    # inside this function.
    if isinstance(ssl, bool):
        raise TypeError('ssl argument must be an SSLContext or None')

    if ssl_handshake_timeout is not None and ssl is None:
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    if ssl_shutdown_timeout is not None and ssl is None:
        raise ValueError(
            'ssl_shutdown_timeout is only meaningful with ssl')

    if sock is not None:
        _check_ssl_socket(sock)

    if host is not None or port is not None:
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        if reuse_address is None:
            # Default: enabled on POSIX systems other than Cygwin.
            reuse_address = os.name == "posix" and sys.platform != "cygwin"
        sockets = []
        # Normalise *host* into an iterable of hosts; '' becomes None.
        if host == '':
            hosts = [None]
        elif (isinstance(host, str) or
              not isinstance(host, collections.abc.Iterable)):
            hosts = [host]
        else:
            hosts = host

        fs = [self._create_server_getaddrinfo(host, port, family=family,
                                              flags=flags)
              for host in hosts]
        infos = await tasks.gather(*fs)
        # A set drops duplicate address infos, so each distinct address
        # is bound only once.
        infos = set(itertools.chain.from_iterable(infos))

        completed = False
        try:
            for res in infos:
                af, socktype, proto, canonname, sa = res
                try:
                    sock = socket.socket(af, socktype, proto)
                except socket.error:
                    # Assume it's a bad family/type/protocol combination.
                    if self._debug:
                        logger.warning('create_server() failed to create '
                                       'socket.socket(%r, %r, %r)',
                                       af, socktype, proto, exc_info=True)
                    continue
                sockets.append(sock)
                if reuse_address:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
                # Since Linux 6.12.9, SO_REUSEPORT is not allowed
                # on other address families than AF_INET/AF_INET6.
                if reuse_port and af in (socket.AF_INET, socket.AF_INET6):
                    _set_reuseport(sock)
                if keep_alive:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_KEEPALIVE, True)
                # Disable IPv4/IPv6 dual stack support (enabled by
                # default on Linux) which makes a single socket
                # listen on both address families.
                if (_HAS_IPv6 and
                        af == socket.AF_INET6 and
                        hasattr(socket, 'IPPROTO_IPV6')):
                    sock.setsockopt(socket.IPPROTO_IPV6,
                                    socket.IPV6_V6ONLY,
                                    True)
                try:
                    sock.bind(sa)
                except OSError as err:
                    msg = ('error while attempting '
                           'to bind on address %r: %s'
                           % (sa, str(err).lower()))
                    if err.errno == errno.EADDRNOTAVAIL:
                        # Assume the family is not enabled (bpo-30945)
                        sockets.pop()
                        sock.close()
                        if self._debug:
                            logger.warning(msg)
                        continue
                    raise OSError(err.errno, msg) from None

            if not sockets:
                raise OSError('could not bind on any address out of %r'
                              % ([info[4] for info in infos],))

            completed = True
        finally:
            # On any failure, close every socket still in the list.
            if not completed:
                for sock in sockets:
                    sock.close()
    else:
        if sock is None:
            raise ValueError('Neither host/port nor sock were specified')
        if sock.type != socket.SOCK_STREAM:
            raise ValueError(f'A Stream Socket was expected, got {sock!r}')
        sockets = [sock]

    for sock in sockets:
        sock.setblocking(False)

    server = Server(self, sockets, protocol_factory,
                    ssl, backlog, ssl_handshake_timeout,
                    ssl_shutdown_timeout)
    if start_serving:
        server._start_serving()
        # Skip one loop iteration so that all 'loop.add_reader'
        # go through.
        await tasks.sleep(0)

    if self._debug:
        logger.info("%r is serving", server)
    return server
async def connect_accepted_socket(
        self, protocol_factory, sock,
        *, ssl=None,
        ssl_handshake_timeout=None,
        ssl_shutdown_timeout=None):
    """Wrap an already accepted SOCK_STREAM socket in a server-side transport.

    Raise ValueError if *sock* is not a SOCK_STREAM socket, or if an
    SSL timeout is given without *ssl*.  Return a
    (transport, protocol) pair.
    """
    if sock.type != socket.SOCK_STREAM:
        raise ValueError(f'A Stream Socket was expected, got {sock!r}')

    # Both timeouts only make sense together with ssl.
    for option, value in (('ssl_handshake_timeout', ssl_handshake_timeout),
                          ('ssl_shutdown_timeout', ssl_shutdown_timeout)):
        if value is not None and not ssl:
            raise ValueError(f'{option} is only meaningful with ssl')

    _check_ssl_socket(sock)

    pair = await self._create_connection_transport(
        sock, protocol_factory, ssl, '', server_side=True,
        ssl_handshake_timeout=ssl_handshake_timeout,
        ssl_shutdown_timeout=ssl_shutdown_timeout)
    transport, protocol = pair
    if self._debug:
        # Get the socket from the transport because SSL transport closes
        # the old socket and creates a new SSL socket
        sock = transport.get_extra_info('socket')
        logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
    return transport, protocol
async def connect_read_pipe(self, protocol_factory, pipe):
    """Attach the read end *pipe* to a new protocol instance.

    Return a (transport, protocol) pair once the transport's waiter
    completes; if waiting fails, the transport is closed and the
    exception re-raised.
    """
    protocol = protocol_factory()
    ready = self.create_future()
    transport = self._make_read_pipe_transport(pipe, protocol, ready)

    try:
        await ready
    except BaseException:
        transport.close()
        raise

    if self._debug:
        logger.debug('Read pipe %r connected: (%r, %r)',
                     pipe.fileno(), transport, protocol)
    return transport, protocol
async def connect_write_pipe(self, protocol_factory, pipe):
    """Attach the write end *pipe* to a new protocol instance.

    Return a (transport, protocol) pair once the transport's waiter
    completes; if waiting fails, the transport is closed and the
    exception re-raised.
    """
    protocol = protocol_factory()
    ready = self.create_future()
    transport = self._make_write_pipe_transport(pipe, protocol, ready)

    try:
        await ready
    except BaseException:
        transport.close()
        raise

    if self._debug:
        logger.debug('Write pipe %r connected: (%r, %r)',
                     pipe.fileno(), transport, protocol)
    return transport, protocol
def _log_subprocess(self, msg, stdin, stdout, stderr):
    """Log *msg* at DEBUG level, followed by the configured stdio pipes."""
    fields = [msg]
    if stdin is not None:
        fields.append(f'stdin={_format_pipe(stdin)}')
    if stdout is not None and stderr == subprocess.STDOUT:
        # stderr is redirected into stdout: report both as one entry.
        fields.append(f'stdout=stderr={_format_pipe(stdout)}')
    else:
        fields.extend(
            f'{label}={_format_pipe(pipe)}'
            for label, pipe in (('stdout', stdout), ('stderr', stderr))
            if pipe is not None)
    logger.debug(' '.join(fields))
async def subprocess_shell(self, protocol_factory, cmd, *,
                           stdin=subprocess.PIPE,
                           stdout=subprocess.PIPE,
                           stderr=subprocess.PIPE,
                           universal_newlines=False,
                           shell=True, bufsize=0,
                           encoding=None, errors=None, text=None,
                           **kwargs):
    """Run the shell command *cmd* and return (transport, protocol).

    *cmd* must be a str or bytes.  The text-related options
    (universal_newlines, text, encoding, errors) must keep their
    defaults, *shell* must be true and *bufsize* must be 0; anything
    else raises ValueError.
    """
    if not isinstance(cmd, (bytes, str)):
        raise ValueError("cmd must be a string")
    if universal_newlines:
        raise ValueError("universal_newlines must be False")
    if not shell:
        raise ValueError("shell must be True")
    if bufsize != 0:
        raise ValueError("bufsize must be 0")
    if text:
        raise ValueError("text must be False")
    if encoding is not None:
        raise ValueError("encoding must be None")
    if errors is not None:
        raise ValueError("errors must be None")

    protocol = protocol_factory()
    description = None
    if self._debug:
        # don't log parameters: they may contain sensitive information
        # (password) and may be too long
        description = f'run shell command {cmd!r}'
        self._log_subprocess(description, stdin, stdout, stderr)
    transport = await self._make_subprocess_transport(
        protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
    if self._debug and description is not None:
        logger.info('%s: %r', description, transport)
    return transport, protocol
async def subprocess_exec(self, protocol_factory, program, *args,
                          stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE, universal_newlines=False,
                          shell=False, bufsize=0,
                          encoding=None, errors=None, text=None,
                          **kwargs):
    """Run *program* with *args* and return (transport, protocol).

    The text-related options (universal_newlines, text, encoding,
    errors) must keep their defaults, *shell* must be false and
    *bufsize* must be 0; anything else raises ValueError.
    """
    if universal_newlines:
        raise ValueError("universal_newlines must be False")
    if shell:
        raise ValueError("shell must be False")
    if bufsize != 0:
        raise ValueError("bufsize must be 0")
    if text:
        raise ValueError("text must be False")
    if encoding is not None:
        raise ValueError("encoding must be None")
    if errors is not None:
        raise ValueError("errors must be None")

    command = (program, *args)
    protocol = protocol_factory()
    description = None
    if self._debug:
        # don't log parameters: they may contain sensitive information
        # (password) and may be too long
        description = f'execute program {program!r}'
        self._log_subprocess(description, stdin, stdout, stderr)
    transport = await self._make_subprocess_transport(
        protocol, command, False, stdin, stdout, stderr,
        bufsize, **kwargs)
    if self._debug and description is not None:
        logger.info('%s: %r', description, transport)
    return transport, protocol
def get_exception_handler(self):
    """Return the custom exception handler, or None if the default is used."""
    handler = self._exception_handler
    return handler
def set_exception_handler(self, handler):
    """Install *handler* as the event loop's exception handler.

    Passing None selects the default exception handler.

    Otherwise *handler* must be callable with a signature matching
    '(loop, context)': 'loop' is a reference to the active event loop
    and 'context' is a dict object (see `call_exception_handler()`
    documentation for details about context).  A non-callable value
    that is not None raises TypeError.
    """
    if handler is None or callable(handler):
        self._exception_handler = handler
        return
    raise TypeError(f'A callable object or None is expected, '
                    f'got {handler!r}')
def default_exception_handler(self, context):
    """Default exception handler.

    Used when an exception occurs and no custom handler is set; a
    custom handler may also call it to defer to the default behavior.

    The error message and the remaining context entries are logged as
    one ERROR record, together with the exception's traceback when
    context contains an 'exception'.  'source_traceback' and
    'handle_traceback' entries are rendered as formatted stack traces
    showing where the object or handle was created.

    The context parameter has the same meaning as in
    `call_exception_handler()`.
    """
    message = context.get('message') or 'Unhandled exception in event loop'

    exception = context.get('exception')
    if exception is None:
        exc_info = False
    else:
        exc_info = (type(exception), exception, exception.__traceback__)

    if 'source_traceback' not in context:
        # No creation traceback was supplied: fall back to the one
        # recorded on the handle that is currently running, if any.
        # Note that this adds a key to the caller's context dict.
        handle = self._current_handle
        if handle is not None and handle._source_traceback:
            context['handle_traceback'] = handle._source_traceback

    traceback_headers = {
        'source_traceback': 'Object created at (most recent call last):\n',
        'handle_traceback': 'Handle created at (most recent call last):\n',
    }
    log_lines = [message]
    for key in sorted(context):
        if key in ('message', 'exception'):
            continue
        raw = context[key]
        header = traceback_headers.get(key)
        if header is None:
            rendered = repr(raw)
        else:
            stack = ''.join(traceback.format_list(raw))
            rendered = header + stack.rstrip()
        log_lines.append(f'{key}: {rendered}')

    logger.error('\n'.join(log_lines), exc_info=exc_info)
def call_exception_handler(self, context):
    """Call the current event loop's exception handler.

    The context argument is a dict containing the following keys:

    - 'message': Error message;
    - 'exception' (optional): Exception object;
    - 'future' (optional): Future instance;
    - 'task' (optional): Task instance;
    - 'handle' (optional): Handle instance;
    - 'protocol' (optional): Protocol instance;
    - 'transport' (optional): Transport instance;
    - 'socket' (optional): Socket instance;
    - 'source_traceback' (optional): Traceback of the source;
    - 'handle_traceback' (optional): Traceback of the handle;
    - 'asyncgen' (optional): Asynchronous generator that caused
                             the exception.

    New keys may be introduced in the future.

    SystemExit and KeyboardInterrupt raised by a handler propagate
    to the caller; any other exception is logged instead.

    Note: do not overload this method in an event loop subclass.
    For custom exception handling, use the
    `set_exception_handler()` method.
    """
    if self._exception_handler is None:
        try:
            self.default_exception_handler(context)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException:
            # Second protection layer for unexpected errors
            # in the default implementation, as well as for subclassed
            # event loops with overloaded "default_exception_handler".
            logger.error('Exception in default exception handler',
                         exc_info=True)
        return

    try:
        # Find the object the handler should run on behalf of.  Tasks
        # come first; the 'future' key sometimes holds a Task (Task is
        # a subclass of Future); handles also have a context.
        owner = None
        for key in ("task", "future", "handle"):
            owner = context.get(key)
            if owner is not None:
                break

        ctx = None
        if owner is not None and hasattr(owner, "get_context"):
            ctx = owner.get_context()

        if ctx is None or not hasattr(ctx, "run"):
            self._exception_handler(self, context)
        else:
            # Run the handler inside the owner's own context.
            ctx.run(self._exception_handler, self, context)
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as exc:
        # Exception in the user set custom exception handler.
        try:
            # Let's try default handler.
            self.default_exception_handler({
                'message': 'Unhandled error in exception handler',
                'exception': exc,
                'context': context,
            })
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException:
            # Guard 'default_exception_handler' in case it is
            # overloaded.
            logger.error('Exception in default exception handler '
                         'while handling an unexpected error '
                         'in custom exception handler',
                         exc_info=True)
def _add_callback(self, handle):
    """Append a Handle to _ready unless it is already cancelled."""
    if handle._cancelled:
        return
    self._ready.append(handle)
def _add_callback_signalsafe(self, handle):
    """Variant of _add_callback() meant to be called from a signal handler.

    The handle is queued through _add_callback() first; afterwards
    _write_to_self() is called.
    """
    # NOTE(review): _write_to_self() presumably wakes up a loop that is
    # blocked in select() -- confirm against its implementation.
    self._add_callback(handle)
    self._write_to_self()
def _timer_handle_cancelled(self, handle):
    """Notification that a TimerHandle has been cancelled.

    Only handles still flagged as scheduled are counted; the count
    is kept in _timer_cancelled_count.
    """
    if not handle._scheduled:
        return
    self._timer_cancelled_count += 1
def _run_once(self):
    """Run one full iteration of the event loop.

    The steps are, in order: drop cancelled timer handles, poll the
    selector for I/O and process the resulting events, move the
    'call_later' handles that are due onto the ready queue, and
    finally run every callback that is ready at that point.
    """

    pending = len(self._scheduled)
    too_many_cancelled = (
        pending > _MIN_SCHEDULED_TIMER_HANDLES
        and (self._timer_cancelled_count / pending
             > _MIN_CANCELLED_TIMER_HANDLES_FRACTION))
    if too_many_cancelled:
        # Too large a share of the timer heap is cancelled: rebuild
        # it from the handles that are still live.
        survivors = []
        for handle in self._scheduled:
            if not handle._cancelled:
                survivors.append(handle)
            else:
                handle._scheduled = False

        heapq.heapify(survivors)
        self._scheduled = survivors
        self._timer_cancelled_count = 0
    else:
        # Only strip cancelled handles sitting at the head of the heap.
        while self._scheduled and self._scheduled[0]._cancelled:
            self._timer_cancelled_count -= 1
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False

    # Decide how long the selector may block.
    if self._ready or self._stopping:
        timeout = 0
    elif self._scheduled:
        # Wait until the earliest timer is due, clamped to the
        # supported range.
        timeout = self._scheduled[0]._when - self.time()
        if timeout < 0:
            timeout = 0
        elif timeout > MAXIMUM_SELECT_TIMEOUT:
            timeout = MAXIMUM_SELECT_TIMEOUT
    else:
        timeout = None

    io_events = self._selector.select(timeout)
    self._process_events(io_events)
    # Needed to break cycles when an exception occurs.
    io_events = None

    # Move 'later' callbacks that are due onto the ready queue.
    end_time = self.time() + self._clock_resolution
    while self._scheduled:
        if self._scheduled[0]._when >= end_time:
            break
        handle = heapq.heappop(self._scheduled)
        handle._scheduled = False
        self._ready.append(handle)

    # This is the only place where callbacks are actually *called*.
    # All other places just add them to ready.
    # Note: only the callbacks that are ready right now are run;
    # callbacks scheduled by callbacks run this time around will be
    # run the next time (after another I/O poll).
    # Use an idiom that is thread-safe without using locks.
    batch = len(self._ready)
    for _ in range(batch):
        handle = self._ready.popleft()
        if handle._cancelled:
            continue
        if not self._debug:
            handle._run()
            continue
        # Debug mode: expose the running handle and report slow callbacks.
        try:
            self._current_handle = handle
            started = self.time()
            handle._run()
            elapsed = self.time() - started
            if elapsed >= self.slow_callback_duration:
                logger.warning('Executing %s took %.3f seconds',
                               _format_handle(handle), elapsed)
        finally:
            self._current_handle = None
    handle = None  # Needed to break cycles when an exception occurs.
def _set_coroutine_origin_tracking(self, enabled):
    """Switch coroutine origin tracking on or off for this loop.

    Nothing happens when the requested state, compared by truth
    value, matches the recorded one. When enabling, the interpreter's
    current tracking depth is saved and replaced by
    constants.DEBUG_STACK_DEPTH; when disabling, the saved depth is
    restored. The value of `enabled` is then recorded as-is.
    """
    wanted = bool(enabled)
    if wanted == bool(self._coroutine_origin_tracking_enabled):
        return

    if wanted:
        # Remember the current depth so it can be restored later.
        self._coroutine_origin_tracking_saved_depth = (
            sys.get_coroutine_origin_tracking_depth())
        depth = constants.DEBUG_STACK_DEPTH
    else:
        depth = self._coroutine_origin_tracking_saved_depth
    sys.set_coroutine_origin_tracking_depth(depth)

    self._coroutine_origin_tracking_enabled = enabled
def get_debug(self):
    """Return the debug flag stored on the loop by set_debug()."""
    debug = self._debug
    return debug
def set_debug(self, enabled):
    """Store the debug flag for the loop.

    When the loop is running, the matching change to coroutine
    origin tracking is handed to the loop through
    call_soon_threadsafe(); otherwise only the flag is stored.
    """
    self._debug = enabled

    if not self.is_running():
        return
    self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)