Coverage for Lib / asyncio / selector_events.py: 87%
922 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 02:39 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 02:39 +0000
1"""Event loop using a selector and related classes.
3A selector is a "notify-when-ready" multiplexer. For a subclass which
4also includes support for signal handling, see the unix_events sub-module.
5"""
7__all__ = 'BaseSelectorEventLoop',
9import collections
10import errno
11import functools
12import itertools
13import os
14import selectors
15import socket
16import warnings
17import weakref
18try:
19 import ssl
20except ImportError: # pragma: no cover
21 ssl = None
23from . import base_events
24from . import constants
25from . import events
26from . import futures
27from . import protocols
28from . import sslproto
29from . import transports
30from . import trsock
31from .log import logger
# True when the platform's socket type offers sendmsg(); the vectored write
# path is only usable when the IOV limit can also be queried.
_HAS_SENDMSG = hasattr(socket.socket, 'sendmsg')

if _HAS_SENDMSG:
    try:
        # Limit reported by the OS for the 'SC_IOV_MAX' configuration name.
        SC_IOV_MAX = os.sysconf('SC_IOV_MAX')
    except (OSError, ValueError):
        # OSError: the name is known but not supported by the host.
        # ValueError: the name is not known to os.sysconf() at all.
        # Either way, fall back to plain send().
        _HAS_SENDMSG = False
def _test_selector_event(selector, fd, event):
    """Return True if *selector* monitors *event* for file descriptor *fd*.

    Returns False when *fd* is not registered with the selector at all
    (``selector.get_key()`` raises KeyError) or when none of the bits in
    *event* are set in the registered event mask.
    """
    try:
        key = selector.get_key(fd)
    except KeyError:
        return False
    else:
        return bool(key.events & event)
53class BaseSelectorEventLoop(base_events.BaseEventLoop):
54 """Selector event loop.
56 See events.EventLoop for API specification.
57 """
59 def __init__(self, selector=None):
60 super().__init__()
62 if selector is None:
63 selector = selectors.DefaultSelector()
64 logger.debug('Using selector: %s', selector.__class__.__name__)
65 self._selector = selector
66 self._make_self_pipe()
67 self._transports = weakref.WeakValueDictionary()
69 def _make_socket_transport(self, sock, protocol, waiter=None, *,
70 extra=None, server=None, context=None):
71 self._ensure_fd_no_transport(sock)
72 return _SelectorSocketTransport(self, sock, protocol, waiter,
73 extra, server, context=context)
75 def _make_ssl_transport(
76 self, rawsock, protocol, sslcontext, waiter=None,
77 *, server_side=False, server_hostname=None,
78 extra=None, server=None,
79 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
80 ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT,
81 context=None,
82 ):
83 self._ensure_fd_no_transport(rawsock)
84 ssl_protocol = sslproto.SSLProtocol(
85 self, protocol, sslcontext, waiter,
86 server_side, server_hostname,
87 ssl_handshake_timeout=ssl_handshake_timeout,
88 ssl_shutdown_timeout=ssl_shutdown_timeout,
89 )
90 _SelectorSocketTransport(self, rawsock, ssl_protocol,
91 extra=extra, server=server, context=context)
92 return ssl_protocol._app_transport
94 def _make_datagram_transport(self, sock, protocol,
95 address=None, waiter=None, extra=None):
96 self._ensure_fd_no_transport(sock)
97 return _SelectorDatagramTransport(self, sock, protocol,
98 address, waiter, extra)
100 def close(self):
101 if self.is_running():
102 raise RuntimeError("Cannot close a running event loop")
103 if self.is_closed():
104 return
105 self._close_self_pipe()
106 super().close()
107 if self._selector is not None:
108 self._selector.close()
109 self._selector = None
111 def _close_self_pipe(self):
112 self._remove_reader(self._ssock.fileno())
113 self._ssock.close()
114 self._ssock = None
115 self._csock.close()
116 self._csock = None
117 self._internal_fds -= 1
119 def _make_self_pipe(self):
120 # A self-socket, really. :-)
121 self._ssock, self._csock = socket.socketpair()
122 self._ssock.setblocking(False)
123 self._csock.setblocking(False)
124 self._internal_fds += 1
125 self._add_reader(self._ssock.fileno(), self._read_from_self)
127 def _process_self_data(self, data):
128 pass
130 def _read_from_self(self):
131 while True:
132 try:
133 data = self._ssock.recv(4096)
134 if not data: 134 ↛ 135line 134 didn't jump to line 135 because the condition on line 134 was never true
135 break
136 self._process_self_data(data)
137 except InterruptedError:
138 continue
139 except BlockingIOError:
140 break
142 def _write_to_self(self):
143 # This may be called from a different thread, possibly after
144 # _close_self_pipe() has been called or even while it is
145 # running. Guard for self._csock being None or closed. When
146 # a socket is closed, send() raises OSError (with errno set to
147 # EBADF, but let's not rely on the exact error code).
148 csock = self._csock
149 if csock is None: 149 ↛ 150line 149 didn't jump to line 150 because the condition on line 149 was never true
150 return
152 try:
153 csock.send(b'\0')
154 except OSError:
155 if self._debug: 155 ↛ 156line 155 didn't jump to line 156 because the condition on line 155 was never true
156 logger.debug("Fail to write a null byte into the "
157 "self-pipe socket",
158 exc_info=True)
160 def _start_serving(self, protocol_factory, sock,
161 sslcontext=None, server=None, backlog=100,
162 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
163 ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT, context=None):
164 self._add_reader(sock.fileno(), self._accept_connection,
165 protocol_factory, sock, sslcontext, server, backlog,
166 ssl_handshake_timeout, ssl_shutdown_timeout, context)
168 def _accept_connection(
169 self, protocol_factory, sock,
170 sslcontext=None, server=None, backlog=100,
171 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
172 ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT, context=None):
173 # This method is only called once for each event loop tick where the
174 # listening socket has triggered an EVENT_READ. There may be multiple
175 # connections waiting for an .accept() so it is called in a loop.
176 # See https://bugs.python.org/issue27906 for more details.
177 for _ in range(backlog + 1):
178 try:
179 conn, addr = sock.accept()
180 if self._debug:
181 logger.debug("%r got a new connection from %r: %r",
182 server, addr, conn)
183 conn.setblocking(False)
184 except ConnectionAbortedError:
185 # Discard connections that were aborted before accept().
186 continue
187 except (BlockingIOError, InterruptedError):
188 # Early exit because of a signal or
189 # the socket accept buffer is empty.
190 return
191 except OSError as exc:
192 # There's nowhere to send the error, so just log it.
193 if exc.errno in (errno.EMFILE, errno.ENFILE, 193 ↛ 210line 193 didn't jump to line 210 because the condition on line 193 was always true
194 errno.ENOBUFS, errno.ENOMEM):
195 # Some platforms (e.g. Linux keep reporting the FD as
196 # ready, so we remove the read handler temporarily.
197 # We'll try again in a while.
198 self.call_exception_handler({
199 'message': 'socket.accept() out of system resource',
200 'exception': exc,
201 'socket': trsock.TransportSocket(sock),
202 })
203 self._remove_reader(sock.fileno())
204 self.call_later(constants.ACCEPT_RETRY_DELAY,
205 self._start_serving,
206 protocol_factory, sock, sslcontext, server,
207 backlog, ssl_handshake_timeout,
208 ssl_shutdown_timeout, context)
209 else:
210 raise # The event loop will catch, log and ignore it.
211 else:
212 extra = {'peername': addr}
213 conn_context = context.copy() if context is not None else None
214 accept = self._accept_connection2(
215 protocol_factory, conn, extra, sslcontext, server,
216 ssl_handshake_timeout, ssl_shutdown_timeout, context=conn_context)
217 self.create_task(accept, context=conn_context)
219 async def _accept_connection2(
220 self, protocol_factory, conn, extra,
221 sslcontext=None, server=None,
222 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
223 ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT, context=None):
224 protocol = None
225 transport = None
226 try:
227 protocol = protocol_factory()
228 waiter = self.create_future()
229 if sslcontext:
230 transport = self._make_ssl_transport(
231 conn, protocol, sslcontext, waiter=waiter,
232 server_side=True, extra=extra, server=server,
233 ssl_handshake_timeout=ssl_handshake_timeout,
234 ssl_shutdown_timeout=ssl_shutdown_timeout,
235 context=context)
236 else:
237 transport = self._make_socket_transport(
238 conn, protocol, waiter=waiter, extra=extra,
239 server=server, context=context)
241 try:
242 await waiter
243 except BaseException:
244 transport.close()
245 # gh-109534: When an exception is raised by the SSLProtocol object the
246 # exception set in this future can keep the protocol object alive and
247 # cause a reference cycle.
248 waiter = None
249 raise
250 # It's now up to the protocol to handle the connection.
252 except (SystemExit, KeyboardInterrupt):
253 raise
254 except BaseException as exc:
255 if self._debug: 255 ↛ 256line 255 didn't jump to line 256 because the condition on line 255 was never true
256 context = {
257 'message':
258 'Error on transport creation for incoming connection',
259 'exception': exc,
260 }
261 if protocol is not None:
262 context['protocol'] = protocol
263 if transport is not None:
264 context['transport'] = transport
265 self.call_exception_handler(context)
267 def _ensure_fd_no_transport(self, fd):
268 fileno = fd
269 if not isinstance(fileno, int):
270 try:
271 fileno = int(fileno.fileno())
272 except (AttributeError, TypeError, ValueError):
273 # This code matches selectors._fileobj_to_fd function.
274 raise ValueError(f"Invalid file object: {fd!r}") from None
275 transport = self._transports.get(fileno)
276 if transport and not transport.is_closing():
277 raise RuntimeError(
278 f'File descriptor {fd!r} is used by transport '
279 f'{transport!r}')
281 def _add_reader(self, fd, callback, *args, context=None):
282 self._check_closed()
283 handle = events.Handle(callback, args, self, context=context)
284 key = self._selector.get_map().get(fd)
285 if key is None:
286 self._selector.register(fd, selectors.EVENT_READ,
287 (handle, None))
288 else:
289 mask, (reader, writer) = key.events, key.data
290 self._selector.modify(fd, mask | selectors.EVENT_READ,
291 (handle, writer))
292 if reader is not None:
293 reader.cancel()
294 return handle
296 def _remove_reader(self, fd):
297 if self.is_closed():
298 return False
299 key = self._selector.get_map().get(fd)
300 if key is None:
301 return False
302 mask, (reader, writer) = key.events, key.data
303 mask &= ~selectors.EVENT_READ
304 if not mask:
305 self._selector.unregister(fd)
306 else:
307 self._selector.modify(fd, mask, (None, writer))
309 if reader is not None:
310 reader.cancel()
311 return True
312 else:
313 return False
315 def _add_writer(self, fd, callback, *args, context=None):
316 self._check_closed()
317 handle = events.Handle(callback, args, self, context=context)
318 key = self._selector.get_map().get(fd)
319 if key is None:
320 self._selector.register(fd, selectors.EVENT_WRITE,
321 (None, handle))
322 else:
323 mask, (reader, writer) = key.events, key.data
324 self._selector.modify(fd, mask | selectors.EVENT_WRITE,
325 (reader, handle))
326 if writer is not None:
327 writer.cancel()
328 return handle
330 def _remove_writer(self, fd):
331 """Remove a writer callback."""
332 if self.is_closed():
333 return False
334 key = self._selector.get_map().get(fd)
335 if key is None:
336 return False
337 mask, (reader, writer) = key.events, key.data
338 # Remove both writer and connector.
339 mask &= ~selectors.EVENT_WRITE
340 if not mask:
341 self._selector.unregister(fd)
342 else:
343 self._selector.modify(fd, mask, (reader, None))
345 if writer is not None:
346 writer.cancel()
347 return True
348 else:
349 return False
351 def add_reader(self, fd, callback, *args):
352 """Add a reader callback."""
353 self._ensure_fd_no_transport(fd)
354 self._add_reader(fd, callback, *args)
356 def remove_reader(self, fd):
357 """Remove a reader callback."""
358 self._ensure_fd_no_transport(fd)
359 return self._remove_reader(fd)
361 def add_writer(self, fd, callback, *args):
362 """Add a writer callback.."""
363 self._ensure_fd_no_transport(fd)
364 self._add_writer(fd, callback, *args)
366 def remove_writer(self, fd):
367 """Remove a writer callback."""
368 self._ensure_fd_no_transport(fd)
369 return self._remove_writer(fd)
371 async def sock_recv(self, sock, n):
372 """Receive data from the socket.
374 The return value is a bytes object representing the data received.
375 The maximum amount of data to be received at once is specified by
376 nbytes.
377 """
378 base_events._check_ssl_socket(sock)
379 if self._debug and sock.gettimeout() != 0:
380 raise ValueError("the socket must be non-blocking")
381 try:
382 return sock.recv(n)
383 except (BlockingIOError, InterruptedError):
384 pass
385 fut = self.create_future()
386 fd = sock.fileno()
387 self._ensure_fd_no_transport(fd)
388 handle = self._add_reader(fd, self._sock_recv, fut, sock, n)
389 fut.add_done_callback(
390 functools.partial(self._sock_read_done, fd, handle=handle))
391 return await fut
393 def _sock_read_done(self, fd, fut, handle=None):
394 if handle is None or not handle.cancelled():
395 self.remove_reader(fd)
397 def _sock_recv(self, fut, sock, n):
398 # _sock_recv() can add itself as an I/O callback if the operation can't
399 # be done immediately. Don't use it directly, call sock_recv().
400 if fut.done(): 400 ↛ 401line 400 didn't jump to line 401 because the condition on line 400 was never true
401 return
402 try:
403 data = sock.recv(n)
404 except (BlockingIOError, InterruptedError):
405 return # try again next time
406 except (SystemExit, KeyboardInterrupt):
407 raise
408 except BaseException as exc:
409 fut.set_exception(exc)
410 else:
411 fut.set_result(data)
413 async def sock_recv_into(self, sock, buf):
414 """Receive data from the socket.
416 The received data is written into *buf* (a writable buffer).
417 The return value is the number of bytes written.
418 """
419 base_events._check_ssl_socket(sock)
420 if self._debug and sock.gettimeout() != 0:
421 raise ValueError("the socket must be non-blocking")
422 try:
423 return sock.recv_into(buf)
424 except (BlockingIOError, InterruptedError):
425 pass
426 fut = self.create_future()
427 fd = sock.fileno()
428 self._ensure_fd_no_transport(fd)
429 handle = self._add_reader(fd, self._sock_recv_into, fut, sock, buf)
430 fut.add_done_callback(
431 functools.partial(self._sock_read_done, fd, handle=handle))
432 return await fut
434 def _sock_recv_into(self, fut, sock, buf):
435 # _sock_recv_into() can add itself as an I/O callback if the operation
436 # can't be done immediately. Don't use it directly, call
437 # sock_recv_into().
438 if fut.done(): 438 ↛ 439line 438 didn't jump to line 439 because the condition on line 438 was never true
439 return
440 try:
441 nbytes = sock.recv_into(buf)
442 except (BlockingIOError, InterruptedError):
443 return # try again next time
444 except (SystemExit, KeyboardInterrupt):
445 raise
446 except BaseException as exc:
447 fut.set_exception(exc)
448 else:
449 fut.set_result(nbytes)
451 async def sock_recvfrom(self, sock, bufsize):
452 """Receive a datagram from a datagram socket.
454 The return value is a tuple of (bytes, address) representing the
455 datagram received and the address it came from.
456 The maximum amount of data to be received at once is specified by
457 nbytes.
458 """
459 base_events._check_ssl_socket(sock)
460 if self._debug and sock.gettimeout() != 0: 460 ↛ 461line 460 didn't jump to line 461 because the condition on line 460 was never true
461 raise ValueError("the socket must be non-blocking")
462 try:
463 return sock.recvfrom(bufsize)
464 except (BlockingIOError, InterruptedError):
465 pass
466 fut = self.create_future()
467 fd = sock.fileno()
468 self._ensure_fd_no_transport(fd)
469 handle = self._add_reader(fd, self._sock_recvfrom, fut, sock, bufsize)
470 fut.add_done_callback(
471 functools.partial(self._sock_read_done, fd, handle=handle))
472 return await fut
474 def _sock_recvfrom(self, fut, sock, bufsize):
475 # _sock_recvfrom() can add itself as an I/O callback if the operation
476 # can't be done immediately. Don't use it directly, call
477 # sock_recvfrom().
478 if fut.done(): 478 ↛ 479line 478 didn't jump to line 479 because the condition on line 478 was never true
479 return
480 try:
481 result = sock.recvfrom(bufsize)
482 except (BlockingIOError, InterruptedError):
483 return # try again next time
484 except (SystemExit, KeyboardInterrupt):
485 raise
486 except BaseException as exc:
487 fut.set_exception(exc)
488 else:
489 fut.set_result(result)
491 async def sock_recvfrom_into(self, sock, buf, nbytes=0):
492 """Receive data from the socket.
494 The received data is written into *buf* (a writable buffer).
495 The return value is a tuple of (number of bytes written, address).
496 """
497 base_events._check_ssl_socket(sock)
498 if self._debug and sock.gettimeout() != 0: 498 ↛ 499line 498 didn't jump to line 499 because the condition on line 498 was never true
499 raise ValueError("the socket must be non-blocking")
500 if not nbytes:
501 nbytes = len(buf)
503 try:
504 return sock.recvfrom_into(buf, nbytes)
505 except (BlockingIOError, InterruptedError):
506 pass
507 fut = self.create_future()
508 fd = sock.fileno()
509 self._ensure_fd_no_transport(fd)
510 handle = self._add_reader(fd, self._sock_recvfrom_into, fut, sock, buf,
511 nbytes)
512 fut.add_done_callback(
513 functools.partial(self._sock_read_done, fd, handle=handle))
514 return await fut
516 def _sock_recvfrom_into(self, fut, sock, buf, bufsize):
517 # _sock_recv_into() can add itself as an I/O callback if the operation
518 # can't be done immediately. Don't use it directly, call
519 # sock_recv_into().
520 if fut.done(): 520 ↛ 521line 520 didn't jump to line 521 because the condition on line 520 was never true
521 return
522 try:
523 result = sock.recvfrom_into(buf, bufsize)
524 except (BlockingIOError, InterruptedError):
525 return # try again next time
526 except (SystemExit, KeyboardInterrupt):
527 raise
528 except BaseException as exc:
529 fut.set_exception(exc)
530 else:
531 fut.set_result(result)
533 async def sock_sendall(self, sock, data):
534 """Send data to the socket.
536 The socket must be connected to a remote socket. This method continues
537 to send data from data until either all data has been sent or an
538 error occurs. None is returned on success. On error, an exception is
539 raised, and there is no way to determine how much data, if any, was
540 successfully processed by the receiving end of the connection.
541 """
542 base_events._check_ssl_socket(sock)
543 if self._debug and sock.gettimeout() != 0:
544 raise ValueError("the socket must be non-blocking")
545 try:
546 n = sock.send(data)
547 except (BlockingIOError, InterruptedError):
548 n = 0
550 if n == len(data):
551 # all data sent
552 return
554 fut = self.create_future()
555 fd = sock.fileno()
556 self._ensure_fd_no_transport(fd)
557 # use a trick with a list in closure to store a mutable state
558 handle = self._add_writer(fd, self._sock_sendall, fut, sock,
559 memoryview(data), [n])
560 fut.add_done_callback(
561 functools.partial(self._sock_write_done, fd, handle=handle))
562 return await fut
564 def _sock_sendall(self, fut, sock, view, pos):
565 if fut.done(): 565 ↛ 567line 565 didn't jump to line 567 because the condition on line 565 was never true
566 # Future cancellation can be scheduled on previous loop iteration
567 return
568 start = pos[0]
569 try:
570 n = sock.send(view[start:])
571 except (BlockingIOError, InterruptedError):
572 return
573 except (SystemExit, KeyboardInterrupt):
574 raise
575 except BaseException as exc:
576 fut.set_exception(exc)
577 return
579 start += n
581 if start == len(view):
582 fut.set_result(None)
583 else:
584 pos[0] = start
586 async def sock_sendto(self, sock, data, address):
587 """Send data to the socket.
589 The socket must be connected to a remote socket. This method continues
590 to send data from data until either all data has been sent or an
591 error occurs. None is returned on success. On error, an exception is
592 raised, and there is no way to determine how much data, if any, was
593 successfully processed by the receiving end of the connection.
594 """
595 base_events._check_ssl_socket(sock)
596 if self._debug and sock.gettimeout() != 0: 596 ↛ 597line 596 didn't jump to line 597 because the condition on line 596 was never true
597 raise ValueError("the socket must be non-blocking")
598 try:
599 return sock.sendto(data, address)
600 except (BlockingIOError, InterruptedError):
601 pass
603 fut = self.create_future()
604 fd = sock.fileno()
605 self._ensure_fd_no_transport(fd)
606 # use a trick with a list in closure to store a mutable state
607 handle = self._add_writer(fd, self._sock_sendto, fut, sock, data,
608 address)
609 fut.add_done_callback(
610 functools.partial(self._sock_write_done, fd, handle=handle))
611 return await fut
613 def _sock_sendto(self, fut, sock, data, address):
614 if fut.done(): 614 ↛ 616line 614 didn't jump to line 616 because the condition on line 614 was never true
615 # Future cancellation can be scheduled on previous loop iteration
616 return
617 try:
618 n = sock.sendto(data, 0, address)
619 except (BlockingIOError, InterruptedError):
620 return
621 except (SystemExit, KeyboardInterrupt):
622 raise
623 except BaseException as exc:
624 fut.set_exception(exc)
625 else:
626 fut.set_result(n)
628 async def sock_connect(self, sock, address):
629 """Connect to a remote socket at address.
631 This method is a coroutine.
632 """
633 base_events._check_ssl_socket(sock)
634 if self._debug and sock.gettimeout() != 0:
635 raise ValueError("the socket must be non-blocking")
637 if sock.family == socket.AF_INET or (
638 base_events._HAS_IPv6 and sock.family == socket.AF_INET6):
639 resolved = await self._ensure_resolved(
640 address, family=sock.family, type=sock.type, proto=sock.proto,
641 loop=self,
642 )
643 _, _, _, _, address = resolved[0]
645 fut = self.create_future()
646 self._sock_connect(fut, sock, address)
647 try:
648 return await fut
649 finally:
650 # Needed to break cycles when an exception occurs.
651 fut = None
653 def _sock_connect(self, fut, sock, address):
654 fd = sock.fileno()
655 try:
656 sock.connect(address)
657 except (BlockingIOError, InterruptedError):
658 # Issue #23618: When the C function connect() fails with EINTR, the
659 # connection runs in background. We have to wait until the socket
660 # becomes writable to be notified when the connection succeed or
661 # fails.
662 self._ensure_fd_no_transport(fd)
663 handle = self._add_writer(
664 fd, self._sock_connect_cb, fut, sock, address)
665 fut.add_done_callback(
666 functools.partial(self._sock_write_done, fd, handle=handle))
667 except (SystemExit, KeyboardInterrupt):
668 raise
669 except BaseException as exc:
670 fut.set_exception(exc)
671 else:
672 fut.set_result(None)
673 finally:
674 fut = None
676 def _sock_write_done(self, fd, fut, handle=None):
677 if handle is None or not handle.cancelled():
678 self.remove_writer(fd)
680 def _sock_connect_cb(self, fut, sock, address):
681 if fut.done(): 681 ↛ 682line 681 didn't jump to line 682 because the condition on line 681 was never true
682 return
684 try:
685 err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
686 if err != 0:
687 # Jump to any except clause below.
688 raise OSError(err, f'Connect call failed {address}')
689 except (BlockingIOError, InterruptedError):
690 # socket is still registered, the callback will be retried later
691 pass
692 except (SystemExit, KeyboardInterrupt):
693 raise
694 except BaseException as exc:
695 fut.set_exception(exc)
696 else:
697 fut.set_result(None)
698 finally:
699 fut = None
701 async def sock_accept(self, sock):
702 """Accept a connection.
704 The socket must be bound to an address and listening for connections.
705 The return value is a pair (conn, address) where conn is a new socket
706 object usable to send and receive data on the connection, and address
707 is the address bound to the socket on the other end of the connection.
708 """
709 base_events._check_ssl_socket(sock)
710 if self._debug and sock.gettimeout() != 0:
711 raise ValueError("the socket must be non-blocking")
712 fut = self.create_future()
713 self._sock_accept(fut, sock)
714 return await fut
716 def _sock_accept(self, fut, sock):
717 fd = sock.fileno()
718 try:
719 conn, address = sock.accept()
720 conn.setblocking(False)
721 except (BlockingIOError, InterruptedError):
722 self._ensure_fd_no_transport(fd)
723 handle = self._add_reader(fd, self._sock_accept, fut, sock)
724 fut.add_done_callback(
725 functools.partial(self._sock_read_done, fd, handle=handle))
726 except (SystemExit, KeyboardInterrupt):
727 raise
728 except BaseException as exc:
729 fut.set_exception(exc)
730 else:
731 fut.set_result((conn, address))
733 async def _sendfile_native(self, transp, file, offset, count):
734 del self._transports[transp._sock_fd]
735 resume_reading = transp.is_reading()
736 transp.pause_reading()
737 await transp._make_empty_waiter()
738 try:
739 return await self.sock_sendfile(transp._sock, file, offset, count,
740 fallback=False)
741 finally:
742 transp._reset_empty_waiter()
743 if resume_reading: 743 ↛ 745line 743 didn't jump to line 745 because the condition on line 743 was always true
744 transp.resume_reading()
745 self._transports[transp._sock_fd] = transp
747 def _process_events(self, event_list):
748 for key, mask in event_list:
749 fileobj, (reader, writer) = key.fileobj, key.data
750 if mask & selectors.EVENT_READ and reader is not None:
751 if reader._cancelled:
752 self._remove_reader(fileobj)
753 else:
754 self._add_callback(reader)
755 if mask & selectors.EVENT_WRITE and writer is not None:
756 if writer._cancelled:
757 self._remove_writer(fileobj)
758 else:
759 self._add_callback(writer)
761 def _stop_serving(self, sock):
762 self._remove_reader(sock.fileno())
763 sock.close()
class _SelectorTransport(transports._FlowControlMixin,
                         transports.Transport):
    """Base class for transports driven by a selector event loop.

    Holds the socket, the protocol, the write buffer bookkeeping and the
    closing state; subclasses supply the actual read/write callbacks.
    """

    max_size = 256 * 1024  # Buffer size passed to recv().

    # Attribute used in the destructor: it must be set even if the constructor
    # is not called (see _SelectorSslTransport which may start by raising an
    # exception)
    _sock = None

    def __init__(self, loop, sock, protocol, extra=None, server=None,
                 context=None):
        """Bind the transport to *sock* and register it with *loop*."""
        super().__init__(extra, loop)
        self._extra['socket'] = trsock.TransportSocket(sock)
        try:
            self._extra['sockname'] = sock.getsockname()
        except OSError:
            self._extra['sockname'] = None
        if 'peername' not in self._extra:
            try:
                self._extra['peername'] = sock.getpeername()
            except OSError:
                self._extra['peername'] = None
        self._sock = sock
        self._sock_fd = sock.fileno()
        self._context = context
        self._protocol_connected = False
        self.set_protocol(protocol)

        self._server = server
        self._buffer = collections.deque()
        self._buffer_size = 0
        self._conn_lost = 0  # Set when call to connection_lost scheduled.
        self._closing = False  # Set when close() called.
        self._paused = False  # Set when pause_reading() called

        if self._server is not None:
            self._server._attach(self)
        loop._transports[self._sock_fd] = self

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._sock is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append(f'fd={self._sock_fd}')
        # test if the transport was closed
        if self._loop is not None and not self._loop.is_closed():
            polling = _test_selector_event(self._loop._selector,
                                           self._sock_fd, selectors.EVENT_READ)
            if polling:
                info.append('read=polling')
            else:
                info.append('read=idle')

            polling = _test_selector_event(self._loop._selector,
                                           self._sock_fd,
                                           selectors.EVENT_WRITE)
            if polling:
                state = 'polling'
            else:
                state = 'idle'

            bufsize = self.get_write_buffer_size()
            info.append(f'write=<{state}, bufsize={bufsize}>')
        return '<{}>'.format(' '.join(info))

    def abort(self):
        """Close the transport immediately, discarding buffered data."""
        self._force_close(None)

    def set_protocol(self, protocol):
        """Install *protocol* and mark the protocol as connected."""
        self._protocol = protocol
        self._protocol_connected = True

    def get_protocol(self):
        """Return the current protocol."""
        return self._protocol

    def is_closing(self):
        """Return True once close() or a forced close has started."""
        return self._closing

    def is_reading(self):
        """Return True if the transport is neither closing nor paused."""
        return not self.is_closing() and not self._paused

    def pause_reading(self):
        """Stop watching the socket for reads; no-op if not reading."""
        if not self.is_reading():
            return
        self._paused = True
        self._loop._remove_reader(self._sock_fd)
        if self._loop.get_debug():
            logger.debug("%r pauses reading", self)

    def resume_reading(self):
        """Watch the socket for reads again; no-op if closing or not paused."""
        if self._closing or not self._paused:
            return
        self._paused = False
        self._add_reader(self._sock_fd, self._read_ready)
        if self._loop.get_debug():
            logger.debug("%r resumes reading", self)

    def close(self):
        """Start closing the transport.

        Reading stops at once.  If the write buffer is empty the writer is
        removed and connection_lost(None) is scheduled; otherwise nothing
        more is scheduled here.
        """
        if self._closing:
            return
        self._closing = True
        self._loop._remove_reader(self._sock_fd)
        if not self._buffer:
            self._conn_lost += 1
            self._loop._remove_writer(self._sock_fd)
            self._call_soon(self._call_connection_lost, None)

    def __del__(self, _warn=warnings.warn):
        # Only a transport that still owns its socket needs cleanup here.
        if self._sock is not None:
            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
            self._sock.close()
            if self._server is not None:
                self._server._detach(self)

    def _fatal_error(self, exc, message='Fatal error on transport'):
        """Report *exc* and force-close the transport.

        OSError is only logged (debug mode); anything else goes to the loop
        exception handler.
        """
        # Should be called from exception handler only.
        if isinstance(exc, OSError):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._force_close(exc)

    def _force_close(self, exc):
        """Drop buffered data, stop all I/O and schedule connection_lost(exc)."""
        if self._conn_lost:
            return
        if self._buffer:
            self._buffer.clear()
            self._buffer_size = 0
            self._loop._remove_writer(self._sock_fd)
        if not self._closing:
            self._closing = True
            self._loop._remove_reader(self._sock_fd)
        self._conn_lost += 1
        self._call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then release the socket, loop and server."""
        try:
            if self._protocol_connected:
                self._protocol.connection_lost(exc)
        finally:
            # Cleanup runs even if connection_lost() raises.
            self._sock.close()
            self._sock = None
            self._protocol = None
            self._loop = None
            server = self._server
            if server is not None:
                server._detach(self)
                self._server = None

    def get_write_buffer_size(self):
        """Return the tracked size of the write buffer."""
        return self._buffer_size

    def _add_reader(self, fd, callback, *args):
        """Register a reader on the loop, but only while reading is active."""
        if not self.is_reading():
            return
        self._loop._add_reader(fd, callback, *args, context=self._context)

    def _add_writer(self, fd, callback, *args):
        """Register a writer on the loop using the transport's context."""
        self._loop._add_writer(fd, callback, *args, context=self._context)

    def _call_soon(self, callback, *args):
        """Schedule *callback* on the loop using the transport's context."""
        self._loop.call_soon(callback, *args, context=self._context)
937class _SelectorSocketTransport(_SelectorTransport):
939 _start_tls_compatible = True
940 _sendfile_compatible = constants._SendfileMode.TRY_NATIVE
def __init__(self, loop, sock, protocol, waiter=None,
             extra=None, server=None, context=None):
    """Set up a stream transport on *sock*.

    Schedules, in this order: protocol.connection_made(), registration of
    the read callback, and (when *waiter* is given) completion of *waiter*.
    """
    # Must exist before the base constructor runs, because that constructor
    # calls set_protocol(), which assigns this attribute.
    self._read_ready_cb = None
    super().__init__(loop, sock, protocol, extra, server, context)
    self._eof = False
    self._empty_waiter = None
    # Pick the write implementation once, based on sendmsg() availability.
    if _HAS_SENDMSG:
        self._write_ready = self._write_sendmsg
    else:
        self._write_ready = self._write_send
    # Disable the Nagle algorithm -- small writes will be
    # sent without waiting for the TCP ACK.  This generally
    # decreases the latency (in some cases significantly.)
    base_events._set_nodelay(self._sock)

    self._call_soon(self._protocol.connection_made, self)
    # only start reading when connection_made() has been called
    self._call_soon(self._add_reader, self._sock_fd, self._read_ready)
    if waiter is not None:
        # only wake up the waiter when connection_made() has been called
        self._call_soon(futures._set_result_unless_cancelled, waiter, None)
def set_protocol(self, protocol):
    """Install *protocol* and select the matching read-ready handler.

    Buffered protocols are served by ``_read_ready__get_buffer``; every
    other protocol by ``_read_ready__data_received``.
    """
    buffered = isinstance(protocol, protocols.BufferedProtocol)
    self._read_ready_cb = (
        self._read_ready__get_buffer if buffered
        else self._read_ready__data_received)

    super().set_protocol(protocol)
def _read_ready(self):
    """Dispatch a read-ready event to the currently selected handler."""
    handler = self._read_ready_cb
    handler()
def _read_ready__get_buffer(self):
    """Read-ready handler for protocols that provide their own buffer.

    Obtains a buffer from ``protocol.get_buffer(-1)``, fills it with
    ``recv_into()`` and reports the count through ``buffer_updated()``.
    A zero-byte read is handed to ``_read_ready__on_eof()``.  Failures
    other than "would block"/"interrupted" go to ``_fatal_error()``;
    SystemExit and KeyboardInterrupt always propagate.
    """
    if self._conn_lost:
        return

    try:
        target = self._protocol.get_buffer(-1)
        if len(target) == 0:
            raise RuntimeError('get_buffer() returned an empty buffer')
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        self._fatal_error(
            err, 'Fatal error: protocol.get_buffer() call failed.')
        return

    try:
        received = self._sock.recv_into(target)
    except (BlockingIOError, InterruptedError):
        # Nothing to read right now; wait for the next event.
        return
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        self._fatal_error(err, 'Fatal read error on socket transport')
        return

    if not received:
        self._read_ready__on_eof()
        return

    try:
        self._protocol.buffer_updated(received)
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        self._fatal_error(
            err, 'Fatal error: protocol.buffer_updated() call failed.')
def _read_ready__data_received(self):
    """Read-ready handler for protocols that consume ``data_received()``.

    Reads up to ``self.max_size`` bytes.  An empty read is handed to
    ``_read_ready__on_eof()``; anything else is delivered to the
    protocol.  Failures other than "would block"/"interrupted" go to
    ``_fatal_error()``; SystemExit and KeyboardInterrupt always propagate.
    """
    if self._conn_lost:
        return

    try:
        chunk = self._sock.recv(self.max_size)
    except (BlockingIOError, InterruptedError):
        # Nothing to read right now; wait for the next event.
        return
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        self._fatal_error(err, 'Fatal read error on socket transport')
        return

    if not chunk:
        self._read_ready__on_eof()
        return

    try:
        self._protocol.data_received(chunk)
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        self._fatal_error(
            err, 'Fatal error: protocol.data_received() call failed.')
def _read_ready__on_eof(self):
    """Handle end-of-stream reported by a zero-length read.

    Calls ``protocol.eof_received()``.  A true return value keeps the
    transport open for writing and only unregisters the reader; a false
    value closes the transport.  Errors raised by the protocol go to
    ``_fatal_error()``.
    """
    if self._loop.get_debug():
        logger.debug("%r received EOF", self)

    try:
        stay_open = self._protocol.eof_received()
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        self._fatal_error(
            err, 'Fatal error: protocol.eof_received() call failed.')
        return

    if not stay_open:
        self.close()
        return

    # We're keeping the connection open so the
    # protocol can write more, but we still can't
    # receive more, so remove the reader callback.
    self._loop._remove_reader(self._sock_fd)
def write(self, data):
    """Send *data*, buffering whatever the socket cannot accept right now.

    With an empty buffer an immediate ``send()`` is attempted; any unsent
    remainder is queued and the write handler is registered.  With a
    non-empty buffer *data* is queued behind what is already pending.

    Raises:
        TypeError: *data* is not bytes, bytearray or memoryview.
        RuntimeError: write_eof() was already called, or an empty waiter
            is installed (sendfile in progress).
    """
    if not isinstance(data, (bytes, bytearray, memoryview)):
        raise TypeError(f'data argument must be a bytes, bytearray, or memoryview '
                        f'object, not {type(data).__name__!r}')
    if self._eof:
        raise RuntimeError('Cannot call write() after write_eof()')
    if self._empty_waiter is not None:
        raise RuntimeError('unable to write; sendfile is in progress')
    if not data:
        return

    if self._conn_lost:
        if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
            logger.warning('socket.send() raised exception.')
        self._conn_lost += 1
        return

    # send() reports *bytes*, while slicing and len() on a memoryview
    # work in *items*.  Flatten multi-byte / multi-dimensional views so
    # the partial-send slice and the size accounting below are in bytes.
    # Non-contiguous views are left alone: send() rejects them and the
    # error is reported through the usual fatal-error path.
    if (isinstance(data, memoryview) and data.c_contiguous
            and (data.ndim != 1 or data.itemsize != 1)):
        data = data.cast('B')

    if not self._buffer:
        # Optimization: try to send now.
        try:
            n = self._sock.send(data)
        except (BlockingIOError, InterruptedError):
            pass
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(exc, 'Fatal write error on socket transport')
            return
        else:
            data = memoryview(data)[n:]
            if not data:
                return
        # Not all was written; register write handler.
        self._add_writer(self._sock_fd, self._write_ready)

    # Add it to the buffer.
    self._buffer.append(data)
    self._buffer_size += len(data)
    self._maybe_pause_protocol()
def _get_sendmsg_buffer(self):
    """Return an iterator over at most SC_IOV_MAX leading buffered chunks."""
    pending = self._buffer
    return itertools.islice(pending, SC_IOV_MAX)
def _write_sendmsg(self):
    """Write-ready handler that flushes the buffer with ``sendmsg()``.

    After a successful send the buffer is trimmed and the protocol may be
    resumed.  Once the buffer is empty the writer is unregistered, the
    empty waiter (if any) is resolved, and a pending close or write-EOF is
    completed.  Any error other than "would block"/"interrupted" drops
    the buffer and is reported through ``_fatal_error()``.
    """
    assert self._buffer, 'Data should not be empty'
    if self._conn_lost:
        return

    try:
        sent = self._sock.sendmsg(self._get_sendmsg_buffer())
        self._adjust_leftover_buffer(sent)
    except (BlockingIOError, InterruptedError):
        # Socket not writable after all; retry on the next event.
        return
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        self._loop._remove_writer(self._sock_fd)
        self._buffer.clear()
        self._buffer_size = 0
        self._fatal_error(err, 'Fatal write error on socket transport')
        if self._empty_waiter is not None:
            self._empty_waiter.set_exception(err)
        return

    self._maybe_resume_protocol()  # May append to buffer.
    if self._buffer:
        return

    self._loop._remove_writer(self._sock_fd)
    if self._empty_waiter is not None:
        self._empty_waiter.set_result(None)
    if self._closing:
        self._call_connection_lost(None)
    elif self._eof:
        self._sock.shutdown(socket.SHUT_WR)
def _adjust_leftover_buffer(self, nbytes: int) -> None:
    """Drop the first *nbytes* bytes of buffered data after a send.

    Fully transmitted chunks are removed from the left of the buffer and
    a partially transmitted chunk is replaced by its unsent tail.
    Zero-length chunks at the head are removed too: nothing remains to be
    sent for them, and leaving them queued would keep the buffer
    non-empty forever, so the write handler would never be unregistered.

    Chunk sizes are measured with ``len()``.
    NOTE(review): this assumes every buffered chunk is byte-oriented.
    """
    self._buffer_size -= nbytes
    buffer = self._buffer
    while buffer:
        head = buffer[0]
        head_len = len(head)
        if head_len > nbytes:
            # Head chunk only partially sent (or not at all).
            if nbytes:
                buffer[0] = head[nbytes:]
            break
        # Head chunk fully consumed (this includes empty chunks).
        buffer.popleft()
        nbytes -= head_len
def _write_send(self):
    """Write-ready handler that flushes one buffered chunk with ``send()``.

    The unsent tail of a partially written chunk is pushed back to the
    front of the buffer.  Once the buffer is empty the writer is
    unregistered, the empty waiter (if any) is resolved, and a pending
    close or write-EOF is completed.  Any error other than "would
    block"/"interrupted" drops the buffer and is reported through
    ``_fatal_error()``.
    """
    assert self._buffer, 'Data should not be empty'
    if self._conn_lost:
        return

    try:
        chunk = self._buffer.popleft()
        sent = self._sock.send(chunk)
        if sent != len(chunk):
            # Not all data was written
            self._buffer.appendleft(chunk[sent:])
        self._buffer_size -= sent
    except (BlockingIOError, InterruptedError):
        # Put the chunk back untouched and retry on the next event.
        self._buffer.appendleft(chunk)
        return
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        self._loop._remove_writer(self._sock_fd)
        self._buffer.clear()
        self._buffer_size = 0
        self._fatal_error(err, 'Fatal write error on socket transport')
        if self._empty_waiter is not None:
            self._empty_waiter.set_exception(err)
        return

    self._maybe_resume_protocol()  # May append to buffer.
    if self._buffer:
        return

    self._loop._remove_writer(self._sock_fd)
    if self._empty_waiter is not None:
        self._empty_waiter.set_result(None)
    if self._closing:
        self._call_connection_lost(None)
    elif self._eof:
        self._sock.shutdown(socket.SHUT_WR)
def write_eof(self):
    """Mark the write side as finished.

    Has no effect when the transport is closing or EOF was already
    requested.  With an empty buffer the socket's write side is shut down
    immediately; otherwise only the flag is set here.
    """
    if not (self._closing or self._eof):
        self._eof = True
        if not self._buffer:
            self._sock.shutdown(socket.SHUT_WR)
def writelines(self, list_of_data):
    """Queue every chunk of *list_of_data* and try to flush immediately.

    Each chunk is wrapped in a memoryview (no copy).  Multi-byte or
    multi-dimensional views are flattened to bytes so that buffer
    accounting and partial-send slicing operate on byte counts, and empty
    chunks are skipped: a zero-length entry can never be "sent", so
    queueing it could leave the buffer permanently non-empty.

    Raises:
        RuntimeError: write_eof() was already called, or an empty waiter
            is installed (sendfile in progress).
        TypeError: a chunk does not support the buffer protocol.
    """
    if self._eof:
        raise RuntimeError('Cannot call writelines() after write_eof()')
    if self._empty_waiter is not None:
        raise RuntimeError('unable to writelines; sendfile is in progress')
    if not list_of_data:
        return

    if self._conn_lost:
        if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
            logger.warning('socket.send() raised exception.')
        self._conn_lost += 1
        return

    for data in list_of_data:
        view = memoryview(data)
        # Non-contiguous views are queued unchanged; the socket call
        # rejects them and the failure surfaces as a fatal write error.
        if view.c_contiguous and (view.ndim != 1 or view.itemsize != 1):
            view = view.cast('B')
        if not view.nbytes:
            continue
        self._buffer.append(view)
        self._buffer_size += view.nbytes

    if not self._buffer:
        # Only empty chunks were supplied and nothing else is pending.
        return

    self._write_ready()
    # If the entire buffer couldn't be written, register a write handler
    if self._buffer:
        self._add_writer(self._sock_fd, self._write_ready)
        self._maybe_pause_protocol()
def can_write_eof(self):
    """Report that this transport supports write_eof(); always True."""
    return True
def _call_connection_lost(self, exc):
    """Finish closing the transport and fail a still-pending empty waiter.

    After the base-class teardown (even if it raises) the write-ready
    handler is cleared.  If an empty waiter is installed and has not
    completed yet, it is failed with ConnectionError.

    The waiter may already be done when this runs: the write-ready
    handlers resolve it right before calling this method on a closing
    transport, and they set the error on it after a fatal write error.
    Calling set_exception() on a completed future raises
    InvalidStateError, so completed waiters are left untouched.
    """
    try:
        super()._call_connection_lost(exc)
    finally:
        self._write_ready = None
        waiter = self._empty_waiter
        if waiter is not None and not waiter.done():
            waiter.set_exception(
                ConnectionError("Connection is closed by peer"))
def _make_empty_waiter(self):
    """Create, install and return the future tracking an empty buffer.

    The future is resolved on the spot when nothing is buffered.

    Raises:
        RuntimeError: an empty waiter is already installed.
    """
    if self._empty_waiter is not None:
        raise RuntimeError("Empty waiter is already set")
    waiter = self._loop.create_future()
    self._empty_waiter = waiter
    if not self._buffer:
        waiter.set_result(None)
    return waiter
def _reset_empty_waiter(self):
    """Forget the installed empty waiter, if any."""
    self._empty_waiter = None
def close(self):
    """Close the transport, dropping the read-ready handler first."""
    # Clear the handler before delegating to the base-class close().
    self._read_ready_cb = None
    super().close()
class _SelectorDatagramTransport(_SelectorTransport, transports.DatagramTransport):
    """Datagram transport built on top of _SelectorTransport."""

    # Container type for pending datagrams.
    # NOTE(review): presumably instantiated by the base class -- confirm.
    _buffer_factory = collections.deque
    # Fixed per-datagram amount added to the tracked buffer size on top
    # of the payload length.
    _header_size = 8
def __init__(self, loop, sock, protocol, address=None,
             waiter=None, extra=None):
    """Set up the transport and schedule the protocol start-up callbacks.

    The callbacks are queued with ``_call_soon()`` in this order:
    ``protocol.connection_made(self)``, registration of the read handler,
    and -- when *waiter* is given -- resolution of *waiter* with ``None``.
    """
    super().__init__(loop, sock, protocol, extra)
    self._address = address
    self._buffer_size = 0

    schedule = self._call_soon
    schedule(self._protocol.connection_made, self)
    # only start reading when connection_made() has been called
    schedule(self._add_reader, self._sock_fd, self._read_ready)
    if waiter is not None:
        # only wake up the waiter when connection_made() has been called
        schedule(futures._set_result_unless_cancelled, waiter, None)
def get_write_buffer_size(self):
    """Return the size currently tracked for the pending datagram buffer."""
    pending = self._buffer_size
    return pending
def _read_ready(self):
    """Receive one datagram and deliver it to the protocol.

    "Would block"/"interrupted" conditions are ignored, other OSErrors
    are passed to ``protocol.error_received()``, and any remaining error
    goes to ``_fatal_error()``.  SystemExit and KeyboardInterrupt always
    propagate.
    """
    if self._conn_lost:
        return

    try:
        payload, sender = self._sock.recvfrom(self.max_size)
    except (BlockingIOError, InterruptedError):
        return
    except OSError as err:
        self._protocol.error_received(err)
        return
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        self._fatal_error(err, 'Fatal read error on datagram transport')
        return

    self._protocol.datagram_received(payload, sender)
def sendto(self, data, addr=None):
    """Send *data* as one datagram, queueing it if the socket is busy.

    When the transport has a fixed address, *addr* must be None or equal
    to it.  With an empty buffer an immediate send is attempted; if the
    socket would block, the write handler is registered and the datagram
    is queued.  Queued datagrams are stored as immutable ``bytes`` copies.

    Raises:
        TypeError: *data* is not bytes, bytearray or memoryview.
        ValueError: *addr* conflicts with the transport's fixed address.
    """
    if not isinstance(data, (bytes, bytearray, memoryview)):
        raise TypeError(f'data argument must be a bytes-like object, '
                        f'not {type(data).__name__!r}')

    if self._address:
        if addr not in (None, self._address):
            raise ValueError(
                f'Invalid address: must be None or {self._address}')
        addr = self._address

    if self._conn_lost and self._address:
        if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
            logger.warning('socket.send() raised exception.')
        self._conn_lost += 1
        return

    if not self._buffer:
        # Attempt to send it right away first.
        try:
            if self._extra['peername']:
                self._sock.send(data)
            else:
                self._sock.sendto(data, addr)
            return
        except (BlockingIOError, InterruptedError):
            self._add_writer(self._sock_fd, self._sendto_ready)
        except OSError as exc:
            self._protocol.error_received(exc)
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(
                exc, 'Fatal write error on datagram transport')
            return

    # Ensure that what we buffer is immutable.
    payload = bytes(data)
    self._buffer.append((payload, addr))
    # Account for the stored copy, not the caller's object: len() of a
    # multi-byte memoryview counts items, while the size later released
    # for this entry is computed from the bytes copy.
    self._buffer_size += len(payload) + self._header_size
    self._maybe_pause_protocol()
def _sendto_ready(self):
    """Write-ready handler that flushes queued datagrams in order.

    Sending stops at the first datagram the socket would block on; that
    datagram is put back at the front of the queue.  An OSError is passed
    to ``protocol.error_received()`` and any other error to
    ``_fatal_error()``; both end the flush immediately.  Once the queue
    is empty the writer is unregistered and a pending close is completed.
    """
    while self._buffer:
        payload, dest = self._buffer.popleft()
        self._buffer_size -= len(payload) + self._header_size
        try:
            if self._extra['peername']:
                self._sock.send(payload)
            else:
                self._sock.sendto(payload, dest)
        except (BlockingIOError, InterruptedError):
            self._buffer.appendleft((payload, dest))  # Try again later.
            self._buffer_size += len(payload) + self._header_size
            break
        except OSError as err:
            self._protocol.error_received(err)
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as err:
            self._fatal_error(
                err, 'Fatal write error on datagram transport')
            return

    self._maybe_resume_protocol()  # May append to buffer.
    if self._buffer:
        return
    self._loop._remove_writer(self._sock_fd)
    if self._closing:
        self._call_connection_lost(None)