Coverage for Lib/asyncio/base_events.py: 88%
1148 statements
coverage.py v7.8.0, created at 2025-04-15 02:02 +0000
1"""Base implementation of event loop.
3The event loop can be broken up into a multiplexer (the part
4responsible for notifying us of I/O events) and the event loop proper,
5which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
16import collections
17import collections.abc
18import concurrent.futures
19import errno
20import heapq
21import itertools
22import os
23import socket
24import stat
25import subprocess
26import threading
27import time
28import traceback
29import sys
30import warnings
31import weakref
33try:
34 import ssl
35except ImportError: # pragma: no cover
36 ssl = None
38from . import constants
39from . import coroutines
40from . import events
41from . import exceptions
42from . import futures
43from . import protocols
44from . import sslproto
45from . import staggered
46from . import tasks
47from . import timeouts
48from . import transports
49from . import trsock
50from .log import logger
53__all__ = 'BaseEventLoop', 'Server'
56# Minimum number of _scheduled timer handles before cleanup of
57# cancelled handles is performed.
58_MIN_SCHEDULED_TIMER_HANDLES = 100
60# Minimum fraction of _scheduled timer handles that are cancelled
61# before cleanup of cancelled handles is performed.
62_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
65_HAS_IPv6 = hasattr(socket, 'AF_INET6')
67# Maximum timeout passed to select to avoid OS limitations
68MAXIMUM_SELECT_TIMEOUT = 24 * 3600
71def _format_handle(handle):
72 cb = handle._callback
73 if isinstance(getattr(cb, '__self__', None), tasks.Task):
74 # format the task
75 return repr(cb.__self__)
76 else:
77 return str(handle)
80def _format_pipe(fd):
81 if fd == subprocess.PIPE:
82 return '<pipe>'
83 elif fd == subprocess.STDOUT:
84 return '<stdout>'
85 else:
86 return repr(fd)
89def _set_reuseport(sock):
90 if not hasattr(socket, 'SO_REUSEPORT'):
91 raise ValueError('reuse_port not supported by socket module')
92 else:
93 try:
94 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
95 except OSError:
96 raise ValueError('reuse_port not supported by socket module, '
97 'SO_REUSEPORT defined but not implemented.')
100def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0):
101 # Try to skip getaddrinfo if "host" is already an IP. Users might have
102 # handled name resolution in their own code and pass in resolved IPs.
103 if not hasattr(socket, 'inet_pton'):
104 return
106 if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
107 host is None:
108 return None
110 if type == socket.SOCK_STREAM:
111 proto = socket.IPPROTO_TCP
112 elif type == socket.SOCK_DGRAM:
113 proto = socket.IPPROTO_UDP
114 else:
115 return None
117 if port is None:
118 port = 0
119 elif isinstance(port, bytes) and port == b'':
120 port = 0
121 elif isinstance(port, str) and port == '':
122 port = 0
123 else:
124 # If port's a service name like "http", don't skip getaddrinfo.
125 try:
126 port = int(port)
127 except (TypeError, ValueError):
128 return None
130 if family == socket.AF_UNSPEC:
131 afs = [socket.AF_INET]
132 if _HAS_IPv6:  # 132 ↛ 137: condition always true
133 afs.append(socket.AF_INET6)
134 else:
135 afs = [family]
137 if isinstance(host, bytes):
138 host = host.decode('idna')
139 if '%' in host:
140 # Linux's inet_pton doesn't accept an IPv6 zone index after host,
141 # like '::1%lo0'.
142 return None
144 for af in afs:
145 try:
146 socket.inet_pton(af, host)
147 # The host has already been resolved.
148 if _HAS_IPv6 and af == socket.AF_INET6:
149 return af, type, proto, '', (host, port, flowinfo, scopeid)
150 else:
151 return af, type, proto, '', (host, port)
152 except OSError:
153 pass
155 # "host" is not an IP address.
156 return None
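For illustration, a hedged sketch of this fast path (calling the private helper directly, which is not public API and may change): an IP literal comes back as a ready-made addrinfo tuple, while a hostname falls through to getaddrinfo().

import socket

# Assumes _ipaddr_info from this module is in scope.
info = _ipaddr_info('127.0.0.1', 80, socket.AF_UNSPEC, socket.SOCK_STREAM, 0)
# -> (AF_INET, SOCK_STREAM, IPPROTO_TCP, '', ('127.0.0.1', 80))
assert _ipaddr_info('example.com', 80, socket.AF_UNSPEC,
                    socket.SOCK_STREAM, 0) is None  # not an IP literal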
159def _interleave_addrinfos(addrinfos, first_address_family_count=1):
160 """Interleave list of addrinfo tuples by family."""
161 # Group addresses by family
162 addrinfos_by_family = collections.OrderedDict()
163 for addr in addrinfos:
164 family = addr[0]
165 if family not in addrinfos_by_family:  # 165 ↛ 167: condition always true
166 addrinfos_by_family[family] = []
167 addrinfos_by_family[family].append(addr)
168 addrinfos_lists = list(addrinfos_by_family.values())
170 reordered = []
171 if first_address_family_count > 1:  # 171 ↛ 172: condition never true
172 reordered.extend(addrinfos_lists[0][:first_address_family_count - 1])
173 del addrinfos_lists[0][:first_address_family_count - 1]
174 reordered.extend(
175 a for a in itertools.chain.from_iterable(
176 itertools.zip_longest(*addrinfos_lists)
177 ) if a is not None)
178 return reordered
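A small sketch of the interleaving (abbreviated addrinfo tuples; only the family field matters here, and the helper is private): two IPv4 entries and one IPv6 entry come out alternating by family.

import socket

v4_a = (socket.AF_INET, 0, 0, '', ('192.0.2.1', 80))
v4_b = (socket.AF_INET, 0, 0, '', ('192.0.2.2', 80))
v6_a = (socket.AF_INET6, 0, 0, '', ('2001:db8::1', 80, 0, 0))
# zip_longest alternates the family groups: [v4_a, v6_a, v4_b]
print(_interleave_addrinfos([v4_a, v4_b, v6_a]))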
181def _run_until_complete_cb(fut):
182 if not fut.cancelled():
183 exc = fut.exception()
184 if isinstance(exc, (SystemExit, KeyboardInterrupt)):
185 # Issue #22429: run_forever() already finished, no need to
186 # stop it.
187 return
188 futures._get_loop(fut).stop()
191if hasattr(socket, 'TCP_NODELAY'):  # 191 ↛ 198: condition always true
192 def _set_nodelay(sock):
193 if (sock.family in {socket.AF_INET, socket.AF_INET6} and
194 sock.type == socket.SOCK_STREAM and
195 sock.proto == socket.IPPROTO_TCP):
196 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
197else:
198 def _set_nodelay(sock):
199 pass
202def _check_ssl_socket(sock):
203 if ssl is not None and isinstance(sock, ssl.SSLSocket):  # 203 ↛ 204: condition never true
204 raise TypeError("Socket cannot be of type SSLSocket")
207class _SendfileFallbackProtocol(protocols.Protocol):
208 def __init__(self, transp):
209 if not isinstance(transp, transports._FlowControlMixin):  # 209 ↛ 210: condition never true
210 raise TypeError("transport should be _FlowControlMixin instance")
211 self._transport = transp
212 self._proto = transp.get_protocol()
213 self._should_resume_reading = transp.is_reading()
214 self._should_resume_writing = transp._protocol_paused
215 transp.pause_reading()
216 transp.set_protocol(self)
217 if self._should_resume_writing:  # 217 ↛ 218: condition never true
218 self._write_ready_fut = self._transport._loop.create_future()
219 else:
220 self._write_ready_fut = None
222 async def drain(self):
223 if self._transport.is_closing():  # 223 ↛ 224: condition never true
224 raise ConnectionError("Connection closed by peer")
225 fut = self._write_ready_fut
226 if fut is None:
227 return
228 await fut
230 def connection_made(self, transport):
231 raise RuntimeError("Invalid state: "
232 "connection should have been established already.")
234 def connection_lost(self, exc):
235 if self._write_ready_fut is not None:  # 235 ↛ 243: condition always true
236 # Never happens if peer disconnects after sending the whole content
237 # Thus disconnection is always an exception from user perspective
238 if exc is None:  # 238 ↛ 239: condition never true
239 self._write_ready_fut.set_exception(
240 ConnectionError("Connection is closed by peer"))
241 else:
242 self._write_ready_fut.set_exception(exc)
243 self._proto.connection_lost(exc)
245 def pause_writing(self):
246 if self._write_ready_fut is not None:  # 246 ↛ 247: condition never true
247 return
248 self._write_ready_fut = self._transport._loop.create_future()
250 def resume_writing(self):
251 if self._write_ready_fut is None:  # 251 ↛ 252: condition never true
252 return
253 self._write_ready_fut.set_result(False)
254 self._write_ready_fut = None
256 def data_received(self, data):
257 raise RuntimeError("Invalid state: reading should be paused")
259 def eof_received(self):
260 raise RuntimeError("Invalid state: reading should be paused")
262 async def restore(self):
263 self._transport.set_protocol(self._proto)
264 if self._should_resume_reading:  # 264 ↛ 266: condition always true
265 self._transport.resume_reading()
266 if self._write_ready_fut is not None:
267 # Cancel the future.
268 # Basically it has no effect because protocol is switched back,
269 # no code should wait for it anymore.
270 self._write_ready_fut.cancel()
271 if self._should_resume_writing:  # 271 ↛ 272: condition never true
272 self._proto.resume_writing()
275class Server(events.AbstractServer):
277 def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
278 ssl_handshake_timeout, ssl_shutdown_timeout=None):
279 self._loop = loop
280 self._sockets = sockets
281 # Weak references so we don't break Transport's ability to
282 # detect abandoned transports
283 self._clients = weakref.WeakSet()
284 self._waiters = []
285 self._protocol_factory = protocol_factory
286 self._backlog = backlog
287 self._ssl_context = ssl_context
288 self._ssl_handshake_timeout = ssl_handshake_timeout
289 self._ssl_shutdown_timeout = ssl_shutdown_timeout
290 self._serving = False
291 self._serving_forever_fut = None
293 def __repr__(self):
294 return f'<{self.__class__.__name__} sockets={self.sockets!r}>'
296 def _attach(self, transport):
297 assert self._sockets is not None
298 self._clients.add(transport)
300 def _detach(self, transport):
301 self._clients.discard(transport)
302 if len(self._clients) == 0 and self._sockets is None:
303 self._wakeup()
305 def _wakeup(self):
306 waiters = self._waiters
307 self._waiters = None
308 for waiter in waiters:
309 if not waiter.done():  # 309 ↛ 308: condition always true
310 waiter.set_result(None)
312 def _start_serving(self):
313 if self._serving:
314 return
315 self._serving = True
316 for sock in self._sockets:
317 sock.listen(self._backlog)
318 self._loop._start_serving(
319 self._protocol_factory, sock, self._ssl_context,
320 self, self._backlog, self._ssl_handshake_timeout,
321 self._ssl_shutdown_timeout)
323 def get_loop(self):
324 return self._loop
326 def is_serving(self):
327 return self._serving
329 @property
330 def sockets(self):
331 if self._sockets is None:
332 return ()
333 return tuple(trsock.TransportSocket(s) for s in self._sockets)
335 def close(self):
336 sockets = self._sockets
337 if sockets is None:
338 return
339 self._sockets = None
341 for sock in sockets:
342 self._loop._stop_serving(sock)
344 self._serving = False
346 if (self._serving_forever_fut is not None and  # 346 ↛ 348: condition never true
347 not self._serving_forever_fut.done()):
348 self._serving_forever_fut.cancel()
349 self._serving_forever_fut = None
351 if len(self._clients) == 0:
352 self._wakeup()
354 def close_clients(self):
355 for transport in self._clients.copy():
356 transport.close()
358 def abort_clients(self):
359 for transport in self._clients.copy():
360 transport.abort()
362 async def start_serving(self):
363 self._start_serving()
364 # Skip one loop iteration so that all 'loop.add_reader'
365 # go through.
366 await tasks.sleep(0)
368 async def serve_forever(self):
369 if self._serving_forever_fut is not None:  # 369 ↛ 370: condition never true
370 raise RuntimeError(
371 f'server {self!r} is already being awaited on serve_forever()')
372 if self._sockets is None:
373 raise RuntimeError(f'server {self!r} is closed')
375 self._start_serving()
376 self._serving_forever_fut = self._loop.create_future()
378 try:
379 await self._serving_forever_fut
380 except exceptions.CancelledError:
381 try:
382 self.close()
383 await self.wait_closed()
384 finally:
385 raise
386 finally:
387 self._serving_forever_fut = None
389 async def wait_closed(self):
390 """Wait until server is closed and all connections are dropped.
392 - If the server is not closed, wait.
393 - If it is closed, but there are still active connections, wait.
395 Anyone waiting here will be unblocked once both conditions
396 (server is closed and all connections have been dropped)
397 have become true, in either order.
399 Historical note: In 3.11 and before, this was broken, returning
400 immediately if the server was already closed, even if there
401 were still active connections. An attempted fix in 3.12.0 was
402 still broken, returning immediately if the server was still
403 open and there were no active connections. Hopefully in 3.12.1
404 we have it right.
405 """
406 # Waiters are unblocked by self._wakeup(), which is called
407 # from two places: self.close() and self._detach(), but only
408 # when both conditions have become true. To signal that this
409 # has happened, self._wakeup() sets self._waiters to None.
410 if self._waiters is None:
411 return
412 waiter = self._loop.create_future()
413 self._waiters.append(waiter)
414 await waiter
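A short lifecycle sketch for the Server machinery above, using the public asyncio.start_server wrapper; 'async with server' calls close() and then wait_closed(), so it returns only once both conditions (server closed, connections dropped) hold.

import asyncio

async def handler(reader, writer):
    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handler, '127.0.0.1', 0)
    print('listening on', server.sockets[0].getsockname())
    async with server:            # close() + wait_closed() on exit
        await asyncio.sleep(0.1)

asyncio.run(main())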
417class BaseEventLoop(events.AbstractEventLoop):
419 def __init__(self):
420 self._timer_cancelled_count = 0
421 self._closed = False
422 self._stopping = False
423 self._ready = collections.deque()
424 self._scheduled = []
425 self._default_executor = None
426 self._internal_fds = 0
427 # Identifier of the thread running the event loop, or None if the
428 # event loop is not running
429 self._thread_id = None
430 self._clock_resolution = time.get_clock_info('monotonic').resolution
431 self._exception_handler = None
432 self.set_debug(coroutines._is_debug_mode())
433 # The preserved state of async generator hooks.
434 self._old_agen_hooks = None
435 # In debug mode, if the execution of a callback or a step of a task
436 # exceed this duration in seconds, the slow callback/task is logged.
437 self.slow_callback_duration = 0.1
438 self._current_handle = None
439 self._task_factory = None
440 self._coroutine_origin_tracking_enabled = False
441 self._coroutine_origin_tracking_saved_depth = None
443 # A weak set of all asynchronous generators that are
444 # being iterated by the loop.
445 self._asyncgens = weakref.WeakSet()
446 # Set to True when `loop.shutdown_asyncgens` is called.
447 self._asyncgens_shutdown_called = False
448 # Set to True when `loop.shutdown_default_executor` is called.
449 self._executor_shutdown_called = False
451 def __repr__(self):
452 return (
453 f'<{self.__class__.__name__} running={self.is_running()} '
454 f'closed={self.is_closed()} debug={self.get_debug()}>'
455 )
457 def create_future(self):
458 """Create a Future object attached to the loop."""
459 return futures.Future(loop=self)
461 def create_task(self, coro, **kwargs):
462 """Schedule a coroutine object.
464 Return a task object.
465 """
466 self._check_closed()
467 if self._task_factory is not None:
468 return self._task_factory(self, coro, **kwargs)
470 task = tasks.Task(coro, loop=self, **kwargs)
471 if task._source_traceback:
472 del task._source_traceback[-1]
473 try:
474 return task
475 finally:
476 # gh-128552: prevent a refcycle of
477 # task.exception().__traceback__->BaseEventLoop.create_task->task
478 del task
480 def set_task_factory(self, factory):
481 """Set a task factory that will be used by loop.create_task().
483 If factory is None the default task factory will be set.
485 If factory is a callable, it should have a signature matching
486 '(loop, coro, **kwargs)', where 'loop' will be a reference to the active
487 event loop, 'coro' will be a coroutine object, and **kwargs will be
488 arbitrary keyword arguments that should be passed on to Task.
489 The callable must return a Task.
490 """
491 if factory is not None and not callable(factory):
492 raise TypeError('task factory must be a callable or None')
493 self._task_factory = factory
495 def get_task_factory(self):
496 """Return a task factory, or None if the default one is in use."""
497 return self._task_factory
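A hedged example of a factory matching the '(loop, coro, **kwargs)' signature documented above; naming_factory is hypothetical and simply tags each task with a name before delegating to Task.

import asyncio

def naming_factory(loop, coro, **kwargs):
    # Hypothetical factory: same signature, must return a Task.
    kwargs.setdefault('name', f'named-{coro.__name__}')
    return asyncio.Task(coro, loop=loop, **kwargs)

async def work():
    return 42

async def main():
    loop = asyncio.get_running_loop()
    loop.set_task_factory(naming_factory)
    task = loop.create_task(work())
    print(task.get_name())  # named-work
    await task

asyncio.run(main())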
499 def _make_socket_transport(self, sock, protocol, waiter=None, *,
500 extra=None, server=None):
501 """Create socket transport."""
502 raise NotImplementedError
504 def _make_ssl_transport(
505 self, rawsock, protocol, sslcontext, waiter=None,
506 *, server_side=False, server_hostname=None,
507 extra=None, server=None,
508 ssl_handshake_timeout=None,
509 ssl_shutdown_timeout=None,
510 call_connection_made=True):
511 """Create SSL transport."""
512 raise NotImplementedError
514 def _make_datagram_transport(self, sock, protocol,
515 address=None, waiter=None, extra=None):
516 """Create datagram transport."""
517 raise NotImplementedError
519 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
520 extra=None):
521 """Create read pipe transport."""
522 raise NotImplementedError
524 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
525 extra=None):
526 """Create write pipe transport."""
527 raise NotImplementedError
529 async def _make_subprocess_transport(self, protocol, args, shell,
530 stdin, stdout, stderr, bufsize,
531 extra=None, **kwargs):
532 """Create subprocess transport."""
533 raise NotImplementedError
535 def _write_to_self(self):
536 """Write a byte to self-pipe, to wake up the event loop.
538 This may be called from a different thread.
540 The subclass is responsible for implementing the self-pipe.
541 """
542 raise NotImplementedError
544 def _process_events(self, event_list):
545 """Process selector events."""
546 raise NotImplementedError
548 def _check_closed(self):
549 if self._closed:
550 raise RuntimeError('Event loop is closed')
552 def _check_default_executor(self):
553 if self._executor_shutdown_called:  # 553 ↛ 554: condition never true
554 raise RuntimeError('Executor shutdown has been called')
556 def _asyncgen_finalizer_hook(self, agen):
557 self._asyncgens.discard(agen)
558 if not self.is_closed():  # 558 ↛ exit: condition always true
559 self.call_soon_threadsafe(self.create_task, agen.aclose())
561 def _asyncgen_firstiter_hook(self, agen):
562 if self._asyncgens_shutdown_called:  # 562 ↛ 563: condition never true
563 warnings.warn(
564 f"asynchronous generator {agen!r} was scheduled after "
565 f"loop.shutdown_asyncgens() call",
566 ResourceWarning, source=self)
568 self._asyncgens.add(agen)
570 async def shutdown_asyncgens(self):
571 """Shutdown all active asynchronous generators."""
572 self._asyncgens_shutdown_called = True
574 if not len(self._asyncgens):
575 # If Python version is <3.6 or we don't have any asynchronous
576 # generators alive.
577 return
579 closing_agens = list(self._asyncgens)
580 self._asyncgens.clear()
582 results = await tasks.gather(
583 *[ag.aclose() for ag in closing_agens],
584 return_exceptions=True)
586 for result, agen in zip(results, closing_agens):
587 if isinstance(result, Exception):  # 587 ↛ 588: condition never true
588 self.call_exception_handler({
589 'message': f'an error occurred during closing of '
590 f'asynchronous generator {agen!r}',
591 'exception': result,
592 'asyncgen': agen
593 })
595 async def shutdown_default_executor(self, timeout=None):
596 """Schedule the shutdown of the default executor.
598 The timeout parameter specifies the amount of time the executor will
599 be given to finish joining. The default value is None, which means
600 that the executor will be given an unlimited amount of time.
601 """
602 self._executor_shutdown_called = True
603 if self._default_executor is None:
604 return
605 future = self.create_future()
606 thread = threading.Thread(target=self._do_shutdown, args=(future,))
607 thread.start()
608 try:
609 async with timeouts.timeout(timeout):
610 await future
611 except TimeoutError:
612 warnings.warn("The executor did not finish joining "
613 f"its threads within {timeout} seconds.",
614 RuntimeWarning, stacklevel=2)
615 self._default_executor.shutdown(wait=False)
616 else:
617 thread.join()
619 def _do_shutdown(self, future):
620 try:
621 self._default_executor.shutdown(wait=True)
622 if not self.is_closed():  # 622 ↛ exit: condition always true
623 self.call_soon_threadsafe(futures._set_result_unless_cancelled,
624 future, None)
625 except Exception as ex:
626 if not self.is_closed() and not future.cancelled():
627 self.call_soon_threadsafe(future.set_exception, ex)
629 def _check_running(self):
630 if self.is_running():
631 raise RuntimeError('This event loop is already running')
632 if events._get_running_loop() is not None:
633 raise RuntimeError(
634 'Cannot run the event loop while another loop is running')
636 def _run_forever_setup(self):
637 """Prepare the run loop to process events.
639 This method exists so that custom event loop subclasses (e.g., event loops
640 that integrate a GUI event loop with Python's event loop) have access to all the
641 loop setup logic.
642 """
643 self._check_closed()
644 self._check_running()
645 self._set_coroutine_origin_tracking(self._debug)
647 self._old_agen_hooks = sys.get_asyncgen_hooks()
648 self._thread_id = threading.get_ident()
649 sys.set_asyncgen_hooks(
650 firstiter=self._asyncgen_firstiter_hook,
651 finalizer=self._asyncgen_finalizer_hook
652 )
654 events._set_running_loop(self)
656 def _run_forever_cleanup(self):
657 """Clean up after an event loop finishes the looping over events.
659 This method exists so that custom event loop subclasses (e.g., event loops
660 that integrate a GUI event loop with Python's event loop) have access to all the
661 loop cleanup logic.
662 """
663 self._stopping = False
664 self._thread_id = None
665 events._set_running_loop(None)
666 self._set_coroutine_origin_tracking(False)
667 # Restore any pre-existing async generator hooks.
668 if self._old_agen_hooks is not None:  # 668 ↛ exit: condition always true
669 sys.set_asyncgen_hooks(*self._old_agen_hooks)
670 self._old_agen_hooks = None
672 def run_forever(self):
673 """Run until stop() is called."""
674 self._run_forever_setup()
675 try:
676 while True:
677 self._run_once()
678 if self._stopping:
679 break
680 finally:
681 self._run_forever_cleanup()
683 def run_until_complete(self, future):
684 """Run until the Future is done.
686 If the argument is a coroutine, it is wrapped in a Task.
688 WARNING: It would be disastrous to call run_until_complete()
689 with the same coroutine twice -- it would wrap it in two
690 different Tasks and that can't be good.
692 Return the Future's result, or raise its exception.
693 """
694 self._check_closed()
695 self._check_running()
697 new_task = not futures.isfuture(future)
698 future = tasks.ensure_future(future, loop=self)
699 if new_task:
700 # An exception is raised if the future didn't complete, so there
701 # is no need to log the "destroy pending task" message
702 future._log_destroy_pending = False
704 future.add_done_callback(_run_until_complete_cb)
705 try:
706 self.run_forever()
707 except:
708 if new_task and future.done() and not future.cancelled():
709 # The coroutine raised a BaseException. Consume the exception
710 # to not log a warning, the caller doesn't have access to the
711 # local task.
712 future.exception()
713 raise
714 finally:
715 future.remove_done_callback(_run_until_complete_cb)
716 if not future.done():
717 raise RuntimeError('Event loop stopped before Future completed.')
719 return future.result()
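A minimal usage sketch: the coroutine is wrapped in a Task and driven until it completes, and the loop is closed manually since run_until_complete() does not do that.

import asyncio

async def compute():
    await asyncio.sleep(0)
    return 'done'

loop = asyncio.new_event_loop()
try:
    print(loop.run_until_complete(compute()))  # -> 'done'
finally:
    loop.close()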
721 def stop(self):
722 """Stop running the event loop.
724 Every callback already scheduled will still run. This simply informs
725 run_forever to stop looping after a complete iteration.
726 """
727 self._stopping = True
729 def close(self):
730 """Close the event loop.
732 This clears the queues and shuts down the executor,
733 but does not wait for the executor to finish.
735 The event loop must not be running.
736 """
737 if self.is_running():  # 737 ↛ 738: condition never true
738 raise RuntimeError("Cannot close a running event loop")
739 if self._closed:
740 return
741 if self._debug:
742 logger.debug("Close %r", self)
743 self._closed = True
744 self._ready.clear()
745 self._scheduled.clear()
746 self._executor_shutdown_called = True
747 executor = self._default_executor
748 if executor is not None:
749 self._default_executor = None
750 executor.shutdown(wait=False)
752 def is_closed(self):
753 """Returns True if the event loop was closed."""
754 return self._closed
756 def __del__(self, _warn=warnings.warn):
757 if not self.is_closed():  # 757 ↛ 758: condition never true
758 _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self)
759 if not self.is_running():
760 self.close()
762 def is_running(self):
763 """Returns True if the event loop is running."""
764 return (self._thread_id is not None)
766 def time(self):
767 """Return the time according to the event loop's clock.
769 This is a float expressed in seconds since an epoch, but the
770 epoch, precision, accuracy and drift are unspecified and may
771 differ per event loop.
772 """
773 return time.monotonic()
775 def call_later(self, delay, callback, *args, context=None):
776 """Arrange for a callback to be called at a given time.
778 Return a Handle: an opaque object with a cancel() method that
779 can be used to cancel the call.
781 The delay can be an int or float, expressed in seconds. It is
782 always relative to the current time.
784 Each callback will be called exactly once. If two callbacks
785 are scheduled for exactly the same time, it is undefined which
786 will be called first.
788 Any positional arguments after the callback will be passed to
789 the callback when it is called.
790 """
791 if delay is None:
792 raise TypeError('delay must not be None')
793 timer = self.call_at(self.time() + delay, callback, *args,
794 context=context)
795 if timer._source_traceback:
796 del timer._source_traceback[-1]
797 return timer
799 def call_at(self, when, callback, *args, context=None):
800 """Like call_later(), but uses an absolute time.
802 Absolute time corresponds to the event loop's time() method.
803 """
804 if when is None:
805 raise TypeError("when cannot be None")
806 self._check_closed()
807 if self._debug:
808 self._check_thread()
809 self._check_callback(callback, 'call_at')
810 timer = events.TimerHandle(when, callback, args, self, context)
811 if timer._source_traceback:
812 del timer._source_traceback[-1]
813 heapq.heappush(self._scheduled, timer)
814 timer._scheduled = True
815 return timer
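A sketch contrasting the two schedulers documented above: call_later() takes a relative delay, call_at() an absolute time on the loop's clock, and both return a cancellable handle.

import asyncio

async def main():
    loop = asyncio.get_running_loop()
    loop.call_later(0.05, print, 'after 50 ms')            # relative delay
    loop.call_at(loop.time() + 0.1, print, 'at t+100 ms')  # absolute loop time
    handle = loop.call_later(10, print, 'never printed')
    handle.cancel()                                        # cancel before it fires
    await asyncio.sleep(0.2)

asyncio.run(main())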
817 def call_soon(self, callback, *args, context=None):
818 """Arrange for a callback to be called as soon as possible.
820 This operates as a FIFO queue: callbacks are called in the
821 order in which they are registered. Each callback will be
822 called exactly once.
824 Any positional arguments after the callback will be passed to
825 the callback when it is called.
826 """
827 self._check_closed()
828 if self._debug:
829 self._check_thread()
830 self._check_callback(callback, 'call_soon')
831 handle = self._call_soon(callback, args, context)
832 if handle._source_traceback:
833 del handle._source_traceback[-1]
834 return handle
836 def _check_callback(self, callback, method):
837 if (coroutines.iscoroutine(callback) or
838 coroutines._iscoroutinefunction(callback)):
839 raise TypeError(
840 f"coroutines cannot be used with {method}()")
841 if not callable(callback):
842 raise TypeError(
843 f'a callable object was expected by {method}(), '
844 f'got {callback!r}')
846 def _call_soon(self, callback, args, context):
847 handle = events.Handle(callback, args, self, context)
848 if handle._source_traceback:
849 del handle._source_traceback[-1]
850 self._ready.append(handle)
851 return handle
853 def _check_thread(self):
854 """Check that the current thread is the thread running the event loop.
856 Non-thread-safe methods of this class make this assumption and will
857 likely behave incorrectly when the assumption is violated.
859 Should only be called when (self._debug == True). The caller is
860 responsible for checking this condition for performance reasons.
861 """
862 if self._thread_id is None:
863 return
864 thread_id = threading.get_ident()
865 if thread_id != self._thread_id:
866 raise RuntimeError(
867 "Non-thread-safe operation invoked on an event loop other "
868 "than the current one")
870 def call_soon_threadsafe(self, callback, *args, context=None):
871 """Like call_soon(), but thread-safe."""
872 self._check_closed()
873 if self._debug:
874 self._check_callback(callback, 'call_soon_threadsafe')
875 handle = events._ThreadSafeHandle(callback, args, self, context)
876 self._ready.append(handle)
877 if handle._source_traceback:
878 del handle._source_traceback[-1]
881 self._write_to_self()
882 return handle
884 def run_in_executor(self, executor, func, *args):
885 self._check_closed()
886 if self._debug:
887 self._check_callback(func, 'run_in_executor')
888 if executor is None:
889 executor = self._default_executor
890 # Only check when the default executor is being used
891 self._check_default_executor()
892 if executor is None:
893 executor = concurrent.futures.ThreadPoolExecutor(
894 thread_name_prefix='asyncio'
895 )
896 self._default_executor = executor
897 return futures.wrap_future(
898 executor.submit(func, *args), loop=self)
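A usage sketch: passing None selects the lazily created default ThreadPoolExecutor, so blocking work runs off the event loop thread.

import asyncio
import time

def blocking_io():
    time.sleep(0.1)  # stands in for real blocking work
    return 'finished'

async def main():
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, blocking_io)
    print(result)

asyncio.run(main())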
900 def set_default_executor(self, executor):
901 if not isinstance(executor, concurrent.futures.ThreadPoolExecutor):
902 raise TypeError('executor must be ThreadPoolExecutor instance')
903 self._default_executor = executor
905 def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
906 msg = [f"{host}:{port!r}"]
907 if family:
908 msg.append(f'family={family!r}')
909 if type:
910 msg.append(f'type={type!r}')
911 if proto:
912 msg.append(f'proto={proto!r}')
913 if flags:
914 msg.append(f'flags={flags!r}')
915 msg = ', '.join(msg)
916 logger.debug('Get address info %s', msg)
918 t0 = self.time()
919 addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
920 dt = self.time() - t0
922 msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}'
923 if dt >= self.slow_callback_duration:
924 logger.info(msg)
925 else:
926 logger.debug(msg)
927 return addrinfo
929 async def getaddrinfo(self, host, port, *,
930 family=0, type=0, proto=0, flags=0):
931 if self._debug:  # 931 ↛ 932: condition never true
932 getaddr_func = self._getaddrinfo_debug
933 else:
934 getaddr_func = socket.getaddrinfo
936 return await self.run_in_executor(
937 None, getaddr_func, host, port, family, type, proto, flags)
939 async def getnameinfo(self, sockaddr, flags=0):
940 return await self.run_in_executor(
941 None, socket.getnameinfo, sockaddr, flags)
943 async def sock_sendfile(self, sock, file, offset=0, count=None,
944 *, fallback=True):
945 if self._debug and sock.gettimeout() != 0:
946 raise ValueError("the socket must be non-blocking")
947 _check_ssl_socket(sock)
948 self._check_sendfile_params(sock, file, offset, count)
949 try:
950 return await self._sock_sendfile_native(sock, file,
951 offset, count)
952 except exceptions.SendfileNotAvailableError as exc:
953 if not fallback:
954 raise
955 return await self._sock_sendfile_fallback(sock, file,
956 offset, count)
958 async def _sock_sendfile_native(self, sock, file, offset, count):
959 # NB: sendfile syscall is not supported for SSL sockets and
960 # non-mmap files even if sendfile is supported by OS
961 raise exceptions.SendfileNotAvailableError(
962 f"syscall sendfile is not available for socket {sock!r} "
963 f"and file {file!r} combination")
965 async def _sock_sendfile_fallback(self, sock, file, offset, count):
966 if offset:
967 file.seek(offset)
968 blocksize = (
969 min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE)
970 if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE
971 )
972 buf = bytearray(blocksize)
973 total_sent = 0
974 try:
975 while True:
976 if count:
977 blocksize = min(count - total_sent, blocksize)
978 if blocksize <= 0:
979 break
980 view = memoryview(buf)[:blocksize]
981 read = await self.run_in_executor(None, file.readinto, view)
982 if not read:
983 break # EOF
984 await self.sock_sendall(sock, view[:read])
985 total_sent += read
986 return total_sent
987 finally:
988 if total_sent > 0 and hasattr(file, 'seek'):
989 file.seek(offset + total_sent)
991 def _check_sendfile_params(self, sock, file, offset, count):
992 if 'b' not in getattr(file, 'mode', 'b'):
993 raise ValueError("file should be opened in binary mode")
994 if sock.type != socket.SOCK_STREAM:
995 raise ValueError("only SOCK_STREAM type sockets are supported")
996 if count is not None:
997 if not isinstance(count, int):
998 raise TypeError(
999 "count must be a positive integer (got {!r})".format(count))
1000 if count <= 0:
1001 raise ValueError(
1002 "count must be a positive integer (got {!r})".format(count))
1003 if not isinstance(offset, int):
1004 raise TypeError(
1005 "offset must be a non-negative integer (got {!r})".format(
1006 offset))
1007 if offset < 0:
1008 raise ValueError(
1009 "offset must be a non-negative integer (got {!r})".format(
1010 offset))
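A hedged sketch of sock_sendfile() honoring the checks above: the socket must be non-blocking and SOCK_STREAM, and the file must be opened in binary mode. Assumes a server is already listening at (host, port); send_file is a hypothetical helper name.

import asyncio
import socket

async def send_file(path, host, port):
    loop = asyncio.get_running_loop()
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setblocking(False)              # required by sock_sendfile()
    await loop.sock_connect(sock, (host, port))
    try:
        with open(path, 'rb') as f:      # binary mode, per the checks above
            return await loop.sock_sendfile(sock, f)  # native, else fallback
    finally:
        sock.close()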
1012 async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None):
1013 """Create, bind and connect one socket."""
1014 my_exceptions = []
1015 exceptions.append(my_exceptions)
1016 family, type_, proto, _, address = addr_info
1017 sock = None
1018 try:
1019 sock = socket.socket(family=family, type=type_, proto=proto)
1020 sock.setblocking(False)
1021 if local_addr_infos is not None:
1022 for lfamily, _, _, _, laddr in local_addr_infos:
1023 # skip local addresses of different family
1024 if lfamily != family:
1025 continue
1026 try:
1027 sock.bind(laddr)
1028 break
1029 except OSError as exc:
1030 msg = (
1031 f'error while attempting to bind on '
1032 f'address {laddr!r}: {str(exc).lower()}'
1033 )
1034 exc = OSError(exc.errno, msg)
1035 my_exceptions.append(exc)
1036 else: # all bind attempts failed
1037 if my_exceptions:
1038 raise my_exceptions.pop()
1039 else:
1040 raise OSError(f"no matching local address with {family=} found")
1041 await self.sock_connect(sock, address)
1042 return sock
1043 except OSError as exc:
1044 my_exceptions.append(exc)
1045 if sock is not None:
1046 sock.close()
1047 raise
1048 except:
1049 if sock is not None:
1050 sock.close()
1051 raise
1052 finally:
1053 exceptions = my_exceptions = None
1055 async def create_connection(
1056 self, protocol_factory, host=None, port=None,
1057 *, ssl=None, family=0,
1058 proto=0, flags=0, sock=None,
1059 local_addr=None, server_hostname=None,
1060 ssl_handshake_timeout=None,
1061 ssl_shutdown_timeout=None,
1062 happy_eyeballs_delay=None, interleave=None,
1063 all_errors=False):
1064 """Connect to a TCP server.
1066 Create a streaming transport connection to a given internet host and
1067 port: socket family AF_INET or socket.AF_INET6 depending on host (or
1068 family if specified), socket type SOCK_STREAM. protocol_factory must be
1069 a callable returning a protocol instance.
1071 This method is a coroutine which will try to establish the connection
1072 in the background. When successful, the coroutine returns a
1073 (transport, protocol) pair.
1074 """
1075 if server_hostname is not None and not ssl:
1076 raise ValueError('server_hostname is only meaningful with ssl')
1078 if server_hostname is None and ssl:
1079 # Use host as default for server_hostname. It is an error
1080 # if host is empty or not set, e.g. when an
1081 # already-connected socket was passed or when only a port
1082 # is given. To avoid this error, you can pass
1083 # server_hostname='' -- this will bypass the hostname
1084 # check. (This also means that if host is a numeric
1085 # IP/IPv6 address, we will attempt to verify that exact
1086 # address; this will probably fail, but it is possible to
1087 # create a certificate for a specific IP address, so we
1088 # don't judge it here.)
1089 if not host:
1090 raise ValueError('You must set server_hostname '
1091 'when using ssl without a host')
1092 server_hostname = host
1094 if ssl_handshake_timeout is not None and not ssl:
1095 raise ValueError(
1096 'ssl_handshake_timeout is only meaningful with ssl')
1098 if ssl_shutdown_timeout is not None and not ssl:  # 1098 ↛ 1099: condition never true
1099 raise ValueError(
1100 'ssl_shutdown_timeout is only meaningful with ssl')
1102 if sock is not None:
1103 _check_ssl_socket(sock)
1105 if happy_eyeballs_delay is not None and interleave is None:
1106 # If using happy eyeballs, default to interleave addresses by family
1107 interleave = 1
1109 if host is not None or port is not None:
1110 if sock is not None:
1111 raise ValueError(
1112 'host/port and sock can not be specified at the same time')
1114 infos = await self._ensure_resolved(
1115 (host, port), family=family,
1116 type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
1117 if not infos:
1118 raise OSError('getaddrinfo() returned empty list')
1120 if local_addr is not None:
1121 laddr_infos = await self._ensure_resolved(
1122 local_addr, family=family,
1123 type=socket.SOCK_STREAM, proto=proto,
1124 flags=flags, loop=self)
1125 if not laddr_infos:
1126 raise OSError('getaddrinfo() returned empty list')
1127 else:
1128 laddr_infos = None
1130 if interleave:
1131 infos = _interleave_addrinfos(infos, interleave)
1133 exceptions = []
1134 if happy_eyeballs_delay is None:
1135 # not using happy eyeballs
1136 for addrinfo in infos:
1137 try:
1138 sock = await self._connect_sock(
1139 exceptions, addrinfo, laddr_infos)
1140 break
1141 except OSError:
1142 continue
1143 else: # using happy eyeballs
1144 sock = (await staggered.staggered_race(
1145 (
1146 # can't use functools.partial as it keeps a reference
1147 # to exceptions
1148 lambda addrinfo=addrinfo: self._connect_sock(
1149 exceptions, addrinfo, laddr_infos
1150 )
1151 for addrinfo in infos
1152 ),
1153 happy_eyeballs_delay,
1154 loop=self,
1155 ))[0] # can't use sock, _, _ as it keeps a reference to exceptions
1157 if sock is None:
1158 exceptions = [exc for sub in exceptions for exc in sub]
1159 try:
1160 if all_errors:
1161 raise ExceptionGroup("create_connection failed", exceptions)
1162 if len(exceptions) == 1:
1163 raise exceptions[0]
1164 else:
1165 # If they all have the same str(), raise one.
1166 model = str(exceptions[0])
1167 if all(str(exc) == model for exc in exceptions):
1168 raise exceptions[0]
1169 # Raise a combined exception so the user can see all
1170 # the various error messages.
1171 raise OSError('Multiple exceptions: {}'.format(
1172 ', '.join(str(exc) for exc in exceptions)))
1173 finally:
1174 exceptions = None
1176 else:
1177 if sock is None:
1178 raise ValueError(
1179 'host and port were not specified and no sock specified')
1180 if sock.type != socket.SOCK_STREAM:
1181 # We allow AF_INET, AF_INET6, AF_UNIX as long as they
1182 # are SOCK_STREAM.
1183 # We support passing AF_UNIX sockets even though we have
1184 # a dedicated API for that: create_unix_connection.
1185 # Disallowing AF_UNIX in this method, breaks backwards
1186 # compatibility.
1187 raise ValueError(
1188 f'A Stream Socket was expected, got {sock!r}')
1190 transport, protocol = await self._create_connection_transport(
1191 sock, protocol_factory, ssl, server_hostname,
1192 ssl_handshake_timeout=ssl_handshake_timeout,
1193 ssl_shutdown_timeout=ssl_shutdown_timeout)
1194 if self._debug:
1195 # Get the socket from the transport because SSL transport closes
1196 # the old socket and creates a new SSL socket
1197 sock = transport.get_extra_info('socket')
1198 logger.debug("%r connected to %s:%r: (%r, %r)",
1199 sock, host, port, transport, protocol)
1200 return transport, protocol
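A usage sketch with a trivial protocol; it assumes something is listening on 127.0.0.1:8888. The factory is called once per connection, and the coroutine returns the (transport, protocol) pair described above.

import asyncio

class PingProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        transport.write(b'ping')

    def data_received(self, data):
        print('received', data)

async def main():
    loop = asyncio.get_running_loop()
    transport, protocol = await loop.create_connection(
        PingProtocol, '127.0.0.1', 8888)  # assumed listener
    await asyncio.sleep(0.1)
    transport.close()

asyncio.run(main())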
1202 async def _create_connection_transport(
1203 self, sock, protocol_factory, ssl,
1204 server_hostname, server_side=False,
1205 ssl_handshake_timeout=None,
1206 ssl_shutdown_timeout=None):
1208 sock.setblocking(False)
1210 protocol = protocol_factory()
1211 waiter = self.create_future()
1212 if ssl:
1213 sslcontext = None if isinstance(ssl, bool) else ssl
1214 transport = self._make_ssl_transport(
1215 sock, protocol, sslcontext, waiter,
1216 server_side=server_side, server_hostname=server_hostname,
1217 ssl_handshake_timeout=ssl_handshake_timeout,
1218 ssl_shutdown_timeout=ssl_shutdown_timeout)
1219 else:
1220 transport = self._make_socket_transport(sock, protocol, waiter)
1222 try:
1223 await waiter
1224 except:
1225 transport.close()
1226 raise
1228 return transport, protocol
1230 async def sendfile(self, transport, file, offset=0, count=None,
1231 *, fallback=True):
1232 """Send a file to transport.
1234 Return the total number of bytes which were sent.
1236 The method uses high-performance os.sendfile if available.
1238 file must be a regular file object opened in binary mode.
1240 offset tells from where to start reading the file. If specified,
1241 count is the total number of bytes to transmit as opposed to
1242 sending the file until EOF is reached. File position is updated on
1243 return or also in case of error in which case file.tell()
1244 can be used to figure out the number of bytes
1245 which were sent.
1247 fallback set to True makes asyncio manually read and send
1248 the file when the platform does not support the sendfile syscall
1249 (e.g. Windows or SSL socket on Unix).
1251 Raise SendfileNotAvailableError if the system does not support
1252 sendfile syscall and fallback is False.
1253 """
1254 if transport.is_closing():
1255 raise RuntimeError("Transport is closing")
1256 mode = getattr(transport, '_sendfile_compatible',
1257 constants._SendfileMode.UNSUPPORTED)
1258 if mode is constants._SendfileMode.UNSUPPORTED:
1259 raise RuntimeError(
1260 f"sendfile is not supported for transport {transport!r}")
1261 if mode is constants._SendfileMode.TRY_NATIVE:
1262 try:
1263 return await self._sendfile_native(transport, file,
1264 offset, count)
1265 except exceptions.SendfileNotAvailableError as exc:
1266 if not fallback:
1267 raise
1269 if not fallback:
1270 raise RuntimeError(
1271 f"fallback is disabled and native sendfile is not "
1272 f"supported for transport {transport!r}")
1274 return await self._sendfile_fallback(transport, file,
1275 offset, count)
1277 async def _sendfile_native(self, transp, file, offset, count):
1278 raise exceptions.SendfileNotAvailableError(
1279 "sendfile syscall is not supported")
1281 async def _sendfile_fallback(self, transp, file, offset, count):
1282 if offset:
1283 file.seek(offset)
1284 blocksize = min(count, 16384) if count else 16384
1285 buf = bytearray(blocksize)
1286 total_sent = 0
1287 proto = _SendfileFallbackProtocol(transp)
1288 try:
1289 while True:
1290 if count:
1291 blocksize = min(count - total_sent, blocksize)
1292 if blocksize <= 0:
1293 return total_sent
1294 view = memoryview(buf)[:blocksize]
1295 read = await self.run_in_executor(None, file.readinto, view)
1296 if not read:
1297 return total_sent # EOF
1298 transp.write(view[:read])
1299 await proto.drain()
1300 total_sent += read
1301 finally:
1302 if total_sent > 0 and hasattr(file, 'seek'):
1303 file.seek(offset + total_sent)
1304 await proto.restore()
1306 async def start_tls(self, transport, protocol, sslcontext, *,
1307 server_side=False,
1308 server_hostname=None,
1309 ssl_handshake_timeout=None,
1310 ssl_shutdown_timeout=None):
1311 """Upgrade transport to TLS.
1313 Return a new transport that *protocol* should start using
1314 immediately.
1315 """
1316 if ssl is None:  # 1316 ↛ 1317: condition never true
1317 raise RuntimeError('Python ssl module is not available')
1319 if not isinstance(sslcontext, ssl.SSLContext):
1320 raise TypeError(
1321 f'sslcontext is expected to be an instance of ssl.SSLContext, '
1322 f'got {sslcontext!r}')
1324 if not getattr(transport, '_start_tls_compatible', False):
1325 raise TypeError(
1326 f'transport {transport!r} is not supported by start_tls()')
1328 waiter = self.create_future()
1329 ssl_protocol = sslproto.SSLProtocol(
1330 self, protocol, sslcontext, waiter,
1331 server_side, server_hostname,
1332 ssl_handshake_timeout=ssl_handshake_timeout,
1333 ssl_shutdown_timeout=ssl_shutdown_timeout,
1334 call_connection_made=False)
1336 # Pause early so that "ssl_protocol.data_received()" doesn't
1337 # have a chance to get called before "ssl_protocol.connection_made()".
1338 transport.pause_reading()
1340 transport.set_protocol(ssl_protocol)
1341 conmade_cb = self.call_soon(ssl_protocol.connection_made, transport)
1342 resume_cb = self.call_soon(transport.resume_reading)
1344 try:
1345 await waiter
1346 except BaseException:
1347 transport.close()
1348 conmade_cb.cancel()
1349 resume_cb.cancel()
1350 raise
1352 return ssl_protocol._app_transport
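A hedged sketch of the upgrade flow: given an established plaintext (transport, protocol) pair, start_tls() returns the TLS transport the protocol must use from then on. upgrade_to_tls is a hypothetical helper name.

import asyncio
import ssl

async def upgrade_to_tls(transport, protocol, hostname):
    loop = asyncio.get_running_loop()
    ctx = ssl.create_default_context()
    new_transport = await loop.start_tls(
        transport, protocol, ctx, server_hostname=hostname)
    return new_transport  # the protocol must write to this transport now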
1354 async def create_datagram_endpoint(self, protocol_factory,
1355 local_addr=None, remote_addr=None, *,
1356 family=0, proto=0, flags=0,
1357 reuse_port=None,
1358 allow_broadcast=None, sock=None):
1359 """Create datagram connection."""
1360 if sock is not None:
1361 if sock.type == socket.SOCK_STREAM:
1362 raise ValueError(
1363 f'A datagram socket was expected, got {sock!r}')
1364 if (local_addr or remote_addr or
1365 family or proto or flags or
1366 reuse_port or allow_broadcast):
1367 # show the problematic kwargs in exception msg
1368 opts = dict(local_addr=local_addr, remote_addr=remote_addr,
1369 family=family, proto=proto, flags=flags,
1370 reuse_port=reuse_port,
1371 allow_broadcast=allow_broadcast)
1372 problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
1373 raise ValueError(
1374 f'socket modifier keyword arguments can not be used '
1375 f'when sock is specified. ({problems})')
1376 sock.setblocking(False)
1377 r_addr = None
1378 else:
1379 if not (local_addr or remote_addr):
1380 if family == 0:
1381 raise ValueError('unexpected address family')
1382 addr_pairs_info = (((family, proto), (None, None)),)
1383 elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
1384 for addr in (local_addr, remote_addr):
1385 if addr is not None and not isinstance(addr, str):  # 1385 ↛ 1386: condition never true
1386 raise TypeError('string is expected')
1388 if local_addr and local_addr[0] not in (0, '\x00'):  # 1388 ↛ 1400: condition always true
1389 try:
1390 if stat.S_ISSOCK(os.stat(local_addr).st_mode):  # 1390 ↛ 1400: condition always true
1391 os.remove(local_addr)
1392 except FileNotFoundError:
1393 pass
1394 except OSError as err:
1395 # Directory may have permissions only to create socket.
1396 logger.error('Unable to check or remove stale UNIX '
1397 'socket %r: %r',
1398 local_addr, err)
1400 addr_pairs_info = (((family, proto),
1401 (local_addr, remote_addr)), )
1402 else:
1403 # join address by (family, protocol)
1404 addr_infos = {} # Using order preserving dict
1405 for idx, addr in ((0, local_addr), (1, remote_addr)):
1406 if addr is not None:
1407 if not (isinstance(addr, tuple) and len(addr) == 2):
1408 raise TypeError('2-tuple is expected')
1410 infos = await self._ensure_resolved(
1411 addr, family=family, type=socket.SOCK_DGRAM,
1412 proto=proto, flags=flags, loop=self)
1413 if not infos:
1414 raise OSError('getaddrinfo() returned empty list')
1416 for fam, _, pro, _, address in infos:
1417 key = (fam, pro)
1418 if key not in addr_infos:  # 1418 ↛ 1420: condition always true
1419 addr_infos[key] = [None, None]
1420 addr_infos[key][idx] = address
1422 # each addr has to have info for each (family, proto) pair
1423 addr_pairs_info = [
1424 (key, addr_pair) for key, addr_pair in addr_infos.items()
1425 if not ((local_addr and addr_pair[0] is None) or
1426 (remote_addr and addr_pair[1] is None))]
1428 if not addr_pairs_info:
1429 raise ValueError('can not get address information')
1431 exceptions = []
1433 for ((family, proto),
1434 (local_address, remote_address)) in addr_pairs_info:
1435 sock = None
1436 r_addr = None
1437 try:
1438 sock = socket.socket(
1439 family=family, type=socket.SOCK_DGRAM, proto=proto)
1440 if reuse_port:
1441 _set_reuseport(sock)
1442 if allow_broadcast:
1443 sock.setsockopt(
1444 socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
1445 sock.setblocking(False)
1447 if local_addr:
1448 sock.bind(local_address)
1449 if remote_addr:
1450 if not allow_broadcast:
1451 await self.sock_connect(sock, remote_address)
1452 r_addr = remote_address
1453 except OSError as exc:
1454 if sock is not None:
1455 sock.close()
1456 exceptions.append(exc)
1457 except:
1458 if sock is not None:  # 1458 ↛ 1460: condition always true
1459 sock.close()
1460 raise
1461 else:
1462 break
1463 else:
1464 raise exceptions[0]
1466 protocol = protocol_factory()
1467 waiter = self.create_future()
1468 transport = self._make_datagram_transport(
1469 sock, protocol, r_addr, waiter)
1470 if self._debug:  # 1470 ↛ 1471: condition never true
1471 if local_addr:
1472 logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
1473 "created: (%r, %r)",
1474 local_addr, remote_addr, transport, protocol)
1475 else:
1476 logger.debug("Datagram endpoint remote_addr=%r created: "
1477 "(%r, %r)",
1478 remote_addr, transport, protocol)
1480 try:
1481 await waiter
1482 except:
1483 transport.close()
1484 raise
1486 return transport, protocol
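A usage sketch binding a UDP endpoint; port 0 lets the OS pick a free port, and the returned (transport, protocol) pair mirrors the streaming APIs.

import asyncio

class EchoUDP(asyncio.DatagramProtocol):
    def datagram_received(self, data, addr):
        print('from', addr, ':', data)

async def main():
    loop = asyncio.get_running_loop()
    transport, protocol = await loop.create_datagram_endpoint(
        EchoUDP, local_addr=('127.0.0.1', 0))  # port 0: OS chooses
    print('bound to', transport.get_extra_info('sockname'))
    transport.close()

asyncio.run(main())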
1488 async def _ensure_resolved(self, address, *,
1489 family=0, type=socket.SOCK_STREAM,
1490 proto=0, flags=0, loop):
1491 host, port = address[:2]
1492 info = _ipaddr_info(host, port, family, type, proto, *address[2:])
1493 if info is not None:
1494 # "host" is already a resolved IP.
1495 return [info]
1496 else:
1497 return await loop.getaddrinfo(host, port, family=family, type=type,
1498 proto=proto, flags=flags)
1500 async def _create_server_getaddrinfo(self, host, port, family, flags):
1501 infos = await self._ensure_resolved((host, port), family=family,
1502 type=socket.SOCK_STREAM,
1503 flags=flags, loop=self)
1504 if not infos:
1505 raise OSError(f'getaddrinfo({host!r}) returned empty list')
1506 return infos
1508 async def create_server(
1509 self, protocol_factory, host=None, port=None,
1510 *,
1511 family=socket.AF_UNSPEC,
1512 flags=socket.AI_PASSIVE,
1513 sock=None,
1514 backlog=100,
1515 ssl=None,
1516 reuse_address=None,
1517 reuse_port=None,
1518 keep_alive=None,
1519 ssl_handshake_timeout=None,
1520 ssl_shutdown_timeout=None,
1521 start_serving=True):
1522 """Create a TCP server.
1524 The host parameter can be a string, in that case the TCP server is
1525 bound to host and port.
1527 The host parameter can also be a sequence of strings and in that case
1528 the TCP server is bound to all hosts of the sequence. If a host
1529 appears multiple times (possibly indirectly e.g. when hostnames
1530 resolve to the same IP address), the server is only bound once to that
1531 host.
1533 Return a Server object which can be used to stop the service.
1535 This method is a coroutine.
1536 """
1537 if isinstance(ssl, bool):  # 1537 ↛ 1538: condition never true
1538 raise TypeError('ssl argument must be an SSLContext or None')
1540 if ssl_handshake_timeout is not None and ssl is None:
1541 raise ValueError(
1542 'ssl_handshake_timeout is only meaningful with ssl')
1544 if ssl_shutdown_timeout is not None and ssl is None:  # 1544 ↛ 1545: condition never true
1545 raise ValueError(
1546 'ssl_shutdown_timeout is only meaningful with ssl')
1548 if sock is not None:
1549 _check_ssl_socket(sock)
1551 if host is not None or port is not None:
1552 if sock is not None:
1553 raise ValueError(
1554 'host/port and sock can not be specified at the same time')
1556 if reuse_address is None:  # 1556 ↛ 1558: condition always true
1557 reuse_address = os.name == "posix" and sys.platform != "cygwin"
1558 sockets = []
1559 if host == '':
1560 hosts = [None]
1561 elif (isinstance(host, str) or
1562 not isinstance(host, collections.abc.Iterable)):
1563 hosts = [host]
1564 else:
1565 hosts = host
1567 fs = [self._create_server_getaddrinfo(host, port, family=family,
1568 flags=flags)
1569 for host in hosts]
1570 infos = await tasks.gather(*fs)
1571 infos = set(itertools.chain.from_iterable(infos))
1573 completed = False
1574 try:
1575 for res in infos:
1576 af, socktype, proto, canonname, sa = res
1577 try:
1578 sock = socket.socket(af, socktype, proto)
1579 except socket.error:
1580 # Assume it's a bad family/type/protocol combination.
1581 if self._debug:
1582 logger.warning('create_server() failed to create '
1583 'socket.socket(%r, %r, %r)',
1584 af, socktype, proto, exc_info=True)
1585 continue
1586 sockets.append(sock)
1587 if reuse_address:  # 1587 ↛ 1592: condition always true
1588 sock.setsockopt(
1589 socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
1590 # Since Linux 6.12.9, SO_REUSEPORT is not allowed
1591 # on other address families than AF_INET/AF_INET6.
1592 if reuse_port and af in (socket.AF_INET, socket.AF_INET6):
1593 _set_reuseport(sock)
1594 if keep_alive:  # 1594 ↛ 1595: condition never true
1595 sock.setsockopt(
1596 socket.SOL_SOCKET, socket.SO_KEEPALIVE, True)
1597 # Disable IPv4/IPv6 dual stack support (enabled by
1598 # default on Linux) which makes a single socket
1599 # listen on both address families.
1600 if (_HAS_IPv6 and
1601 af == socket.AF_INET6 and
1602 hasattr(socket, 'IPPROTO_IPV6')):
1603 sock.setsockopt(socket.IPPROTO_IPV6,
1604 socket.IPV6_V6ONLY,
1605 True)
1606 try:
1607 sock.bind(sa)
1608 except OSError as err:
1609 msg = ('error while attempting '
1610 'to bind on address %r: %s'
1611 % (sa, str(err).lower()))
1612 if err.errno == errno.EADDRNOTAVAIL:  # 1612 ↛ 1614: condition never true
1613 # Assume the family is not enabled (bpo-30945)
1614 sockets.pop()
1615 sock.close()
1616 if self._debug:
1617 logger.warning(msg)
1618 continue
1619 raise OSError(err.errno, msg) from None
1621 if not sockets:  # 1621 ↛ 1622: condition never true
1622 raise OSError('could not bind on any address out of %r'
1623 % ([info[4] for info in infos],))
1625 completed = True
1626 finally:
1627 if not completed:
1628 for sock in sockets:  # 1628 ↛ 1637: loop exit branch never taken
1629 sock.close()
1630 else:
1631 if sock is None:
1632 raise ValueError('Neither host/port nor sock were specified')
1633 if sock.type != socket.SOCK_STREAM:
1634 raise ValueError(f'A Stream Socket was expected, got {sock!r}')
1635 sockets = [sock]
1637 for sock in sockets:
1638 sock.setblocking(False)
1640 server = Server(self, sockets, protocol_factory,
1641 ssl, backlog, ssl_handshake_timeout,
1642 ssl_shutdown_timeout)
1643 if start_serving:
1644 server._start_serving()
1645 # Skip one loop iteration so that all 'loop.add_reader'
1646 # go through.
1647 await tasks.sleep(0)
1649 if self._debug:
1650 logger.info("%r is serving", server)
1651 return server
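A usage sketch of the low-level server API: bind to an ephemeral port, serve briefly, then let 'async with' close the server and wait for connections to drop.

import asyncio

class UpperProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        self.transport.write(data.upper())

async def main():
    loop = asyncio.get_running_loop()
    server = await loop.create_server(UpperProtocol, '127.0.0.1', 0)
    print('serving on', server.sockets[0].getsockname())
    async with server:
        await asyncio.sleep(0.1)

asyncio.run(main())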
1653 async def connect_accepted_socket(
1654 self, protocol_factory, sock,
1655 *, ssl=None,
1656 ssl_handshake_timeout=None,
1657 ssl_shutdown_timeout=None):
1658 if sock.type != socket.SOCK_STREAM:  # 1658 ↛ 1659: condition never true
1659 raise ValueError(f'A Stream Socket was expected, got {sock!r}')
1661 if ssl_handshake_timeout is not None and not ssl:
1662 raise ValueError(
1663 'ssl_handshake_timeout is only meaningful with ssl')
1665 if ssl_shutdown_timeout is not None and not ssl:  [1665 ↛ 1666: condition on line 1665 was never true]
1666 raise ValueError(
1667 'ssl_shutdown_timeout is only meaningful with ssl')
1669 _check_ssl_socket(sock)
1671 transport, protocol = await self._create_connection_transport(
1672 sock, protocol_factory, ssl, '', server_side=True,
1673 ssl_handshake_timeout=ssl_handshake_timeout,
1674 ssl_shutdown_timeout=ssl_shutdown_timeout)
1675 if self._debug:  [1675 ↛ 1678: condition on line 1675 was never true]
1676 # Get the socket from the transport because SSL transport closes
1677 # the old socket and creates a new SSL socket
1678 sock = transport.get_extra_info('socket')
1679 logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
1680 return transport, protocol
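
connect_accepted_socket() is aimed at code that accepts connections itself (inherited FDs, socket activation, and so on) and hands each connected socket to the loop. A self-contained sketch under that assumption; the throwaway client exists only so there is something to accept:

    import asyncio
    import socket

    async def main():
        loop = asyncio.get_running_loop()
        listener = socket.create_server(('127.0.0.1', 0))
        listener.setblocking(False)
        port = listener.getsockname()[1]
        # Throwaway client so sock_accept() below has a peer.
        reader, writer = await asyncio.open_connection('127.0.0.1', port)
        conn, _ = await loop.sock_accept(listener)
        # Hand the accepted, already-connected socket to the event loop,
        # which wraps it in a server-side transport/protocol pair.
        transport, protocol = await loop.connect_accepted_socket(
            asyncio.Protocol, conn)
        transport.close()
        writer.close()
        await writer.wait_closed()
        listener.close()

    asyncio.run(main())
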
1682 async def connect_read_pipe(self, protocol_factory, pipe):
1683 protocol = protocol_factory()
1684 waiter = self.create_future()
1685 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
1687 try:
1688 await waiter
1689 except:
1690 transport.close()
1691 raise
1693 if self._debug:  [1693 ↛ 1694: condition on line 1693 was never true]
1694 logger.debug('Read pipe %r connected: (%r, %r)',
1695 pipe.fileno(), transport, protocol)
1696 return transport, protocol
1698 async def connect_write_pipe(self, protocol_factory, pipe):
1699 protocol = protocol_factory()
1700 waiter = self.create_future()
1701 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
1703 try:
1704 await waiter
1705 except:
1706 transport.close()
1707 raise
1709 if self._debug:  [1709 ↛ 1710: condition on line 1709 was never true]
1710 logger.debug('Write pipe %r connected: (%r, %r)',
1711 pipe.fileno(), transport, protocol)
1712 return transport, protocol
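
Both pipe methods share one shape: build the protocol, create the platform-specific transport with a waiter future, and await the waiter so that a setup failure closes the transport before the exception propagates. A sketch for the read side on Unix (the Windows selector loop does not implement pipe transports):

    import asyncio
    import os

    async def main():
        loop = asyncio.get_running_loop()
        rfd, wfd = os.pipe()
        # Wrap the read end in a transport/protocol pair; the transport
        # takes ownership of the file object.
        transport, protocol = await loop.connect_read_pipe(
            asyncio.Protocol, os.fdopen(rfd, 'rb'))
        os.write(wfd, b'hello')
        os.close(wfd)                # protocol sees EOF
        await asyncio.sleep(0.1)     # let the loop deliver data_received()
        transport.close()

    asyncio.run(main())
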
1714 def _log_subprocess(self, msg, stdin, stdout, stderr):
1715 info = [msg]
1716 if stdin is not None:
1717 info.append(f'stdin={_format_pipe(stdin)}')
1718 if stdout is not None and stderr == subprocess.STDOUT:
1719 info.append(f'stdout=stderr={_format_pipe(stdout)}')
1720 else:
1721 if stdout is not None:
1722 info.append(f'stdout={_format_pipe(stdout)}')
1723 if stderr is not None:
1724 info.append(f'stderr={_format_pipe(stderr)}')
1725 logger.debug(' '.join(info))
1727 async def subprocess_shell(self, protocol_factory, cmd, *,
1728 stdin=subprocess.PIPE,
1729 stdout=subprocess.PIPE,
1730 stderr=subprocess.PIPE,
1731 universal_newlines=False,
1732 shell=True, bufsize=0,
1733 encoding=None, errors=None, text=None,
1734 **kwargs):
1735 if not isinstance(cmd, (bytes, str)):
1736 raise ValueError("cmd must be a string or bytes")
1737 if universal_newlines:
1738 raise ValueError("universal_newlines must be False")
1739 if not shell:
1740 raise ValueError("shell must be True")
1741 if bufsize != 0:
1742 raise ValueError("bufsize must be 0")
1743 if text:
1744 raise ValueError("text must be False")
1745 if encoding is not None:
1746 raise ValueError("encoding must be None")
1747 if errors is not None:
1748 raise ValueError("errors must be None")
1750 protocol = protocol_factory()
1751 debug_log = None
1752 if self._debug:  [1752 ↛ 1755: condition on line 1752 was never true]
1753 # don't log parameters: they may contain sensitive information
1754 # (password) and may be too long
1755 debug_log = 'run shell command %r' % cmd
1756 self._log_subprocess(debug_log, stdin, stdout, stderr)
1757 transport = await self._make_subprocess_transport(
1758 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
1759 if self._debug and debug_log is not None:  [1759 ↛ 1760: condition on line 1759 was never true]
1760 logger.info('%s: %r', debug_log, transport)
1761 return transport, protocol
1763 async def subprocess_exec(self, protocol_factory, program, *args,
1764 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1765 stderr=subprocess.PIPE, universal_newlines=False,
1766 shell=False, bufsize=0,
1767 encoding=None, errors=None, text=None,
1768 **kwargs):
1769 if universal_newlines:
1770 raise ValueError("universal_newlines must be False")
1771 if shell:
1772 raise ValueError("shell must be False")
1773 if bufsize != 0:
1774 raise ValueError("bufsize must be 0")
1775 if text:
1776 raise ValueError("text must be False")
1777 if encoding is not None:
1778 raise ValueError("encoding must be None")
1779 if errors is not None:
1780 raise ValueError("errors must be None")
1782 popen_args = (program,) + args
1783 protocol = protocol_factory()
1784 debug_log = None
1785 if self._debug:  [1785 ↛ 1788: condition on line 1785 was never true]
1786 # don't log parameters: they may contain sensitive information
1787 # (password) and may be too long
1788 debug_log = f'execute program {program!r}'
1789 self._log_subprocess(debug_log, stdin, stdout, stderr)
1790 transport = await self._make_subprocess_transport(
1791 protocol, popen_args, False, stdin, stdout, stderr,
1792 bufsize, **kwargs)
1793 if self._debug and debug_log is not None:  [1793 ↛ 1794: condition on line 1793 was never true]
1794 logger.info('%s: %r', debug_log, transport)
1795 return transport, protocol
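
Both subprocess entry points reject the text-mode knobs of subprocess.Popen (asyncio subprocess pipes are bytes-only) and then defer to _make_subprocess_transport(). A sketch driving subprocess_exec() directly with a SubprocessProtocol; Collector is hypothetical, and the exact ordering of process_exited() versus the last pipe_data_received() is glossed over:

    import asyncio
    import sys

    class Collector(asyncio.SubprocessProtocol):
        # Hypothetical protocol: buffer stdout, resolve a future on exit.
        def __init__(self, done):
            self.done = done
            self.out = bytearray()

        def pipe_data_received(self, fd, data):
            if fd == 1:              # stdout
                self.out.extend(data)

        def process_exited(self):
            self.done.set_result(bytes(self.out))

    async def main():
        loop = asyncio.get_running_loop()
        done = loop.create_future()
        transport, protocol = await loop.subprocess_exec(
            lambda: Collector(done),
            sys.executable, '-c', 'print("hi")',
            stdin=None, stderr=None)
        print(await done)            # b'hi\n'
        transport.close()

    asyncio.run(main())
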
1797 def get_exception_handler(self):
1798 """Return an exception handler, or None if the default one is in use.
1799 """
1800 return self._exception_handler
1802 def set_exception_handler(self, handler):
1803 """Set handler as the new event loop exception handler.
1805 If handler is None, the default exception handler will
1806 be set.
1808 If handler is a callable object, it should have a
1809 signature matching '(loop, context)', where 'loop'
1810 will be a reference to the active event loop, 'context'
1811 will be a dict object (see `call_exception_handler()`
1812 documentation for details about context).
1813 """
1814 if handler is not None and not callable(handler):
1815 raise TypeError(f'A callable object or None is expected, '
1816 f'got {handler!r}')
1817 self._exception_handler = handler
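
A handler installed this way receives the loop and the context dict described under call_exception_handler() below, and may filter, log, or defer. A minimal sketch; the ignore-ConnectionResetError policy is only an example:

    import asyncio

    def handler(loop, context):
        # Swallow one noisy error class, defer everything else to the
        # stock behavior.
        if isinstance(context.get('exception'), ConnectionResetError):
            return
        loop.default_exception_handler(context)

    async def main():
        loop = asyncio.get_running_loop()
        loop.set_exception_handler(handler)
        # ... errors reported after this point go through handler() ...

    asyncio.run(main())
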
1819 def default_exception_handler(self, context):
1820 """Default exception handler.
1822 This is called when an exception occurs and no exception
1823 handler is set, and can be called by a custom exception
1824 handler that wants to defer to the default behavior.
1826 This default handler logs the error message and other
1827 context-dependent information. In debug mode, a truncated
1828 stack trace is also appended showing where the given object
1829 (e.g. a handle or future or task) was created, if any.
1831 The context parameter has the same meaning as in
1832 `call_exception_handler()`.
1833 """
1834 message = context.get('message')
1835 if not message:  [1835 ↛ 1836: condition on line 1835 was never true]
1836 message = 'Unhandled exception in event loop'
1838 exception = context.get('exception')
1839 if exception is not None:
1840 exc_info = (type(exception), exception, exception.__traceback__)
1841 else:
1842 exc_info = False
1844 if ('source_traceback' not in context and  [1844 ↛ 1847: condition on line 1844 was never true]
1845 self._current_handle is not None and
1846 self._current_handle._source_traceback):
1847 context['handle_traceback'] = \
1848 self._current_handle._source_traceback
1850 log_lines = [message]
1851 for key in sorted(context):
1852 if key in {'message', 'exception'}:
1853 continue
1854 value = context[key]
1855 if key == 'source_traceback':
1856 tb = ''.join(traceback.format_list(value))
1857 value = 'Object created at (most recent call last):\n'
1858 value += tb.rstrip()
1859 elif key == 'handle_traceback':  [1859 ↛ 1860: condition on line 1859 was never true]
1860 tb = ''.join(traceback.format_list(value))
1861 value = 'Handle created at (most recent call last):\n'
1862 value += tb.rstrip()
1863 else:
1864 value = repr(value)
1865 log_lines.append(f'{key}: {value}')
1867 logger.error('\n'.join(log_lines), exc_info=exc_info)
1869 def call_exception_handler(self, context):
1870 """Call the current event loop's exception handler.
1872 The context argument is a dict containing the following keys:
1874 - 'message': Error message;
1875 - 'exception' (optional): Exception object;
1876 - 'future' (optional): Future instance;
1877 - 'task' (optional): Task instance;
1878 - 'handle' (optional): Handle instance;
1879 - 'protocol' (optional): Protocol instance;
1880 - 'transport' (optional): Transport instance;
1881 - 'socket' (optional): Socket instance;
1882 - 'source_traceback' (optional): Traceback of the source;
1883 - 'handle_traceback' (optional): Traceback of the handle;
1884 - 'asyncgen' (optional): Asynchronous generator that caused
1885 the exception.
1887 New keys may be introduced in the future.
1889 Note: do not overload this method in an event loop subclass.
1890 For custom exception handling, use the
1891 `set_exception_handler()` method.
1892 """
1893 if self._exception_handler is None:
1894 try:
1895 self.default_exception_handler(context)
1896 except (SystemExit, KeyboardInterrupt):
1897 raise
1898 except BaseException:
1899 # Second protection layer for unexpected errors
1900 # in the default implementation, as well as for subclassed
1901 # event loops with overloaded "default_exception_handler".
1902 logger.error('Exception in default exception handler',
1903 exc_info=True)
1904 else:
1905 try:
1906 ctx = None
1907 thing = context.get("task")
1908 if thing is None:
1909 # Even though Futures don't have a context,
1910 # Task is a subclass of Future,
1911 # and sometimes the 'future' key holds a Task.
1912 thing = context.get("future")
1913 if thing is None:
1914 # Handles also have a context.
1915 thing = context.get("handle")
1916 if thing is not None and hasattr(thing, "get_context"):
1917 ctx = thing.get_context()
1918 if ctx is not None and hasattr(ctx, "run"):
1919 ctx.run(self._exception_handler, self, context)
1920 else:
1921 self._exception_handler(self, context)
1922 except (SystemExit, KeyboardInterrupt):
1923 raise
1924 except BaseException as exc:
1925 # Exception in the user-set custom exception handler.
1926 try:
1927 # Let's try default handler.
1928 self.default_exception_handler({
1929 'message': 'Unhandled error in exception handler',
1930 'exception': exc,
1931 'context': context,
1932 })
1933 except (SystemExit, KeyboardInterrupt):
1934 raise
1935 except BaseException:
1936 # Guard 'default_exception_handler' in case it is
1937 # overloaded.
1938 logger.error('Exception in default exception handler '
1939 'while handling an unexpected error '
1940 'in custom exception handler',
1941 exc_info=True)
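
Library code reports failures by building such a context dict and handing it to the loop; only 'message' is required, and extra keys are rendered with repr() by the default handler. A sketch:

    import asyncio

    async def main():
        loop = asyncio.get_running_loop()
        try:
            1 / 0
        except ZeroDivisionError as exc:
            # Route the failure through the installed (or default) handler
            # instead of letting it propagate.
            loop.call_exception_handler({
                'message': 'background computation failed',
                'exception': exc,
            })

    asyncio.run(main())
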
1943 def _add_callback(self, handle):
1944 """Add a Handle to _ready."""
1945 if not handle._cancelled:
1946 self._ready.append(handle)
1948 def _add_callback_signalsafe(self, handle):
1949 """Like _add_callback() but called from a signal handler."""
1950 self._add_callback(handle)
1951 self._write_to_self()
1953 def _timer_handle_cancelled(self, handle):
1954 """Notification that a TimerHandle has been cancelled."""
1955 if handle._scheduled:
1956 self._timer_cancelled_count += 1
1958 def _run_once(self):
1959 """Run one full iteration of the event loop.
1961 This calls all currently ready callbacks, polls for I/O,
1962 schedules the resulting callbacks, and finally schedules
1963 'call_later' callbacks.
1964 """
1966 sched_count = len(self._scheduled)
1967 if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
1968 self._timer_cancelled_count / sched_count >
1969 _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
1970 # Remove delayed calls that were cancelled if their number
1971 # is too high
1972 new_scheduled = []
1973 for handle in self._scheduled:
1974 if handle._cancelled:
1975 handle._scheduled = False
1976 else:
1977 new_scheduled.append(handle)
1979 heapq.heapify(new_scheduled)
1980 self._scheduled = new_scheduled
1981 self._timer_cancelled_count = 0
1982 else:
1983 # Remove delayed calls that were cancelled from head of queue.
1984 while self._scheduled and self._scheduled[0]._cancelled:
1985 self._timer_cancelled_count -= 1
1986 handle = heapq.heappop(self._scheduled)
1987 handle._scheduled = False
1989 timeout = None
1990 if self._ready or self._stopping:
1991 timeout = 0
1992 elif self._scheduled:
1993 # Compute the desired timeout.
1994 timeout = self._scheduled[0]._when - self.time()
1995 if timeout > MAXIMUM_SELECT_TIMEOUT:  [1995 ↛ 1996: condition on line 1995 was never true]
1996 timeout = MAXIMUM_SELECT_TIMEOUT
1997 elif timeout < 0:
1998 timeout = 0
2000 event_list = self._selector.select(timeout)
2001 self._process_events(event_list)
2002 # Needed to break cycles when an exception occurs.
2003 event_list = None
2005 # Handle 'later' callbacks that are ready.
2006 end_time = self.time() + self._clock_resolution
2007 while self._scheduled:
2008 handle = self._scheduled[0]
2009 if handle._when >= end_time:
2010 break
2011 handle = heapq.heappop(self._scheduled)
2012 handle._scheduled = False
2013 self._ready.append(handle)
2015 # This is the only place where callbacks are actually *called*.
2016 # All other places just add them to ready.
2017 # Note: We run all currently scheduled callbacks, but not any
2018 # callbacks scheduled by callbacks run this time around --
2019 # they will be run the next time (after another I/O poll).
2020 # Use an idiom that is thread-safe without using locks.
2021 ntodo = len(self._ready)
2022 for i in range(ntodo):
2023 handle = self._ready.popleft()
2024 if handle._cancelled:
2025 continue
2026 if self._debug:
2027 try:
2028 self._current_handle = handle
2029 t0 = self.time()
2030 handle._run()
2031 dt = self.time() - t0
2032 if dt >= self.slow_callback_duration:
2033 logger.warning('Executing %s took %.3f seconds',
2034 _format_handle(handle), dt)
2035 finally:
2036 self._current_handle = None
2037 else:
2038 handle._run()
2039 handle = None # Needed to break cycles when an exception occurs.
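
The cleanup branch at the top of _run_once() rebuilds the timer heap only when that is worthwhile: more than _MIN_SCHEDULED_TIMER_HANDLES (100) handles are scheduled and more than _MIN_CANCELLED_TIMER_HANDLES_FRACTION (50%) of them are cancelled; otherwise it lazily pops cancelled handles off the head. A standalone sketch of the same decision with the constants inlined:

    import heapq

    _MIN_SCHEDULED = 100
    _MIN_CANCELLED_FRACTION = 0.5

    def prune(scheduled, cancelled_count, is_cancelled):
        # Full O(n) rebuild only when the heap is large and mostly dead.
        if (len(scheduled) > _MIN_SCHEDULED and
                cancelled_count / len(scheduled) > _MIN_CANCELLED_FRACTION):
            alive = [h for h in scheduled if not is_cancelled(h)]
            heapq.heapify(alive)
            return alive, 0
        # Otherwise: O(log n) pops, but only for dead entries at the head.
        while scheduled and is_cancelled(scheduled[0]):
            heapq.heappop(scheduled)
            cancelled_count -= 1
        return scheduled, cancelled_count

    # 150 timers, 120 cancelled (80% > 50%): the rebuild branch fires.
    heap = list(range(150))
    heapq.heapify(heap)
    heap, count = prune(heap, 120, lambda h: h < 120)
    assert count == 0 and len(heap) == 30
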
2041 def _set_coroutine_origin_tracking(self, enabled):
2042 if bool(enabled) == bool(self._coroutine_origin_tracking_enabled):
2043 return
2045 if enabled:
2046 self._coroutine_origin_tracking_saved_depth = (
2047 sys.get_coroutine_origin_tracking_depth())
2048 sys.set_coroutine_origin_tracking_depth(
2049 constants.DEBUG_STACK_DEPTH)
2050 else:
2051 sys.set_coroutine_origin_tracking_depth(
2052 self._coroutine_origin_tracking_saved_depth)
2054 self._coroutine_origin_tracking_enabled = enabled
2056 def get_debug(self):
2057 return self._debug
2059 def set_debug(self, enabled):
2060 self._debug = enabled
2062 if self.is_running():
2063 self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)
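
Debug mode ties together coroutine origin tracking (above) and the slow-callback warning in _run_once(). A sketch; the 50 ms threshold is illustrative, and the warning is only visible once logging is configured:

    import asyncio
    import logging
    import time

    logging.basicConfig(level=logging.WARNING)  # show asyncio's logger

    async def main():
        loop = asyncio.get_running_loop()
        loop.set_debug(True)
        loop.slow_callback_duration = 0.05  # warn above 50 ms
        loop.call_soon(time.sleep, 0.1)     # deliberately blocks the loop
        await asyncio.sleep(0.2)            # 'Executing ... took 0.100 seconds'

    asyncio.run(main())
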