Coverage for Lib/asyncio/selector_events.py: 87%
921 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-24 03:28 +0000
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-24 03:28 +0000
1"""Event loop using a selector and related classes.
3A selector is a "notify-when-ready" multiplexer. For a subclass which
4also includes support for signal handling, see the unix_events sub-module.
5"""
7__all__ = 'BaseSelectorEventLoop',
9import collections
10import errno
11import functools
12import itertools
13import os
14import selectors
15import socket
16import warnings
17import weakref
18try:
19 import ssl
20except ImportError: # pragma: no cover
21 ssl = None
23from . import base_events
24from . import constants
25from . import events
26from . import futures
27from . import protocols
28from . import sslproto
29from . import transports
30from . import trsock
31from .log import logger
# Use sendmsg() only when the socket type offers it AND the platform can
# report SC_IOV_MAX; otherwise the transport falls back to send().
_HAS_SENDMSG = hasattr(socket.socket, 'sendmsg')

if _HAS_SENDMSG:
    try:
        SC_IOV_MAX = os.sysconf('SC_IOV_MAX')
    except (OSError, ValueError):
        # OSError: the name is known but the host cannot supply a value.
        # ValueError: the name is not a recognised configuration name.
        # Fallback to send
        _HAS_SENDMSG = False
def _test_selector_event(selector, fd, event):
    """Tell whether *selector* monitors *fd* for any of the *event* bits.

    A file descriptor that is not registered at all yields False.
    """
    try:
        registration = selector.get_key(fd)
    except KeyError:
        return False
    watched = registration.events & event
    return bool(watched)
53class BaseSelectorEventLoop(base_events.BaseEventLoop):
54 """Selector event loop.
56 See events.AbstractEventLoop for API specification.
57 """
def __init__(self, selector=None):
    """Build the loop on *selector*, or on a DefaultSelector if None.

    The self-pipe is created and registered before the fd -> transport
    registry is set up.
    """
    super().__init__()

    chosen = selector
    if chosen is None:
        chosen = selectors.DefaultSelector()
    logger.debug('Using selector: %s', chosen.__class__.__name__)
    self._selector = chosen
    self._make_self_pipe()
    # Weak values: the registry alone never keeps a transport alive.
    self._transports = weakref.WeakValueDictionary()
def _make_socket_transport(self, sock, protocol, waiter=None, *,
                           extra=None, server=None, context=None):
    """Return a new stream transport for *sock*.

    Raises (via _ensure_fd_no_transport) if the descriptor already
    belongs to a transport that is not closing.
    """
    self._ensure_fd_no_transport(sock)
    transport = _SelectorSocketTransport(
        self, sock, protocol, waiter, extra, server, context=context)
    return transport
def _make_ssl_transport(
        self, rawsock, protocol, sslcontext, waiter=None,
        *, server_side=False, server_hostname=None,
        extra=None, server=None,
        ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
        ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT,
        context=None,
):
    """Layer TLS over *rawsock* and return the application transport.

    An SSLProtocol sits between the raw socket transport and *protocol*;
    what is returned is the SSLProtocol's ``_app_transport``, not the
    socket transport itself.
    """
    self._ensure_fd_no_transport(rawsock)
    timeouts = {
        'ssl_handshake_timeout': ssl_handshake_timeout,
        'ssl_shutdown_timeout': ssl_shutdown_timeout,
    }
    tls_protocol = sslproto.SSLProtocol(
        self, protocol, sslcontext, waiter,
        server_side, server_hostname,
        **timeouts,
    )
    # Created for its side effects only: the constructor registers the
    # transport with the loop and schedules connection_made().
    _SelectorSocketTransport(self, rawsock, tls_protocol,
                             extra=extra, server=server, context=context)
    return tls_protocol._app_transport
def _make_datagram_transport(self, sock, protocol,
                             address=None, waiter=None, extra=None):
    """Return a new datagram transport for *sock*.

    The descriptor must not already belong to an active transport.
    """
    self._ensure_fd_no_transport(sock)
    transport = _SelectorDatagramTransport(
        self, sock, protocol, address, waiter, extra)
    return transport
def close(self):
    """Close the loop together with its self-pipe and selector.

    Raises RuntimeError while the loop is running. Closing a loop that
    is already closed does nothing.
    """
    if self.is_running():
        raise RuntimeError("Cannot close a running event loop")
    if not self.is_closed():
        self._close_self_pipe()
        super().close()
        selector = self._selector
        if selector is not None:
            selector.close()
            self._selector = None
def _close_self_pipe(self):
    """Unregister the self-pipe reader and close both pipe sockets."""
    self._remove_reader(self._ssock.fileno())
    # Read end first, then write end; each attribute is cleared right
    # after its socket is closed.
    for attr in ('_ssock', '_csock'):
        getattr(self, attr).close()
        setattr(self, attr, None)
    self._internal_fds -= 1
def _make_self_pipe(self):
    """Create the non-blocking socket pair used to wake the loop up."""
    # A self-socket, really. :-)
    read_end, write_end = socket.socketpair()
    self._ssock, self._csock = read_end, write_end
    for end in (read_end, write_end):
        end.setblocking(False)
    self._internal_fds += 1
    self._add_reader(read_end.fileno(), self._read_from_self)
def _process_self_data(self, data):
    """Hook invoked with each chunk read from the self-pipe; no-op here."""
    return None
def _read_from_self(self):
    """Drain the self-pipe, passing every chunk to _process_self_data().

    Stops when the socket would block or returns no data; an interrupted
    read is simply retried.
    """
    while True:
        try:
            chunk = self._ssock.recv(4096)
            if not chunk:
                return
            # Kept inside the try so the hook's own InterruptedError /
            # BlockingIOError are handled exactly like the read's.
            self._process_self_data(chunk)
        except InterruptedError:
            continue
        except BlockingIOError:
            return
def _write_to_self(self):
    """Wake the loop by writing one null byte to the self-pipe.

    Never raises for a missing or closed pipe: a None socket is skipped
    and OSError from send() is only logged in debug mode.
    """
    # This may be called from a different thread, possibly after
    # _close_self_pipe() has been called or even while it is
    # running. Snapshot the attribute once so the None check and the
    # send() use the same object. When a socket is closed, send()
    # raises OSError (with errno set to EBADF, but let's not rely on
    # the exact error code).
    csock = self._csock
    if csock is not None:
        try:
            csock.send(b'\0')
        except OSError:
            if self._debug:
                logger.debug("Fail to write a null byte into the "
                             "self-pipe socket",
                             exc_info=True)
def _start_serving(self, protocol_factory, sock,
                   sslcontext=None, server=None, backlog=100,
                   ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
                   ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT, context=None):
    """Start watching listening socket *sock* for incoming connections.

    _accept_connection() is registered as the reader callback and gets
    every argument of this call forwarded positionally.
    """
    accept_args = (protocol_factory, sock, sslcontext, server, backlog,
                   ssl_handshake_timeout, ssl_shutdown_timeout, context)
    self._add_reader(sock.fileno(), self._accept_connection, *accept_args)
def _accept_connection(
        self, protocol_factory, sock,
        sslcontext=None, server=None, backlog=100,
        ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
        ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT, context=None):
    """Accept the connections pending on listening socket *sock*.

    At most ``backlog + 1`` accept() calls are made per invocation. Each
    accepted connection is handed to an _accept_connection2() task, which
    runs in a copy of *context* when one is given.

    If accept() fails for lack of system resources (EMFILE, ENFILE,
    ENOBUFS, ENOMEM) the error is reported to the loop's exception
    handler, the reader is removed, serving is rescheduled after
    constants.ACCEPT_RETRY_DELAY and the method returns. Any other
    OSError propagates to the caller.
    """
    # This method is only called once for each event loop tick where the
    # listening socket has triggered an EVENT_READ. There may be multiple
    # connections waiting for an .accept() so it is called in a loop.
    # See https://bugs.python.org/issue27906 for more details.
    for _ in range(backlog + 1):
        try:
            conn, addr = sock.accept()
            if self._debug:
                logger.debug("%r got a new connection from %r: %r",
                             server, addr, conn)
            conn.setblocking(False)
        except ConnectionAbortedError:
            # Discard connections that were aborted before accept().
            continue
        except (BlockingIOError, InterruptedError):
            # Early exit because of a signal or
            # the socket accept buffer is empty.
            return
        except OSError as exc:
            # There's nowhere to send the error, so just log it.
            if exc.errno in (errno.EMFILE, errno.ENFILE,
                             errno.ENOBUFS, errno.ENOMEM):
                # Some platforms (e.g. Linux) keep reporting the FD as
                # ready, so we remove the read handler temporarily.
                # We'll try again in a while.
                self.call_exception_handler({
                    'message': 'socket.accept() out of system resource',
                    'exception': exc,
                    'socket': trsock.TransportSocket(sock),
                })
                self._remove_reader(sock.fileno())
                self.call_later(constants.ACCEPT_RETRY_DELAY,
                                self._start_serving,
                                protocol_factory, sock, sslcontext, server,
                                backlog, ssl_handshake_timeout,
                                ssl_shutdown_timeout, context)
                # The listener is parked until the retry fires. Looping on
                # would call accept() again straight away, re-reporting the
                # same error and scheduling a duplicate retry each time.
                return
            else:
                raise  # The event loop will catch, log and ignore it.
        else:
            extra = {'peername': addr}
            conn_context = context.copy() if context is not None else None
            accept = self._accept_connection2(
                protocol_factory, conn, extra, sslcontext, server,
                ssl_handshake_timeout, ssl_shutdown_timeout, context=conn_context)
            self.create_task(accept, context=conn_context)
async def _accept_connection2(
        self, protocol_factory, conn, extra,
        sslcontext=None, server=None,
        ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
        ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT, context=None):
    """Create the protocol and transport for accepted socket *conn*.

    Waits until the transport reports that the connection is set up.
    SystemExit and KeyboardInterrupt propagate; any other failure is
    swallowed and, in debug mode only, reported to the loop's exception
    handler.
    """
    protocol = None
    transport = None
    try:
        protocol = protocol_factory()
        waiter = self.create_future()
        if not sslcontext:
            transport = self._make_socket_transport(
                conn, protocol, waiter=waiter, extra=extra,
                server=server, context=context)
        else:
            transport = self._make_ssl_transport(
                conn, protocol, sslcontext, waiter=waiter,
                server_side=True, extra=extra, server=server,
                ssl_handshake_timeout=ssl_handshake_timeout,
                ssl_shutdown_timeout=ssl_shutdown_timeout,
                context=context)

        try:
            await waiter
        except BaseException:
            transport.close()
            # gh-109534: When an exception is raised by the SSLProtocol object the
            # exception set in this future can keep the protocol object alive and
            # cause a reference cycle.
            waiter = None
            raise
            # It's now up to the protocol to handle the connection.

    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as exc:
        if not self._debug:
            return
        report = {
            'message':
                'Error on transport creation for incoming connection',
            'exception': exc,
        }
        if protocol is not None:
            report['protocol'] = protocol
        if transport is not None:
            report['transport'] = transport
        self.call_exception_handler(report)
def _ensure_fd_no_transport(self, fd):
    """Raise RuntimeError if *fd* is owned by a transport not yet closing.

    *fd* is either an int or an object with a fileno() method; anything
    else raises ValueError.
    """
    if isinstance(fd, int):
        fileno = fd
    else:
        try:
            fileno = int(fd.fileno())
        except (AttributeError, TypeError, ValueError):
            # This code matches selectors._fileobj_to_fd function.
            raise ValueError(f"Invalid file object: {fd!r}") from None
    owner = self._transports.get(fileno)
    if owner and not owner.is_closing():
        raise RuntimeError(
            f'File descriptor {fd!r} is used by transport '
            f'{owner!r}')
def _add_reader(self, fd, callback, *args, context=None):
    """Install *callback* as the read handler for *fd*; return its Handle.

    A reader already installed for *fd* is replaced and cancelled, while
    an existing writer is kept.
    """
    self._check_closed()
    handle = events.Handle(callback, args, self, context=context)
    key = self._selector.get_map().get(fd)
    if key is None:
        self._selector.register(fd, selectors.EVENT_READ,
                                (handle, None))
        return handle

    old_reader, writer = key.data
    self._selector.modify(fd, key.events | selectors.EVENT_READ,
                          (handle, writer))
    if old_reader is not None:
        old_reader.cancel()
    return handle
def _remove_reader(self, fd):
    """Stop watching *fd* for reads; return True if a reader was removed.

    Returns False when the loop is closed, when *fd* is not registered,
    or when it had no reader. A writer registration is left in place.
    """
    if self.is_closed():
        return False
    key = self._selector.get_map().get(fd)
    if key is None:
        return False

    reader, writer = key.data
    remaining = key.events & ~selectors.EVENT_READ
    if remaining:
        self._selector.modify(fd, remaining, (None, writer))
    else:
        self._selector.unregister(fd)

    if reader is None:
        return False
    reader.cancel()
    return True
def _add_writer(self, fd, callback, *args, context=None):
    """Install *callback* as the write handler for *fd*; return its Handle.

    A writer already installed for *fd* is replaced and cancelled, while
    an existing reader is kept.
    """
    self._check_closed()
    handle = events.Handle(callback, args, self, context=context)
    key = self._selector.get_map().get(fd)
    if key is None:
        self._selector.register(fd, selectors.EVENT_WRITE,
                                (None, handle))
        return handle

    reader, old_writer = key.data
    self._selector.modify(fd, key.events | selectors.EVENT_WRITE,
                          (reader, handle))
    if old_writer is not None:
        old_writer.cancel()
    return handle
def _remove_writer(self, fd):
    """Stop watching *fd* for writes; return True if a writer was removed.

    Returns False when the loop is closed, when *fd* is not registered,
    or when it had no writer. A reader registration is left in place.
    """
    if self.is_closed():
        return False
    key = self._selector.get_map().get(fd)
    if key is None:
        return False

    reader, writer = key.data
    # Drop only the write interest; unregister when nothing is left.
    remaining = key.events & ~selectors.EVENT_WRITE
    if remaining:
        self._selector.modify(fd, remaining, (reader, None))
    else:
        self._selector.unregister(fd)

    if writer is None:
        return False
    writer.cancel()
    return True
def add_reader(self, fd, callback, *args):
    """Call *callback* with *args* whenever *fd* is ready for reading.

    Refused (see _ensure_fd_no_transport) when *fd* belongs to an
    active transport. Returns None.
    """
    self._ensure_fd_no_transport(fd)
    # The Handle produced by the internal helper is deliberately dropped.
    self._add_reader(fd, callback, *args)
    return None
def remove_reader(self, fd):
    """Stop calling the read callback of *fd*.

    Returns True if a reader was removed, False otherwise. Refused when
    *fd* belongs to an active transport.
    """
    self._ensure_fd_no_transport(fd)
    removed = self._remove_reader(fd)
    return removed
def add_writer(self, fd, callback, *args):
    """Call *callback* with *args* whenever *fd* is ready for writing.

    Refused (see _ensure_fd_no_transport) when *fd* belongs to an
    active transport. Returns None.
    """
    self._ensure_fd_no_transport(fd)
    # The Handle produced by the internal helper is deliberately dropped.
    self._add_writer(fd, callback, *args)
    return None
def remove_writer(self, fd):
    """Stop calling the write callback of *fd*.

    Returns True if a writer was removed, False otherwise. Refused when
    *fd* belongs to an active transport.
    """
    self._ensure_fd_no_transport(fd)
    removed = self._remove_writer(fd)
    return removed
async def sock_recv(self, sock, n):
    """Receive up to *n* bytes from *sock*.

    Returns the received data as a bytes object. The read is attempted
    immediately; only when it would block is a reader callback
    registered and awaited. In debug mode a socket with a non-zero
    timeout raises ValueError.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")
    try:
        return sock.recv(n)
    except (BlockingIOError, InterruptedError):
        pass  # nothing available yet: wait for readability below
    waiter = self.create_future()
    fd = sock.fileno()
    self._ensure_fd_no_transport(fd)
    reader = self._add_reader(fd, self._sock_recv, waiter, sock, n)
    cleanup = functools.partial(self._sock_read_done, fd, handle=reader)
    waiter.add_done_callback(cleanup)
    return await waiter
def _sock_read_done(self, fd, fut, handle=None):
    """Done-callback of read futures: remove the reader registered on *fd*.

    Nothing is removed when *handle* is given and already cancelled.
    """
    if handle is not None and handle.cancelled():
        return
    self.remove_reader(fd)
def _sock_recv(self, fut, sock, n):
    """Reader callback behind sock_recv(): try the read and resolve *fut*."""
    # _sock_recv() can add itself as an I/O callback if the operation can't
    # be done immediately. Don't use it directly, call sock_recv().
    if fut.done():
        return
    try:
        chunk = sock.recv(n)
    except (BlockingIOError, InterruptedError):
        return  # try again next time
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        fut.set_exception(err)
        return
    fut.set_result(chunk)
async def sock_recv_into(self, sock, buf):
    """Receive data from *sock* directly into the writable buffer *buf*.

    Returns the number of bytes written. The read is attempted
    immediately; only when it would block is a reader callback
    registered and awaited. In debug mode a socket with a non-zero
    timeout raises ValueError.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")
    try:
        return sock.recv_into(buf)
    except (BlockingIOError, InterruptedError):
        pass  # nothing available yet: wait for readability below
    waiter = self.create_future()
    fd = sock.fileno()
    self._ensure_fd_no_transport(fd)
    reader = self._add_reader(fd, self._sock_recv_into, waiter, sock, buf)
    cleanup = functools.partial(self._sock_read_done, fd, handle=reader)
    waiter.add_done_callback(cleanup)
    return await waiter
def _sock_recv_into(self, fut, sock, buf):
    """Reader callback behind sock_recv_into(): read and resolve *fut*."""
    # _sock_recv_into() can add itself as an I/O callback if the operation
    # can't be done immediately. Don't use it directly, call
    # sock_recv_into().
    if fut.done():
        return
    try:
        count = sock.recv_into(buf)
    except (BlockingIOError, InterruptedError):
        return  # try again next time
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        fut.set_exception(err)
        return
    fut.set_result(count)
async def sock_recvfrom(self, sock, bufsize):
    """Receive one datagram of at most *bufsize* bytes from *sock*.

    Returns a ``(bytes, address)`` tuple: the datagram and the address
    it came from. The read is attempted immediately; only when it would
    block is a reader callback registered and awaited. In debug mode a
    socket with a non-zero timeout raises ValueError.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")
    try:
        return sock.recvfrom(bufsize)
    except (BlockingIOError, InterruptedError):
        pass  # nothing available yet: wait for readability below
    waiter = self.create_future()
    fd = sock.fileno()
    self._ensure_fd_no_transport(fd)
    reader = self._add_reader(fd, self._sock_recvfrom, waiter, sock, bufsize)
    cleanup = functools.partial(self._sock_read_done, fd, handle=reader)
    waiter.add_done_callback(cleanup)
    return await waiter
def _sock_recvfrom(self, fut, sock, bufsize):
    """Reader callback behind sock_recvfrom(): read and resolve *fut*."""
    # _sock_recvfrom() can add itself as an I/O callback if the operation
    # can't be done immediately. Don't use it directly, call
    # sock_recvfrom().
    if fut.done():
        return
    try:
        datagram = sock.recvfrom(bufsize)
    except (BlockingIOError, InterruptedError):
        return  # try again next time
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        fut.set_exception(err)
        return
    fut.set_result(datagram)
async def sock_recvfrom_into(self, sock, buf, nbytes=0):
    """Receive a datagram from *sock* into the writable buffer *buf*.

    Returns a ``(number of bytes written, address)`` tuple. A falsy
    *nbytes* is replaced by ``len(buf)``. The read is attempted
    immediately; only when it would block is a reader callback
    registered and awaited. In debug mode a socket with a non-zero
    timeout raises ValueError.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")
    if not nbytes:
        nbytes = len(buf)

    try:
        return sock.recvfrom_into(buf, nbytes)
    except (BlockingIOError, InterruptedError):
        pass  # nothing available yet: wait for readability below
    waiter = self.create_future()
    fd = sock.fileno()
    self._ensure_fd_no_transport(fd)
    reader = self._add_reader(
        fd, self._sock_recvfrom_into, waiter, sock, buf, nbytes)
    cleanup = functools.partial(self._sock_read_done, fd, handle=reader)
    waiter.add_done_callback(cleanup)
    return await waiter
def _sock_recvfrom_into(self, fut, sock, buf, bufsize):
    """Reader callback behind sock_recvfrom_into(): read and resolve *fut*."""
    # _sock_recvfrom_into() can add itself as an I/O callback if the
    # operation can't be done immediately. Don't use it directly, call
    # sock_recvfrom_into().
    if fut.done():
        return
    try:
        outcome = sock.recvfrom_into(buf, bufsize)
    except (BlockingIOError, InterruptedError):
        return  # try again next time
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        fut.set_exception(err)
        return
    fut.set_result(outcome)
async def sock_sendall(self, sock, data):
    """Send all of *data* to the connected socket *sock*.

    The socket must be connected to a remote socket. This method
    continues to send data from data until either all data has been
    sent or an error occurs. None is returned on success. On error,
    an exception is raised, and there is no way to determine how much
    data, if any, was successfully processed by the receiving end of
    the connection.

    *data* may be any object supporting the buffer protocol; progress
    is tracked in bytes whatever the buffer's item size is. In debug
    mode a socket with a non-zero timeout raises ValueError.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")
    try:
        n = sock.send(data)
    except (BlockingIOError, InterruptedError):
        n = 0

    # send() reports BYTES, so completion must be judged against the
    # buffer's byte size: len() counts items and is wrong for buffers
    # whose items are wider than one byte (e.g. array('I')).
    view = memoryview(data)
    if n == view.nbytes:
        # all data sent
        return
    if view.ndim != 1 or view.itemsize != 1:
        # _sock_sendall() slices the view by the byte offset it has
        # accumulated, so hand it a flat byte view.
        view = view.cast('B')

    fut = self.create_future()
    fd = sock.fileno()
    self._ensure_fd_no_transport(fd)
    # use a trick with a list in closure to store a mutable state
    handle = self._add_writer(fd, self._sock_sendall, fut, sock,
                              view, [n])
    fut.add_done_callback(
        functools.partial(self._sock_write_done, fd, handle=handle))
    return await fut
def _sock_sendall(self, fut, sock, view, pos):
    """Writer callback behind sock_sendall(): push the unsent tail of *view*.

    *pos* is a one-element list holding the offset already sent; it is
    updated in place until the whole view is out, at which point *fut*
    is resolved with None.
    """
    if fut.done():
        # Future cancellation can be scheduled on previous loop iteration
        return
    offset = pos[0]
    try:
        sent = sock.send(view[offset:])
    except (BlockingIOError, InterruptedError):
        return
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        fut.set_exception(err)
        return

    offset += sent

    if offset != len(view):
        pos[0] = offset
        return
    fut.set_result(None)
async def sock_sendto(self, sock, data, address):
    """Send *data* as a datagram from *sock* to *address*.

    Returns whatever ``sock.sendto()`` returns. The send is attempted
    immediately; only when it would block is a writer callback
    registered and awaited. In debug mode a socket with a non-zero
    timeout raises ValueError.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")
    try:
        return sock.sendto(data, address)
    except (BlockingIOError, InterruptedError):
        pass  # not writable yet: wait for writability below

    waiter = self.create_future()
    fd = sock.fileno()
    self._ensure_fd_no_transport(fd)
    writer = self._add_writer(
        fd, self._sock_sendto, waiter, sock, data, address)
    cleanup = functools.partial(self._sock_write_done, fd, handle=writer)
    waiter.add_done_callback(cleanup)
    return await waiter
def _sock_sendto(self, fut, sock, data, address):
    """Writer callback behind sock_sendto(): send and resolve *fut*."""
    if fut.done():
        # Future cancellation can be scheduled on previous loop iteration
        return
    try:
        sent = sock.sendto(data, 0, address)
    except (BlockingIOError, InterruptedError):
        return
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        fut.set_exception(err)
        return
    fut.set_result(sent)
async def sock_connect(self, sock, address):
    """Connect *sock* to the remote *address*.

    This method is a coroutine. For AF_INET sockets (and AF_INET6 when
    IPv6 is available) *address* is resolved first and the first result
    is used. In debug mode a socket with a non-zero timeout raises
    ValueError.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")

    needs_resolving = sock.family == socket.AF_INET or (
        base_events._HAS_IPv6 and sock.family == socket.AF_INET6)
    if needs_resolving:
        resolved = await self._ensure_resolved(
            address, family=sock.family, type=sock.type, proto=sock.proto,
            loop=self,
        )
        address = resolved[0][4]

    fut = self.create_future()
    self._sock_connect(fut, sock, address)
    try:
        return await fut
    finally:
        # Needed to break cycles when an exception occurs.
        fut = None
def _sock_connect(self, fut, sock, address):
    """Start connecting *sock* to *address*, resolving *fut* when known.

    An immediate success or failure settles *fut* right away; a
    connection still in progress is finished by _sock_connect_cb() once
    the socket becomes writable.
    """
    fd = sock.fileno()
    try:
        sock.connect(address)
    except (BlockingIOError, InterruptedError):
        # Issue #23618: When the C function connect() fails with EINTR, the
        # connection runs in background. We have to wait until the socket
        # becomes writable to be notified when the connection succeed or
        # fails.
        self._ensure_fd_no_transport(fd)
        writer = self._add_writer(
            fd, self._sock_connect_cb, fut, sock, address)
        cleanup = functools.partial(self._sock_write_done, fd, handle=writer)
        fut.add_done_callback(cleanup)
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        fut.set_exception(err)
    else:
        fut.set_result(None)
    finally:
        # Drop the local reference on every exit path.
        fut = None
def _sock_write_done(self, fd, fut, handle=None):
    """Done-callback of write futures: remove the writer registered on *fd*.

    Nothing is removed when *handle* is given and already cancelled.
    """
    if handle is not None and handle.cancelled():
        return
    self.remove_writer(fd)
def _sock_connect_cb(self, fut, sock, address):
    """Writer callback behind _sock_connect(): report the connect outcome.

    Reads SO_ERROR from *sock*; zero resolves *fut* with None, any other
    value sets an OSError on it.
    """
    if fut.done():
        return

    try:
        so_error = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if so_error != 0:
            # Jump to any except clause below.
            raise OSError(so_error, f'Connect call failed {address}')
    except (BlockingIOError, InterruptedError):
        # socket is still registered, the callback will be retried later
        pass
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        fut.set_exception(err)
    else:
        fut.set_result(None)
    finally:
        # Drop the local reference on every exit path.
        fut = None
async def sock_accept(self, sock):
    """Accept one connection on the listening socket *sock*.

    The socket must be bound to an address and listening for
    connections. Returns a ``(conn, address)`` pair: *conn* is a new
    socket object for the connection, switched to non-blocking mode, and
    *address* is the address bound to the socket on the other end. In
    debug mode a socket with a non-zero timeout raises ValueError.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")
    waiter = self.create_future()
    self._sock_accept(waiter, sock)
    return await waiter
def _sock_accept(self, fut, sock):
    """Try to accept on *sock*, resolving *fut* with ``(conn, address)``.

    When nothing is pending, the method registers itself as the reader
    callback for the socket so the attempt is repeated later.
    """
    fd = sock.fileno()
    try:
        conn, peer = sock.accept()
        conn.setblocking(False)
    except (BlockingIOError, InterruptedError):
        self._ensure_fd_no_transport(fd)
        reader = self._add_reader(fd, self._sock_accept, fut, sock)
        cleanup = functools.partial(self._sock_read_done, fd, handle=reader)
        fut.add_done_callback(cleanup)
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        fut.set_exception(err)
    else:
        fut.set_result((conn, peer))
async def _sendfile_native(self, transp, file, offset, count):
    """Send *file* through the socket of *transp* with sock_sendfile().

    The transport is taken out of the fd registry and its reading is
    paused for the duration, and the call waits on the transport's
    empty-waiter before sending. Registry entry and reading state are
    restored afterwards whether or not the send succeeded.
    """
    registry = self._transports
    del registry[transp._sock_fd]
    was_reading = transp.is_reading()
    transp.pause_reading()
    await transp._make_empty_waiter()
    try:
        return await self.sock_sendfile(transp._sock, file, offset, count,
                                        fallback=False)
    finally:
        transp._reset_empty_waiter()
        if was_reading:
            transp.resume_reading()
        registry[transp._sock_fd] = transp
def _process_events(self, event_list):
    """Queue the callbacks for every ``(key, mask)`` pair in *event_list*.

    A handle found cancelled is unregistered instead of being queued.
    """
    for key, mask in event_list:
        fileobj = key.fileobj
        reader, writer = key.data
        if reader is not None and mask & selectors.EVENT_READ:
            if not reader._cancelled:
                self._add_callback(reader)
            else:
                self._remove_reader(fileobj)
        if writer is not None and mask & selectors.EVENT_WRITE:
            if not writer._cancelled:
                self._add_callback(writer)
            else:
                self._remove_writer(fileobj)
def _stop_serving(self, sock):
    """Stop accepting on listening socket *sock* and close it."""
    listening_fd = sock.fileno()
    self._remove_reader(listening_fd)
    sock.close()
769class _SelectorTransport(transports._FlowControlMixin,
770 transports.Transport):
772 max_size = 256 * 1024 # Buffer size passed to recv().
774 # Attribute used in the destructor: it must be set even if the constructor
775 # is not called (see _SelectorSslTransport which may start by raising an
776 # exception)
777 _sock = None
def __init__(self, loop, sock, protocol, extra=None, server=None, context=None):
    """Bind the transport to *sock* and register it with *loop*.

    Fills the ``socket``, ``sockname`` and (unless the caller supplied
    it) ``peername`` entries of the extra-info dict, attaches to
    *server* when one is given, and records the transport in
    ``loop._transports`` under the socket's file descriptor.
    """
    super().__init__(extra, loop)
    self._extra['socket'] = trsock.TransportSocket(sock)
    try:
        sockname = sock.getsockname()
    except OSError:
        sockname = None
    self._extra['sockname'] = sockname
    if 'peername' not in self._extra:
        try:
            peername = sock.getpeername()
        except OSError:
            peername = None
        self._extra['peername'] = peername
    self._sock = sock
    self._sock_fd = sock.fileno()
    self._context = context
    self._protocol_connected = False
    self.set_protocol(protocol)

    self._server = server
    self._buffer = collections.deque()
    self._buffer_size = 0
    self._conn_lost = 0  # Set when call to connection_lost scheduled.
    self._closing = False  # Set when close() called.
    self._paused = False  # Set when pause_reading() called

    if server is not None:
        server._attach(self)
    loop._transports[self._sock_fd] = self
def __repr__(self):
    """Describe the transport: class, open state, fd and polling state.

    The read/write polling details are included only while the
    transport still has a loop and that loop is not closed.
    """
    parts = [self.__class__.__name__]
    if self._sock is None:
        parts.append('closed')
    elif self._closing:
        parts.append('closing')
    parts.append(f'fd={self._sock_fd}')
    # test if the transport was closed
    loop = self._loop
    if loop is not None and not loop.is_closed():
        reading = _test_selector_event(loop._selector,
                                       self._sock_fd, selectors.EVENT_READ)
        parts.append('read=polling' if reading else 'read=idle')

        writing = _test_selector_event(loop._selector,
                                       self._sock_fd,
                                       selectors.EVENT_WRITE)
        state = 'polling' if writing else 'idle'

        bufsize = self.get_write_buffer_size()
        parts.append(f'write=<{state}, bufsize={bufsize}>')
    return '<{}>'.format(' '.join(parts))
def abort(self):
    """Close the transport at once via _force_close(), with no error."""
    self._force_close(exc=None)
def set_protocol(self, protocol):
    """Make *protocol* the transport's protocol and mark it connected."""
    self._protocol, self._protocol_connected = protocol, True
def get_protocol(self):
    """Return the protocol currently attached to the transport."""
    protocol = self._protocol
    return protocol
def is_closing(self):
    """Return the closing flag, which close() and _force_close() set."""
    closing = self._closing
    return closing
def is_reading(self):
    """Return True while the transport is neither closing nor paused."""
    if self.is_closing():
        return False
    return not self._paused
def pause_reading(self):
    """Stop delivering incoming data until resume_reading() is called.

    Does nothing if the transport is not currently reading.
    """
    if self.is_reading():
        self._paused = True
        self._loop._remove_reader(self._sock_fd)
        if self._loop.get_debug():
            logger.debug("%r pauses reading", self)
def resume_reading(self):
    """Resume delivering incoming data after pause_reading().

    Does nothing if the transport is closing or was not paused.
    """
    if not self._closing and self._paused:
        self._paused = False
        self._add_reader(self._sock_fd, self._read_ready)
        if self._loop.get_debug():
            logger.debug("%r resumes reading", self)
def close(self):
    """Begin closing the transport; calling it again has no effect.

    Reading stops immediately. connection_lost() is scheduled at once
    only when the write buffer is empty.
    """
    if self._closing:
        return
    self._closing = True
    fd = self._sock_fd
    self._loop._remove_reader(fd)
    if self._buffer:
        return
    self._conn_lost += 1
    self._loop._remove_writer(fd)
    self._call_soon(self._call_connection_lost, None)
def __del__(self, _warn=warnings.warn):
    """Warn about, and clean up, a transport collected while still open.

    NOTE(review): the source dump lost its indentation; the server
    detach is placed inside the ``_sock is not None`` branch here.
    Confirm against the original file.
    """
    sock = self._sock
    if sock is None:
        return
    _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
    self._sock.close()
    if self._server is not None:
        self._server._detach(self)
def _fatal_error(self, exc, message='Fatal error on transport'):
    """Report *exc* and force the transport closed.

    An OSError is only logged, and only in debug mode; any other
    exception goes to the loop's exception handler.
    """
    # Should be called from exception handler only.
    if not isinstance(exc, OSError):
        self._loop.call_exception_handler({
            'message': message,
            'exception': exc,
            'transport': self,
            'protocol': self._protocol,
        })
    elif self._loop.get_debug():
        logger.debug("%r: %s", self, message, exc_info=True)
    self._force_close(exc)
def _force_close(self, exc):
    """Close now, discarding unsent data, and schedule connection_lost(exc).

    Does nothing once connection_lost() has already been scheduled.
    """
    if self._conn_lost:
        return
    fd = self._sock_fd
    if self._buffer:
        # Pending writes are dropped; the writer is no longer needed.
        self._buffer.clear()
        self._buffer_size = 0
        self._loop._remove_writer(fd)
    if not self._closing:
        self._closing = True
        self._loop._remove_reader(fd)
    self._conn_lost += 1
    self._call_soon(self._call_connection_lost, exc)
def _call_connection_lost(self, exc):
    """Notify the protocol of the lost connection and release resources.

    The socket is closed and the socket, protocol, loop and server
    references are dropped even if connection_lost() raises.
    """
    try:
        if self._protocol_connected:
            self._protocol.connection_lost(exc)
    finally:
        self._sock.close()
        self._sock = self._protocol = self._loop = None
        server = self._server
        if server is not None:
            server._detach(self)
            self._server = None
def get_write_buffer_size(self):
    """Return the size currently recorded for the write buffer."""
    size = self._buffer_size
    return size
def _add_reader(self, fd, callback, *args):
    """Register a reader with the loop in this transport's context.

    Skipped entirely while the transport is not reading.
    """
    if self.is_reading():
        self._loop._add_reader(fd, callback, *args, context=self._context)
def _add_writer(self, fd, callback, *args):
    """Register a writer with the loop in this transport's context."""
    loop = self._loop
    loop._add_writer(fd, callback, *args, context=self._context)
def _call_soon(self, callback, *args):
    """Schedule *callback* on the loop in this transport's context."""
    loop = self._loop
    loop.call_soon(callback, *args, context=self._context)
940class _SelectorSocketTransport(_SelectorTransport):
942 _start_tls_compatible = True
943 _sendfile_compatible = constants._SendfileMode.TRY_NATIVE
def __init__(self, loop, sock, protocol, waiter=None,
             extra=None, server=None, context=None):
    """Create a stream transport and schedule its start-up callbacks.

    Scheduled in order: connection_made(), installing the read handler,
    and (when *waiter* is given) resolving *waiter* with None unless it
    was cancelled.
    """
    # Must exist before the base constructor runs, because that calls
    # set_protocol(), which assigns it.
    self._read_ready_cb = None
    super().__init__(loop, sock, protocol, extra, server, context)
    self._eof = False
    self._empty_waiter = None
    self._write_ready = (
        self._write_sendmsg if _HAS_SENDMSG else self._write_send)
    # Disable the Nagle algorithm -- small writes will be
    # sent without waiting for the TCP ACK.  This generally
    # decreases the latency (in some cases significantly.)
    base_events._set_nodelay(self._sock)

    self._call_soon(self._protocol.connection_made, self)
    # only start reading when connection_made() has been called
    self._call_soon(self._add_reader, self._sock_fd, self._read_ready)
    if waiter is None:
        return
    # only wake up the waiter when connection_made() has been called
    self._call_soon(futures._set_result_unless_cancelled, waiter, None)
def set_protocol(self, protocol):
    """Install *protocol* and select the matching read handler.

    A BufferedProtocol is fed through get_buffer()/buffer_updated();
    any other protocol receives data via data_received().
    """
    buffered = isinstance(protocol, protocols.BufferedProtocol)
    self._read_ready_cb = (
        self._read_ready__get_buffer if buffered
        else self._read_ready__data_received)

    super().set_protocol(protocol)
def _read_ready(self):
    """Dispatch a read-ready event to the currently selected handler."""
    handler = self._read_ready_cb
    handler()
def _read_ready__get_buffer(self):
    """Read handler for buffered protocols.

    Asks the protocol for a buffer, receives into it, and reports the
    byte count through buffer_updated().  A zero-byte read is treated
    as EOF.  Any failure other than SystemExit/KeyboardInterrupt (and
    a would-block/interrupted read, which is simply ignored) is routed
    to _fatal_error().
    """
    if self._conn_lost:
        return

    try:
        target = self._protocol.get_buffer(-1)
        if len(target) == 0:
            raise RuntimeError('get_buffer() returned an empty buffer')
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        self._fatal_error(
            err, 'Fatal error: protocol.get_buffer() call failed.')
        return

    try:
        received = self._sock.recv_into(target)
    except (BlockingIOError, InterruptedError):
        # Nothing to read right now; wait for the next event.
        return
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        self._fatal_error(err, 'Fatal read error on socket transport')
        return

    if not received:
        self._read_ready__on_eof()
        return

    try:
        self._protocol.buffer_updated(received)
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        self._fatal_error(
            err, 'Fatal error: protocol.buffer_updated() call failed.')
def _read_ready__data_received(self):
    """Read handler for streaming (non-buffered) protocols.

    Receives up to ``self.max_size`` bytes and hands them to
    protocol.data_received().  Empty data is treated as EOF.  A
    would-block/interrupted read is ignored; other failures except
    SystemExit/KeyboardInterrupt go to _fatal_error().
    """
    if self._conn_lost:
        return

    try:
        chunk = self._sock.recv(self.max_size)
    except (BlockingIOError, InterruptedError):
        return
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        self._fatal_error(err, 'Fatal read error on socket transport')
        return

    if not chunk:
        self._read_ready__on_eof()
        return

    try:
        self._protocol.data_received(chunk)
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        self._fatal_error(
            err, 'Fatal error: protocol.data_received() call failed.')
def _read_ready__on_eof(self):
    """Handle end-of-file from the peer.

    protocol.eof_received() decides what happens next: a true result
    keeps the transport open for writing (only the reader is removed),
    a false result closes the transport.  An exception from
    eof_received() other than SystemExit/KeyboardInterrupt is fatal.
    """
    if self._loop.get_debug():
        logger.debug("%r received EOF", self)

    try:
        stay_open = self._protocol.eof_received()
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        self._fatal_error(
            err, 'Fatal error: protocol.eof_received() call failed.')
        return

    if not stay_open:
        self.close()
        return

    # We're keeping the connection open so the protocol can write
    # more, but we still can't receive more, so remove the reader
    # callback.
    self._loop._remove_reader(self._sock_fd)
def write(self, data):
    """Send *data*, buffering whatever cannot be written right away.

    Raises TypeError for anything but bytes, bytearray or memoryview,
    and RuntimeError after write_eof() or while a sendfile operation
    is in progress.  Empty data is ignored.  Once the connection is
    lost, writes are dropped (with a warning after a threshold).
    """
    if not isinstance(data, (bytes, bytearray, memoryview)):
        raise TypeError(f'data argument must be a bytes, bytearray, or memoryview '
                        f'object, not {type(data).__name__!r}')
    if self._eof:
        raise RuntimeError('Cannot call write() after write_eof()')
    if self._empty_waiter is not None:
        raise RuntimeError('unable to write; sendfile is in progress')
    if not data:
        return

    if self._conn_lost:
        if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
            logger.warning('socket.send() raised exception.')
        self._conn_lost += 1
        return

    if not self._buffer:
        # Nothing is queued, so try sending straight away.
        try:
            sent = self._sock.send(data)
        except (BlockingIOError, InterruptedError):
            pass
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as err:
            self._fatal_error(err, 'Fatal write error on socket transport')
            return
        else:
            data = memoryview(data)[sent:]
            if not data:
                return
        # Some data is still unsent; have the loop call us when writable.
        self._add_writer(self._sock_fd, self._write_ready)

    # Queue the remainder and account for it.
    self._buffer.append(data)
    self._buffer_size += len(data)
    self._maybe_pause_protocol()
def _get_sendmsg_buffer(self):
    """Return an iterator over at most SC_IOV_MAX queued chunks."""
    limit = SC_IOV_MAX
    return itertools.islice(self._buffer, limit)
def _write_sendmsg(self):
    """Write-ready handler that flushes queued chunks with sendmsg().

    A would-block/interrupted send leaves everything queued.  Any
    other error (except SystemExit/KeyboardInterrupt) discards the
    buffer, reports a fatal error and fails the empty-waiter, if any.
    On success, once the buffer is drained the writer is removed, the
    empty-waiter is resolved, and a pending close or write_eof() is
    completed.
    """
    assert self._buffer, 'Data should not be empty'
    if self._conn_lost:
        return
    try:
        sent = self._sock.sendmsg(self._get_sendmsg_buffer())
        self._adjust_leftover_buffer(sent)
    except (BlockingIOError, InterruptedError):
        pass
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        self._loop._remove_writer(self._sock_fd)
        self._buffer.clear()
        self._buffer_size = 0
        self._fatal_error(err, 'Fatal write error on socket transport')
        if self._empty_waiter is not None:
            self._empty_waiter.set_exception(err)
    else:
        self._maybe_resume_protocol()  # May append to buffer.
        if self._buffer:
            # Still data to send; stay registered as a writer.
            return
        self._loop._remove_writer(self._sock_fd)
        if self._empty_waiter is not None:
            self._empty_waiter.set_result(None)
        if self._closing:
            self._call_connection_lost(None)
        elif self._eof:
            self._sock.shutdown(socket.SHUT_WR)
def _adjust_leftover_buffer(self, nbytes: int) -> None:
    """Drop the first *nbytes* bytes of queued output after a send.

    Fully sent chunks are removed from the front of the buffer and a
    partially sent chunk is replaced by its unsent tail.  Zero-length
    chunks that end up at the head are removed as well: they carry no
    data, but leaving them behind would keep the buffer truthy and so
    keep the write handler registered with nothing left to send.

    Unlike a plain ``while nbytes: popleft()`` loop, this never pops
    from an empty buffer, so an over-large *nbytes* cannot raise
    IndexError.
    """
    self._buffer_size -= nbytes
    buffer = self._buffer
    while buffer:
        chunk = buffer.popleft()
        chunk_len = len(chunk)
        if chunk_len > nbytes:
            # Head chunk was sent only partially (or not at all): put
            # back the unsent part and stop.
            buffer.appendleft(chunk[nbytes:] if nbytes else chunk)
            break
        nbytes -= chunk_len
def _write_send(self):
    """Write-ready handler that sends one queued chunk with send().

    A partially sent chunk has its unsent tail put back at the front.
    A would-block/interrupted send re-queues the whole chunk.  Any
    other error (except SystemExit/KeyboardInterrupt) discards the
    buffer, reports a fatal error and fails the empty-waiter, if any.
    Once the buffer is drained the writer is removed, the empty-waiter
    is resolved, and a pending close or write_eof() is completed.
    """
    assert self._buffer, 'Data should not be empty'
    if self._conn_lost:
        return
    try:
        head = self._buffer.popleft()
        sent = self._sock.send(head)
        if sent != len(head):
            # Only part of the chunk went out; keep the rest first in line.
            self._buffer.appendleft(head[sent:])
        self._buffer_size -= sent
    except (BlockingIOError, InterruptedError):
        self._buffer.appendleft(head)
        return
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        self._loop._remove_writer(self._sock_fd)
        self._buffer.clear()
        self._buffer_size = 0
        self._fatal_error(err, 'Fatal write error on socket transport')
        if self._empty_waiter is not None:
            self._empty_waiter.set_exception(err)
    else:
        self._maybe_resume_protocol()  # May append to buffer.
        if self._buffer:
            return
        self._loop._remove_writer(self._sock_fd)
        if self._empty_waiter is not None:
            self._empty_waiter.set_result(None)
        if self._closing:
            self._call_connection_lost(None)
        elif self._eof:
            self._sock.shutdown(socket.SHUT_WR)
def write_eof(self):
    """Mark the write side as finished.

    Has no effect while closing or if already called.  The socket's
    write direction is shut down immediately only when no output is
    queued.
    """
    already_done = self._closing or self._eof
    if already_done:
        return
    self._eof = True
    if self._buffer:
        return
    self._sock.shutdown(socket.SHUT_WR)
def writelines(self, list_of_data):
    """Queue every chunk of *list_of_data* and try to send immediately.

    Raises RuntimeError after write_eof() or while a sendfile
    operation is in progress.  Once the connection is lost, the data
    is dropped (with a warning after a threshold).

    Zero-length chunks are skipped rather than queued: a zero-length
    entry left at the end of the buffer is never consumed by the
    sendmsg() path, which would keep the write handler registered
    forever.  If nothing ends up queued (all chunks empty, or an empty
    iterator), the method returns without invoking the write handler,
    which asserts that the buffer is non-empty.
    """
    if self._eof:
        raise RuntimeError('Cannot call writelines() after write_eof()')
    if self._empty_waiter is not None:
        raise RuntimeError('unable to writelines; sendfile is in progress')
    if not list_of_data:
        return

    if self._conn_lost:
        if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
            logger.warning('socket.send() raised exception.')
        self._conn_lost += 1
        return

    for data in list_of_data:
        # memoryview() also rejects objects that are not bytes-like.
        chunk = memoryview(data)
        if not chunk.nbytes:
            continue
        self._buffer.append(chunk)
        self._buffer_size += len(data)
    if not self._buffer:
        return
    self._write_ready()
    # If the entire buffer couldn't be written, register a write handler
    if self._buffer:
        self._add_writer(self._sock_fd, self._write_ready)
        self._maybe_pause_protocol()
def can_write_eof(self):
    """Report that this transport supports write_eof() (always True)."""
    supported = True
    return supported
def _call_connection_lost(self, exc):
    """Run the base connection-lost handling, then fail any empty-waiter.

    After the base class cleanup the write handler is cleared.  If a
    waiter for "buffer drained" is still pending it is failed with
    ConnectionError.  A waiter that has already completed is left
    alone: the write handlers resolve (or fail) the waiter right
    before calling this method on a closing transport, and calling
    set_exception() on a finished future raises InvalidStateError.
    """
    try:
        super()._call_connection_lost(exc)
    finally:
        self._write_ready = None
        waiter = self._empty_waiter
        if waiter is not None and not waiter.done():
            waiter.set_exception(
                ConnectionError("Connection is closed by peer"))
def _make_empty_waiter(self):
    """Create and return a future that completes when the buffer drains.

    The future is resolved at once if nothing is queued.  Raises
    RuntimeError when a waiter is already installed.
    """
    if self._empty_waiter is not None:
        raise RuntimeError("Empty waiter is already set")
    waiter = self._loop.create_future()
    self._empty_waiter = waiter
    if not self._buffer:
        waiter.set_result(None)
    return self._empty_waiter
def _reset_empty_waiter(self):
    """Forget the current empty-waiter so a new one can be created."""
    setattr(self, '_empty_waiter', None)
def close(self):
    """Close the transport, clearing the read handler first."""
    # The read callback is dropped before the base class runs its own
    # close logic.
    self._read_ready_cb = None
    super().close()
1237class _SelectorDatagramTransport(_SelectorTransport, transports.DatagramTransport):
1239 _header_size = 8
def __init__(self, loop, sock, protocol, address=None,
             waiter=None, extra=None):
    """Set up a datagram transport and schedule its start-up callbacks.

    *address* is the optional default destination used by sendto().
    Three callbacks are queued in order: protocol.connection_made(),
    registration of the read handler, and (when *waiter* is given)
    resolution of the waiter.
    """
    super().__init__(loop, sock, protocol, extra)
    self._address = address
    self._buffer_size = 0
    self._call_soon(self._protocol.connection_made, self)
    # Reading starts only after connection_made() has been called.
    self._call_soon(self._add_reader, self._sock_fd, self._read_ready)
    if waiter is None:
        return
    # The waiter is woken only after connection_made() has been called.
    self._call_soon(futures._set_result_unless_cancelled, waiter, None)
def get_write_buffer_size(self):
    """Return the size counter tracked for queued datagrams."""
    size = self._buffer_size
    return size
def _read_ready(self):
    """Receive one datagram and deliver it to the protocol.

    A would-block/interrupted read is ignored.  Other OSErrors are
    reported through protocol.error_received(); any remaining failure
    except SystemExit/KeyboardInterrupt is fatal.
    """
    if self._conn_lost:
        return
    try:
        payload, sender = self._sock.recvfrom(self.max_size)
    except (BlockingIOError, InterruptedError):
        pass
    except OSError as err:
        self._protocol.error_received(err)
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        self._fatal_error(err, 'Fatal read error on datagram transport')
    else:
        self._protocol.datagram_received(payload, sender)
def sendto(self, data, addr=None):
    """Send a datagram, queueing it if the socket is not ready.

    Raises TypeError for data that is not bytes, bytearray or
    memoryview, and ValueError when the transport has a fixed address
    and *addr* is neither None nor that address.  OSErrors from the
    immediate send attempt are reported via protocol.error_received().
    """
    if not isinstance(data, (bytes, bytearray, memoryview)):
        raise TypeError(f'data argument must be a bytes-like object, '
                        f'not {type(data).__name__!r}')

    fixed = self._address
    if fixed:
        if addr not in (None, fixed):
            raise ValueError(
                f'Invalid address: must be None or {self._address}')
        addr = fixed

    if self._conn_lost and self._address:
        if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
            logger.warning('socket.send() raised exception.')
        self._conn_lost += 1
        return

    if not self._buffer:
        # Nothing queued ahead of us: try sending right now.
        try:
            if self._extra['peername']:
                self._sock.send(data)
            else:
                self._sock.sendto(data, addr)
            return
        except (BlockingIOError, InterruptedError):
            # Socket not ready; fall through and queue the datagram.
            self._add_writer(self._sock_fd, self._sendto_ready)
        except OSError as err:
            self._protocol.error_received(err)
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as err:
            self._fatal_error(
                err, 'Fatal write error on datagram transport')
            return

    # Ensure that what we buffer is immutable.
    queued = (bytes(data), addr)
    self._buffer.append(queued)
    self._buffer_size += len(data) + self._header_size
    self._maybe_pause_protocol()
def _sendto_ready(self):
    """Write-ready handler that flushes queued datagrams.

    Datagrams are sent in order until the queue is empty or the socket
    would block, in which case the current datagram is put back.  An
    OSError is reported via protocol.error_received() and a remaining
    failure (except SystemExit/KeyboardInterrupt) is fatal; both stop
    processing immediately.  Once the queue is drained the writer is
    removed and a pending close is completed.
    """
    overhead = self._header_size
    while self._buffer:
        payload, dest = self._buffer.popleft()
        self._buffer_size -= len(payload) + overhead
        try:
            if self._extra['peername']:
                self._sock.send(payload)
            else:
                self._sock.sendto(payload, dest)
        except (BlockingIOError, InterruptedError):
            self._buffer.appendleft((payload, dest))  # Try again later.
            self._buffer_size += len(payload) + overhead
            break
        except OSError as err:
            self._protocol.error_received(err)
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as err:
            self._fatal_error(
                err, 'Fatal write error on datagram transport')
            return

    self._maybe_resume_protocol()  # May append to buffer.
    if self._buffer:
        return
    self._loop._remove_writer(self._sock_fd)
    if self._closing:
        self._call_connection_lost(None)