Coverage for Lib/asyncio/selector_events.py: 87%
900 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-15 02:02 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-15 02:02 +0000
1"""Event loop using a selector and related classes.
3A selector is a "notify-when-ready" multiplexer. For a subclass which
4also includes support for signal handling, see the unix_events sub-module.
5"""
7__all__ = 'BaseSelectorEventLoop',
9import collections
10import errno
11import functools
12import itertools
13import os
14import selectors
15import socket
16import warnings
17import weakref
18try:
19 import ssl
20except ImportError: # pragma: no cover
21 ssl = None
23from . import base_events
24from . import constants
25from . import events
26from . import futures
27from . import protocols
28from . import sslproto
29from . import transports
30from . import trsock
31from .log import logger
# Whether socket transports may use their sendmsg()-based write path; when
# False they fall back to the send()-based one.
_HAS_SENDMSG = hasattr(socket.socket, 'sendmsg')

if _HAS_SENDMSG:
    try:
        SC_IOV_MAX = os.sysconf('SC_IOV_MAX')
    except (AttributeError, OSError, ValueError):
        # os.sysconf() raises OSError when the host does not support the
        # value and ValueError when the name itself is unknown; the function
        # may also be missing altogether.  In every case: fallback to send.
        _HAS_SENDMSG = False
def _test_selector_event(selector, fd, event):
    """Return True if *selector* monitors *fd* for any bit of *event*.

    A file descriptor that is not registered with the selector yields False.
    """
    try:
        key = selector.get_key(fd)
    except KeyError:
        # Not registered at all.
        return False
    return (key.events & event) != 0
53class BaseSelectorEventLoop(base_events.BaseEventLoop):
54 """Selector event loop.
56 See events.EventLoop for API specification.
57 """
    def __init__(self, selector=None):
        """Initialise the loop around *selector*.

        When *selector* is None a ``selectors.DefaultSelector()`` is created.
        """
        super().__init__()

        if selector is None:
            selector = selectors.DefaultSelector()
        logger.debug('Using selector: %s', selector.__class__.__name__)
        self._selector = selector
        # _make_self_pipe() registers a reader, so the selector is set first.
        self._make_self_pipe()
        # Transports register themselves here, keyed by socket file
        # descriptor; the values are held weakly.
        self._transports = weakref.WeakValueDictionary()
69 def _make_socket_transport(self, sock, protocol, waiter=None, *,
70 extra=None, server=None):
71 self._ensure_fd_no_transport(sock)
72 return _SelectorSocketTransport(self, sock, protocol, waiter,
73 extra, server)
    def _make_ssl_transport(
            self, rawsock, protocol, sslcontext, waiter=None,
            *, server_side=False, server_hostname=None,
            extra=None, server=None,
            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
            ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT,
    ):
        """Wrap *rawsock* in an SSLProtocol and return its app transport.

        *waiter* is handed to the SSLProtocol, not to the raw socket
        transport.
        """
        self._ensure_fd_no_transport(rawsock)
        ssl_protocol = sslproto.SSLProtocol(
            self, protocol, sslcontext, waiter,
            server_side, server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout,
            ssl_shutdown_timeout=ssl_shutdown_timeout
        )
        # The raw transport is created for its side effects only; no
        # reference to it is kept here.
        _SelectorSocketTransport(self, rawsock, ssl_protocol,
                                 extra=extra, server=server)
        return ssl_protocol._app_transport
93 def _make_datagram_transport(self, sock, protocol,
94 address=None, waiter=None, extra=None):
95 self._ensure_fd_no_transport(sock)
96 return _SelectorDatagramTransport(self, sock, protocol,
97 address, waiter, extra)
    def close(self):
        """Close the loop, its self-pipe and its selector.

        Raises RuntimeError if the loop is running.  Does nothing when the
        loop is already closed.
        """
        if self.is_running():
            raise RuntimeError("Cannot close a running event loop")
        if self.is_closed():
            return
        # The self-pipe reader is removed before the base class closes the
        # loop: _remove_reader() is a no-op once the loop reports closed.
        self._close_self_pipe()
        super().close()
        if self._selector is not None:
            self._selector.close()
            self._selector = None
    def _close_self_pipe(self):
        """Unregister and close both ends of the self-pipe socket pair."""
        self._remove_reader(self._ssock.fileno())
        self._ssock.close()
        self._ssock = None
        self._csock.close()
        self._csock = None
        # Undo the increment done by _make_self_pipe().
        self._internal_fds -= 1
    def _make_self_pipe(self):
        """Create the non-blocking socket pair and watch its read end."""
        # A self-socket, really. :-)
        self._ssock, self._csock = socket.socketpair()
        self._ssock.setblocking(False)
        self._csock.setblocking(False)
        self._internal_fds += 1
        # Data arriving on the read end is handled by _read_from_self().
        self._add_reader(self._ssock.fileno(), self._read_from_self)
    def _process_self_data(self, data):
        """Hook called by _read_from_self() with each chunk; no-op here."""
        pass
    def _read_from_self(self):
        """Drain the read end of the self-pipe.

        Reads up to 4096 bytes at a time and passes each chunk to
        _process_self_data() until recv() returns no data or would block.
        """
        while True:
            try:
                data = self._ssock.recv(4096)
                if not data:
                    break
                self._process_self_data(data)
            except InterruptedError:
                # Interrupted call: simply retry.
                continue
            except BlockingIOError:
                # Nothing left to read for now.
                break
    def _write_to_self(self):
        """Send one null byte on the write end of the self-pipe.

        An OSError from send() is swallowed (and logged in debug mode).
        """
        # This may be called from a different thread, possibly after
        # _close_self_pipe() has been called or even while it is
        # running.  Guard for self._csock being None or closed.  When
        # a socket is closed, send() raises OSError (with errno set to
        # EBADF, but let's not rely on the exact error code).
        csock = self._csock
        if csock is None:
            return

        try:
            csock.send(b'\0')
        except OSError:
            if self._debug:
                logger.debug("Fail to write a null byte into the "
                             "self-pipe socket",
                             exc_info=True)
    def _start_serving(self, protocol_factory, sock,
                       sslcontext=None, server=None, backlog=100,
                       ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
                       ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
        """Watch the listening socket *sock* for incoming connections.

        Registers _accept_connection() as the read callback for *sock*,
        forwarding every argument unchanged.
        """
        self._add_reader(sock.fileno(), self._accept_connection,
                         protocol_factory, sock, sslcontext, server, backlog,
                         ssl_handshake_timeout, ssl_shutdown_timeout)
    def _accept_connection(
            self, protocol_factory, sock,
            sslcontext=None, server=None, backlog=100,
            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
            ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
        """Accept up to *backlog* pending connections on *sock*.

        Each accepted connection is made non-blocking and handed to
        _accept_connection2(), which is scheduled as a task.
        """
        # This method is only called once for each event loop tick where the
        # listening socket has triggered an EVENT_READ. There may be multiple
        # connections waiting for an .accept() so it is called in a loop.
        # See https://bugs.python.org/issue27906 for more details.
        for _ in range(backlog):
            try:
                conn, addr = sock.accept()
                if self._debug:
                    logger.debug("%r got a new connection from %r: %r",
                                 server, addr, conn)
                conn.setblocking(False)
            except ConnectionAbortedError:
                # Discard connections that were aborted before accept().
                continue
            except (BlockingIOError, InterruptedError):
                # Early exit because of a signal or
                # the socket accept buffer is empty.
                return
            except OSError as exc:
                # There's nowhere to send the error, so just log it.
                if exc.errno in (errno.EMFILE, errno.ENFILE,
                                 errno.ENOBUFS, errno.ENOMEM):
                    # Some platforms (e.g. Linux) keep reporting the FD as
                    # ready, so we remove the read handler temporarily.
                    # We'll try again in a while.
                    self.call_exception_handler({
                        'message': 'socket.accept() out of system resource',
                        'exception': exc,
                        'socket': trsock.TransportSocket(sock),
                    })
                    self._remove_reader(sock.fileno())
                    self.call_later(constants.ACCEPT_RETRY_DELAY,
                                    self._start_serving,
                                    protocol_factory, sock, sslcontext, server,
                                    backlog, ssl_handshake_timeout,
                                    ssl_shutdown_timeout)
                else:
                    raise  # The event loop will catch, log and ignore it.
            else:
                extra = {'peername': addr}
                accept = self._accept_connection2(
                    protocol_factory, conn, extra, sslcontext, server,
                    ssl_handshake_timeout, ssl_shutdown_timeout)
                self.create_task(accept)
    async def _accept_connection2(
            self, protocol_factory, conn, extra,
            sslcontext=None, server=None,
            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
            ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
        """Build the protocol and transport for an accepted connection.

        Any failure other than SystemExit/KeyboardInterrupt is swallowed; it
        is reported to the loop's exception handler only in debug mode.
        """
        protocol = None
        transport = None
        try:
            protocol = protocol_factory()
            waiter = self.create_future()
            if sslcontext:
                transport = self._make_ssl_transport(
                    conn, protocol, sslcontext, waiter=waiter,
                    server_side=True, extra=extra, server=server,
                    ssl_handshake_timeout=ssl_handshake_timeout,
                    ssl_shutdown_timeout=ssl_shutdown_timeout)
            else:
                transport = self._make_socket_transport(
                    conn, protocol, waiter=waiter, extra=extra,
                    server=server)

            try:
                await waiter
            except BaseException:
                transport.close()
                # gh-109534: When an exception is raised by the SSLProtocol object the
                # exception set in this future can keep the protocol object alive and
                # cause a reference cycle.
                waiter = None
                raise
            # It's now up to the protocol to handle the connection.

        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            if self._debug:
                context = {
                    'message':
                        'Error on transport creation for incoming connection',
                    'exception': exc,
                }
                if protocol is not None:
                    context['protocol'] = protocol
                if transport is not None:
                    context['transport'] = transport
                self.call_exception_handler(context)
264 def _ensure_fd_no_transport(self, fd):
265 fileno = fd
266 if not isinstance(fileno, int):
267 try:
268 fileno = int(fileno.fileno())
269 except (AttributeError, TypeError, ValueError):
270 # This code matches selectors._fileobj_to_fd function.
271 raise ValueError(f"Invalid file object: {fd!r}") from None
272 transport = self._transports.get(fileno)
273 if transport and not transport.is_closing():
274 raise RuntimeError(
275 f'File descriptor {fd!r} is used by transport '
276 f'{transport!r}')
278 def _add_reader(self, fd, callback, *args):
279 self._check_closed()
280 handle = events.Handle(callback, args, self, None)
281 key = self._selector.get_map().get(fd)
282 if key is None:
283 self._selector.register(fd, selectors.EVENT_READ,
284 (handle, None))
285 else:
286 mask, (reader, writer) = key.events, key.data
287 self._selector.modify(fd, mask | selectors.EVENT_READ,
288 (handle, writer))
289 if reader is not None:
290 reader.cancel()
291 return handle
293 def _remove_reader(self, fd):
294 if self.is_closed():
295 return False
296 key = self._selector.get_map().get(fd)
297 if key is None:
298 return False
299 mask, (reader, writer) = key.events, key.data
300 mask &= ~selectors.EVENT_READ
301 if not mask:
302 self._selector.unregister(fd)
303 else:
304 self._selector.modify(fd, mask, (None, writer))
306 if reader is not None:
307 reader.cancel()
308 return True
309 else:
310 return False
312 def _add_writer(self, fd, callback, *args):
313 self._check_closed()
314 handle = events.Handle(callback, args, self, None)
315 key = self._selector.get_map().get(fd)
316 if key is None:
317 self._selector.register(fd, selectors.EVENT_WRITE,
318 (None, handle))
319 else:
320 mask, (reader, writer) = key.events, key.data
321 self._selector.modify(fd, mask | selectors.EVENT_WRITE,
322 (reader, handle))
323 if writer is not None:
324 writer.cancel()
325 return handle
327 def _remove_writer(self, fd):
328 """Remove a writer callback."""
329 if self.is_closed():
330 return False
331 key = self._selector.get_map().get(fd)
332 if key is None:
333 return False
334 mask, (reader, writer) = key.events, key.data
335 # Remove both writer and connector.
336 mask &= ~selectors.EVENT_WRITE
337 if not mask:
338 self._selector.unregister(fd)
339 else:
340 self._selector.modify(fd, mask, (reader, None))
342 if writer is not None:
343 writer.cancel()
344 return True
345 else:
346 return False
    def add_reader(self, fd, callback, *args):
        """Add a reader callback.

        Raises RuntimeError if *fd* is used by a transport that is not
        closing.
        """
        self._ensure_fd_no_transport(fd)
        self._add_reader(fd, callback, *args)
    def remove_reader(self, fd):
        """Remove a reader callback.

        Returns True if a reader was registered for *fd*, False otherwise.
        Raises RuntimeError if *fd* is used by a transport that is not
        closing.
        """
        self._ensure_fd_no_transport(fd)
        return self._remove_reader(fd)
    def add_writer(self, fd, callback, *args):
        """Add a writer callback.

        Raises RuntimeError if *fd* is used by a transport that is not
        closing.
        """
        self._ensure_fd_no_transport(fd)
        self._add_writer(fd, callback, *args)
    def remove_writer(self, fd):
        """Remove a writer callback.

        Returns True if a writer was registered for *fd*, False otherwise.
        Raises RuntimeError if *fd* is used by a transport that is not
        closing.
        """
        self._ensure_fd_no_transport(fd)
        return self._remove_writer(fd)
    async def sock_recv(self, sock, n):
        """Receive data from the socket.

        The return value is a bytes object representing the data received.
        The maximum amount of data to be received at once is specified by
        *n*.
        """
        base_events._check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        # Fast path: try the call right away.
        try:
            return sock.recv(n)
        except (BlockingIOError, InterruptedError):
            pass
        # Otherwise retry from _sock_recv() whenever the fd becomes readable.
        fut = self.create_future()
        fd = sock.fileno()
        self._ensure_fd_no_transport(fd)
        handle = self._add_reader(fd, self._sock_recv, fut, sock, n)
        # Unregister the reader once the future is done.
        fut.add_done_callback(
            functools.partial(self._sock_read_done, fd, handle=handle))
        return await fut
    def _sock_read_done(self, fd, fut, handle=None):
        """Future done-callback: unregister the reader for *fd*.

        Nothing is removed when *handle* is given and already cancelled.
        """
        if handle is None or not handle.cancelled():
            self.remove_reader(fd)
394 def _sock_recv(self, fut, sock, n):
395 # _sock_recv() can add itself as an I/O callback if the operation can't
396 # be done immediately. Don't use it directly, call sock_recv().
397 if fut.done(): 397 ↛ 398line 397 didn't jump to line 398 because the condition on line 397 was never true
398 return
399 try:
400 data = sock.recv(n)
401 except (BlockingIOError, InterruptedError):
402 return # try again next time
403 except (SystemExit, KeyboardInterrupt):
404 raise
405 except BaseException as exc:
406 fut.set_exception(exc)
407 else:
408 fut.set_result(data)
    async def sock_recv_into(self, sock, buf):
        """Receive data from the socket.

        The received data is written into *buf* (a writable buffer).
        The return value is the number of bytes written.
        """
        base_events._check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        # Fast path: try the call right away.
        try:
            return sock.recv_into(buf)
        except (BlockingIOError, InterruptedError):
            pass
        # Otherwise retry from _sock_recv_into() whenever the fd is readable.
        fut = self.create_future()
        fd = sock.fileno()
        self._ensure_fd_no_transport(fd)
        handle = self._add_reader(fd, self._sock_recv_into, fut, sock, buf)
        # Unregister the reader once the future is done.
        fut.add_done_callback(
            functools.partial(self._sock_read_done, fd, handle=handle))
        return await fut
431 def _sock_recv_into(self, fut, sock, buf):
432 # _sock_recv_into() can add itself as an I/O callback if the operation
433 # can't be done immediately. Don't use it directly, call
434 # sock_recv_into().
435 if fut.done(): 435 ↛ 436line 435 didn't jump to line 436 because the condition on line 435 was never true
436 return
437 try:
438 nbytes = sock.recv_into(buf)
439 except (BlockingIOError, InterruptedError):
440 return # try again next time
441 except (SystemExit, KeyboardInterrupt):
442 raise
443 except BaseException as exc:
444 fut.set_exception(exc)
445 else:
446 fut.set_result(nbytes)
    async def sock_recvfrom(self, sock, bufsize):
        """Receive a datagram from a datagram socket.

        The return value is a tuple of (bytes, address) representing the
        datagram received and the address it came from.
        The maximum amount of data to be received at once is specified by
        *bufsize*.
        """
        base_events._check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        # Fast path: try the call right away.
        try:
            return sock.recvfrom(bufsize)
        except (BlockingIOError, InterruptedError):
            pass
        # Otherwise retry from _sock_recvfrom() whenever the fd is readable.
        fut = self.create_future()
        fd = sock.fileno()
        self._ensure_fd_no_transport(fd)
        handle = self._add_reader(fd, self._sock_recvfrom, fut, sock, bufsize)
        # Unregister the reader once the future is done.
        fut.add_done_callback(
            functools.partial(self._sock_read_done, fd, handle=handle))
        return await fut
471 def _sock_recvfrom(self, fut, sock, bufsize):
472 # _sock_recvfrom() can add itself as an I/O callback if the operation
473 # can't be done immediately. Don't use it directly, call
474 # sock_recvfrom().
475 if fut.done(): 475 ↛ 476line 475 didn't jump to line 476 because the condition on line 475 was never true
476 return
477 try:
478 result = sock.recvfrom(bufsize)
479 except (BlockingIOError, InterruptedError):
480 return # try again next time
481 except (SystemExit, KeyboardInterrupt):
482 raise
483 except BaseException as exc:
484 fut.set_exception(exc)
485 else:
486 fut.set_result(result)
488 async def sock_recvfrom_into(self, sock, buf, nbytes=0):
489 """Receive data from the socket.
491 The received data is written into *buf* (a writable buffer).
492 The return value is a tuple of (number of bytes written, address).
493 """
494 base_events._check_ssl_socket(sock)
495 if self._debug and sock.gettimeout() != 0: 495 ↛ 496line 495 didn't jump to line 496 because the condition on line 495 was never true
496 raise ValueError("the socket must be non-blocking")
497 if not nbytes:
498 nbytes = len(buf)
500 try:
501 return sock.recvfrom_into(buf, nbytes)
502 except (BlockingIOError, InterruptedError):
503 pass
504 fut = self.create_future()
505 fd = sock.fileno()
506 self._ensure_fd_no_transport(fd)
507 handle = self._add_reader(fd, self._sock_recvfrom_into, fut, sock, buf,
508 nbytes)
509 fut.add_done_callback(
510 functools.partial(self._sock_read_done, fd, handle=handle))
511 return await fut
513 def _sock_recvfrom_into(self, fut, sock, buf, bufsize):
514 # _sock_recv_into() can add itself as an I/O callback if the operation
515 # can't be done immediately. Don't use it directly, call
516 # sock_recv_into().
517 if fut.done(): 517 ↛ 518line 517 didn't jump to line 518 because the condition on line 517 was never true
518 return
519 try:
520 result = sock.recvfrom_into(buf, bufsize)
521 except (BlockingIOError, InterruptedError):
522 return # try again next time
523 except (SystemExit, KeyboardInterrupt):
524 raise
525 except BaseException as exc:
526 fut.set_exception(exc)
527 else:
528 fut.set_result(result)
    async def sock_sendall(self, sock, data):
        """Send data to the socket.

        The socket must be connected to a remote socket. This method continues
        to send data from data until either all data has been sent or an
        error occurs. None is returned on success. On error, an exception is
        raised, and there is no way to determine how much data, if any, was
        successfully processed by the receiving end of the connection.
        """
        base_events._check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        # Fast path: try to send everything right away.
        try:
            n = sock.send(data)
        except (BlockingIOError, InterruptedError):
            n = 0

        if n == len(data):
            # all data sent
            return

        fut = self.create_future()
        fd = sock.fileno()
        self._ensure_fd_no_transport(fd)
        # use a trick with a list in closure to store a mutable state
        # (the one-element list holds the offset of the first unsent byte)
        handle = self._add_writer(fd, self._sock_sendall, fut, sock,
                                  memoryview(data), [n])
        # Unregister the writer once the future is done.
        fut.add_done_callback(
            functools.partial(self._sock_write_done, fd, handle=handle))
        return await fut
561 def _sock_sendall(self, fut, sock, view, pos):
562 if fut.done(): 562 ↛ 564line 562 didn't jump to line 564 because the condition on line 562 was never true
563 # Future cancellation can be scheduled on previous loop iteration
564 return
565 start = pos[0]
566 try:
567 n = sock.send(view[start:])
568 except (BlockingIOError, InterruptedError):
569 return
570 except (SystemExit, KeyboardInterrupt):
571 raise
572 except BaseException as exc:
573 fut.set_exception(exc)
574 return
576 start += n
578 if start == len(view):
579 fut.set_result(None)
580 else:
581 pos[0] = start
    async def sock_sendto(self, sock, data, address):
        """Send *data* from the socket to *address*.

        The return value is whatever sock.sendto() returns.  If the call
        would block, it is retried once the socket becomes writable.
        """
        base_events._check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        # Fast path: try the call right away.
        try:
            return sock.sendto(data, address)
        except (BlockingIOError, InterruptedError):
            pass

        fut = self.create_future()
        fd = sock.fileno()
        self._ensure_fd_no_transport(fd)
        # Retry from _sock_sendto() whenever the fd becomes writable.
        handle = self._add_writer(fd, self._sock_sendto, fut, sock, data,
                                  address)
        # Unregister the writer once the future is done.
        fut.add_done_callback(
            functools.partial(self._sock_write_done, fd, handle=handle))
        return await fut
610 def _sock_sendto(self, fut, sock, data, address):
611 if fut.done(): 611 ↛ 613line 611 didn't jump to line 613 because the condition on line 611 was never true
612 # Future cancellation can be scheduled on previous loop iteration
613 return
614 try:
615 n = sock.sendto(data, 0, address)
616 except (BlockingIOError, InterruptedError):
617 return
618 except (SystemExit, KeyboardInterrupt):
619 raise
620 except BaseException as exc:
621 fut.set_exception(exc)
622 else:
623 fut.set_result(n)
    async def sock_connect(self, sock, address):
        """Connect to a remote socket at address.

        This method is a coroutine.
        """
        base_events._check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")

        # For IP sockets the address is resolved first; only the first
        # result is used.
        if sock.family == socket.AF_INET or (
                base_events._HAS_IPv6 and sock.family == socket.AF_INET6):
            resolved = await self._ensure_resolved(
                address, family=sock.family, type=sock.type, proto=sock.proto,
                loop=self,
            )
            _, _, _, _, address = resolved[0]

        fut = self.create_future()
        self._sock_connect(fut, sock, address)
        try:
            return await fut
        finally:
            # Needed to break cycles when an exception occurs.
            fut = None
    def _sock_connect(self, fut, sock, address):
        """Start connecting *sock* to *address*, reporting through *fut*.

        *fut* is resolved at once when connect() succeeds or fails; when the
        connection is still in progress, _sock_connect_cb() is registered as
        the write callback and resolves it later.
        """
        fd = sock.fileno()
        try:
            sock.connect(address)
        except (BlockingIOError, InterruptedError):
            # Issue #23618: When the C function connect() fails with EINTR, the
            # connection runs in background. We have to wait until the socket
            # becomes writable to be notified when the connection succeed or
            # fails.
            self._ensure_fd_no_transport(fd)
            handle = self._add_writer(
                fd, self._sock_connect_cb, fut, sock, address)
            fut.add_done_callback(
                functools.partial(self._sock_write_done, fd, handle=handle))
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)
        finally:
            # Drop the local reference to the future.
            fut = None
    def _sock_write_done(self, fd, fut, handle=None):
        """Future done-callback: unregister the writer for *fd*.

        Nothing is removed when *handle* is given and already cancelled.
        """
        if handle is None or not handle.cancelled():
            self.remove_writer(fd)
    def _sock_connect_cb(self, fut, sock, address):
        """Writer callback behind sock_connect().

        Reads SO_ERROR from *sock*: zero resolves *fut* with None, a non-zero
        value resolves it with an OSError carrying that error number.
        """
        if fut.done():
            return

        try:
            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
                raise OSError(err, f'Connect call failed {address}')
        except (BlockingIOError, InterruptedError):
            # socket is still registered, the callback will be retried later
            pass
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)
        finally:
            # Drop the local reference to the future.
            fut = None
    async def sock_accept(self, sock):
        """Accept a connection.

        The socket must be bound to an address and listening for connections.
        The return value is a pair (conn, address) where conn is a new socket
        object usable to send and receive data on the connection, and address
        is the address bound to the socket on the other end of the connection.
        """
        base_events._check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        fut = self.create_future()
        # _sock_accept() either resolves the future immediately or registers
        # itself as a reader and resolves it later.
        self._sock_accept(fut, sock)
        return await fut
    def _sock_accept(self, fut, sock):
        """Try to accept on *sock*, reporting through *fut*.

        On success the new connection is made non-blocking and *fut* gets
        (conn, address).  If accept() would block, this method registers
        itself as the read callback for the socket.
        """
        fd = sock.fileno()
        try:
            conn, address = sock.accept()
            conn.setblocking(False)
        except (BlockingIOError, InterruptedError):
            self._ensure_fd_no_transport(fd)
            handle = self._add_reader(fd, self._sock_accept, fut, sock)
            # Unregister the reader once the future is done.
            fut.add_done_callback(
                functools.partial(self._sock_read_done, fd, handle=handle))
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result((conn, address))
    async def _sendfile_native(self, transp, file, offset, count):
        """Send *file* over the socket of *transp* using sock_sendfile().

        The transport is taken out of self._transports for the duration of
        the call and reading is paused; both are restored afterwards, even
        on error.
        """
        # With the transport unregistered, the _ensure_fd_no_transport()
        # checks in the sock_* helpers do not see it.
        del self._transports[transp._sock_fd]
        resume_reading = transp.is_reading()
        transp.pause_reading()
        await transp._make_empty_waiter()
        try:
            return await self.sock_sendfile(transp._sock, file, offset, count,
                                            fallback=False)
        finally:
            transp._reset_empty_waiter()
            if resume_reading:
                transp.resume_reading()
            self._transports[transp._sock_fd] = transp
744 def _process_events(self, event_list):
745 for key, mask in event_list:
746 fileobj, (reader, writer) = key.fileobj, key.data
747 if mask & selectors.EVENT_READ and reader is not None:
748 if reader._cancelled:
749 self._remove_reader(fileobj)
750 else:
751 self._add_callback(reader)
752 if mask & selectors.EVENT_WRITE and writer is not None:
753 if writer._cancelled:
754 self._remove_writer(fileobj)
755 else:
756 self._add_callback(writer)
    def _stop_serving(self, sock):
        """Stop watching *sock* for readability and close it."""
        self._remove_reader(sock.fileno())
        sock.close()
763class _SelectorTransport(transports._FlowControlMixin,
764 transports.Transport):
766 max_size = 256 * 1024 # Buffer size passed to recv().
768 # Attribute used in the destructor: it must be set even if the constructor
769 # is not called (see _SelectorSslTransport which may start by raising an
770 # exception)
771 _sock = None
773 def __init__(self, loop, sock, protocol, extra=None, server=None):
774 super().__init__(extra, loop)
775 self._extra['socket'] = trsock.TransportSocket(sock)
776 try:
777 self._extra['sockname'] = sock.getsockname()
778 except OSError:
779 self._extra['sockname'] = None
780 if 'peername' not in self._extra:
781 try:
782 self._extra['peername'] = sock.getpeername()
783 except socket.error:
784 self._extra['peername'] = None
785 self._sock = sock
786 self._sock_fd = sock.fileno()
788 self._protocol_connected = False
789 self.set_protocol(protocol)
791 self._server = server
792 self._buffer = collections.deque()
793 self._conn_lost = 0 # Set when call to connection_lost scheduled.
794 self._closing = False # Set when close() called.
795 self._paused = False # Set when pause_reading() called
797 if self._server is not None:
798 self._server._attach(self)
799 loop._transports[self._sock_fd] = self
    def __repr__(self):
        """Return a description with state, fd and selector polling status."""
        info = [self.__class__.__name__]
        if self._sock is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append(f'fd={self._sock_fd}')
        # test if the transport was closed
        # (the selector can only be queried while the loop is still open)
        if self._loop is not None and not self._loop.is_closed():
            polling = _test_selector_event(self._loop._selector,
                                           self._sock_fd, selectors.EVENT_READ)
            if polling:
                info.append('read=polling')
            else:
                info.append('read=idle')

            polling = _test_selector_event(self._loop._selector,
                                           self._sock_fd,
                                           selectors.EVENT_WRITE)
            if polling:
                state = 'polling'
            else:
                state = 'idle'

            bufsize = self.get_write_buffer_size()
            info.append(f'write=<{state}, bufsize={bufsize}>')
        return '<{}>'.format(' '.join(info))
    def abort(self):
        """Close the transport immediately, discarding buffered data."""
        self._force_close(None)
    def set_protocol(self, protocol):
        """Install *protocol* and mark the protocol as connected."""
        self._protocol = protocol
        self._protocol_connected = True
    def get_protocol(self):
        """Return the protocol currently installed on this transport."""
        return self._protocol
    def is_closing(self):
        """Return True once close() or a forced close has been requested."""
        return self._closing
842 def is_reading(self):
843 return not self.is_closing() and not self._paused
    def pause_reading(self):
        """Stop watching the socket for reads until resume_reading().

        Does nothing when the transport is closing or already paused.
        """
        if not self.is_reading():
            return
        self._paused = True
        self._loop._remove_reader(self._sock_fd)
        if self._loop.get_debug():
            logger.debug("%r pauses reading", self)
    def resume_reading(self):
        """Watch the socket for reads again after pause_reading().

        Does nothing when the transport is closing or is not paused.
        """
        if self._closing or not self._paused:
            return
        # Clear the flag first: self._add_reader() is a no-op while paused.
        self._paused = False
        self._add_reader(self._sock_fd, self._read_ready)
        if self._loop.get_debug():
            logger.debug("%r resumes reading", self)
    def close(self):
        """Close the transport.

        Reading stops at once.  connection_lost(None) is scheduled here only
        if the write buffer is empty; otherwise this method leaves the
        buffer and the writer untouched.
        """
        if self._closing:
            return
        self._closing = True
        self._loop._remove_reader(self._sock_fd)
        if not self._buffer:
            self._conn_lost += 1
            self._loop._remove_writer(self._sock_fd)
            self._loop.call_soon(self._call_connection_lost, None)
    def __del__(self, _warn=warnings.warn):
        """Warn about, and close, a socket still open at destruction."""
        # _sock is None on the class, so this is safe even when __init__
        # never got as far as storing the socket.
        if self._sock is not None:
            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
            self._sock.close()
            if self._server is not None:
                self._server._detach(self)
    def _fatal_error(self, exc, message='Fatal error on transport'):
        """Report *exc* and force-close the transport.

        An OSError is only logged, and only in debug mode; any other
        exception goes to the loop's exception handler.
        """
        # Should be called from exception handler only.
        if isinstance(exc, OSError):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._force_close(exc)
    def _force_close(self, exc):
        """Drop pending writes and schedule connection_lost(exc).

        Does nothing if the loss of the connection was already scheduled.
        """
        if self._conn_lost:
            return
        if self._buffer:
            # Unsent data is discarded.
            self._buffer.clear()
            self._loop._remove_writer(self._sock_fd)
        if not self._closing:
            self._closing = True
            self._loop._remove_reader(self._sock_fd)
        self._conn_lost += 1
        self._loop.call_soon(self._call_connection_lost, exc)
    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the socket and drop references.

        The cleanup runs even if protocol.connection_lost() raises.
        """
        try:
            if self._protocol_connected:
                self._protocol.connection_lost(exc)
        finally:
            self._sock.close()
            self._sock = None
            self._protocol = None
            self._loop = None
            server = self._server
            if server is not None:
                server._detach(self)
                self._server = None
918 def get_write_buffer_size(self):
919 return sum(map(len, self._buffer))
921 def _add_reader(self, fd, callback, *args):
922 if not self.is_reading():
923 return
924 self._loop._add_reader(fd, callback, *args)
927class _SelectorSocketTransport(_SelectorTransport):
929 _start_tls_compatible = True
930 _sendfile_compatible = constants._SendfileMode.TRY_NATIVE
    def __init__(self, loop, sock, protocol, waiter=None,
                 extra=None, server=None):
        """Set up a stream transport and schedule its start-up callbacks.

        Three callbacks are queued with call_soon(), in this order:
        protocol.connection_made(), the registration of the read callback
        and, if *waiter* is given, the resolution of *waiter*.
        """

        # Must exist before the base class __init__ calls set_protocol().
        self._read_ready_cb = None
        super().__init__(loop, sock, protocol, extra, server)
        self._eof = False
        self._empty_waiter = None
        # Pick the write path once, depending on sendmsg() availability.
        if _HAS_SENDMSG:
            self._write_ready = self._write_sendmsg
        else:
            self._write_ready = self._write_send
        # Disable the Nagle algorithm -- small writes will be
        # sent without waiting for the TCP ACK.  This generally
        # decreases the latency (in some cases significantly.)
        base_events._set_nodelay(self._sock)

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._add_reader,
                             self._sock_fd, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)
def set_protocol(self, protocol):
    """Install *protocol* and select the read callback that matches it.

    A BufferedProtocol is served by _read_ready__get_buffer(); any
    other protocol by _read_ready__data_received().
    """
    buffered = isinstance(protocol, protocols.BufferedProtocol)
    self._read_ready_cb = (
        self._read_ready__get_buffer if buffered
        else self._read_ready__data_received)

    super().set_protocol(protocol)
965 def _read_ready(self):
966 self._read_ready_cb()
968 def _read_ready__get_buffer(self):
969 if self._conn_lost: 969 ↛ 970line 969 didn't jump to line 970 because the condition on line 969 was never true
970 return
972 try:
973 buf = self._protocol.get_buffer(-1)
974 if not len(buf):
975 raise RuntimeError('get_buffer() returned an empty buffer')
976 except (SystemExit, KeyboardInterrupt):
977 raise
978 except BaseException as exc:
979 self._fatal_error(
980 exc, 'Fatal error: protocol.get_buffer() call failed.')
981 return
983 try:
984 nbytes = self._sock.recv_into(buf)
985 except (BlockingIOError, InterruptedError):
986 return
987 except (SystemExit, KeyboardInterrupt):
988 raise
989 except BaseException as exc:
990 self._fatal_error(exc, 'Fatal read error on socket transport')
991 return
993 if not nbytes:
994 self._read_ready__on_eof()
995 return
997 try:
998 self._protocol.buffer_updated(nbytes)
999 except (SystemExit, KeyboardInterrupt):
1000 raise
1001 except BaseException as exc:
1002 self._fatal_error(
1003 exc, 'Fatal error: protocol.buffer_updated() call failed.')
1005 def _read_ready__data_received(self):
1006 if self._conn_lost: 1006 ↛ 1007line 1006 didn't jump to line 1007 because the condition on line 1006 was never true
1007 return
1008 try:
1009 data = self._sock.recv(self.max_size)
1010 except (BlockingIOError, InterruptedError):
1011 return
1012 except (SystemExit, KeyboardInterrupt):
1013 raise
1014 except BaseException as exc:
1015 self._fatal_error(exc, 'Fatal read error on socket transport')
1016 return
1018 if not data:
1019 self._read_ready__on_eof()
1020 return
1022 try:
1023 self._protocol.data_received(data)
1024 except (SystemExit, KeyboardInterrupt):
1025 raise
1026 except BaseException as exc:
1027 self._fatal_error(
1028 exc, 'Fatal error: protocol.data_received() call failed.')
1030 def _read_ready__on_eof(self):
1031 if self._loop.get_debug():
1032 logger.debug("%r received EOF", self)
1034 try:
1035 keep_open = self._protocol.eof_received()
1036 except (SystemExit, KeyboardInterrupt):
1037 raise
1038 except BaseException as exc:
1039 self._fatal_error(
1040 exc, 'Fatal error: protocol.eof_received() call failed.')
1041 return
1043 if keep_open:
1044 # We're keeping the connection open so the
1045 # protocol can write more, but we still can't
1046 # receive more, so remove the reader callback.
1047 self._loop._remove_reader(self._sock_fd)
1048 else:
1049 self.close()
def write(self, data):
    """Send *data*, buffering whatever cannot be written immediately.

    Raises TypeError for non bytes-like *data* and RuntimeError after
    write_eof() or while a sendfile is in progress.  Empty data is
    ignored.  Once the connection is lost the data is dropped and the
    attempt is counted, with a warning logged past the threshold.
    """
    if not isinstance(data, (bytes, bytearray, memoryview)):
        raise TypeError(f'data argument must be a bytes-like object, '
                        f'not {type(data).__name__!r}')
    if self._eof:
        raise RuntimeError('Cannot call write() after write_eof()')
    if self._empty_waiter is not None:
        raise RuntimeError('unable to write; sendfile is in progress')
    if not data:
        return

    if self._conn_lost:
        if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
            logger.warning('socket.send() raised exception.')
        self._conn_lost += 1
        return

    if not self._buffer:
        # Optimization: try to send now.
        try:
            sent = self._sock.send(data)
        except (BlockingIOError, InterruptedError):
            pass
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as err:
            self._fatal_error(err, 'Fatal write error on socket transport')
            return
        else:
            data = memoryview(data)[sent:]
            if not data:
                return
        # Not all was written; register write handler.
        self._loop._add_writer(self._sock_fd, self._write_ready)

    # Add it to the buffer.
    self._buffer.append(data)
    self._maybe_pause_protocol()
def _get_sendmsg_buffer(self):
    """Return an iterator over at most SC_IOV_MAX leading buffer chunks."""
    return itertools.islice(self._buffer, 0, SC_IOV_MAX)
1093 def _write_sendmsg(self):
1094 assert self._buffer, 'Data should not be empty'
1095 if self._conn_lost: 1095 ↛ 1096line 1095 didn't jump to line 1096 because the condition on line 1095 was never true
1096 return
1097 try:
1098 nbytes = self._sock.sendmsg(self._get_sendmsg_buffer())
1099 self._adjust_leftover_buffer(nbytes)
1100 except (BlockingIOError, InterruptedError):
1101 pass
1102 except (SystemExit, KeyboardInterrupt):
1103 raise
1104 except BaseException as exc:
1105 self._loop._remove_writer(self._sock_fd)
1106 self._buffer.clear()
1107 self._fatal_error(exc, 'Fatal write error on socket transport')
1108 if self._empty_waiter is not None: 1108 ↛ 1109line 1108 didn't jump to line 1109 because the condition on line 1108 was never true
1109 self._empty_waiter.set_exception(exc)
1110 else:
1111 self._maybe_resume_protocol() # May append to buffer.
1112 if not self._buffer:
1113 self._loop._remove_writer(self._sock_fd)
1114 if self._empty_waiter is not None:
1115 self._empty_waiter.set_result(None)
1116 if self._closing:
1117 self._call_connection_lost(None)
1118 elif self._eof: 1118 ↛ 1119line 1118 didn't jump to line 1119 because the condition on line 1118 was never true
1119 self._sock.shutdown(socket.SHUT_WR)
1121 def _adjust_leftover_buffer(self, nbytes: int) -> None:
1122 buffer = self._buffer
1123 while nbytes:
1124 b = buffer.popleft()
1125 b_len = len(b)
1126 if b_len <= nbytes:
1127 nbytes -= b_len
1128 else:
1129 buffer.appendleft(b[nbytes:])
1130 break
1132 def _write_send(self):
1133 assert self._buffer, 'Data should not be empty'
1134 if self._conn_lost: 1134 ↛ 1135line 1134 didn't jump to line 1135 because the condition on line 1134 was never true
1135 return
1136 try:
1137 buffer = self._buffer.popleft()
1138 n = self._sock.send(buffer)
1139 if n != len(buffer):
1140 # Not all data was written
1141 self._buffer.appendleft(buffer[n:])
1142 except (BlockingIOError, InterruptedError):
1143 pass
1144 except (SystemExit, KeyboardInterrupt):
1145 raise
1146 except BaseException as exc:
1147 self._loop._remove_writer(self._sock_fd)
1148 self._buffer.clear()
1149 self._fatal_error(exc, 'Fatal write error on socket transport')
1150 if self._empty_waiter is not None: 1150 ↛ 1151line 1150 didn't jump to line 1151 because the condition on line 1150 was never true
1151 self._empty_waiter.set_exception(exc)
1152 else:
1153 self._maybe_resume_protocol() # May append to buffer.
1154 if not self._buffer:
1155 self._loop._remove_writer(self._sock_fd)
1156 if self._empty_waiter is not None: 1156 ↛ 1157line 1156 didn't jump to line 1157 because the condition on line 1156 was never true
1157 self._empty_waiter.set_result(None)
1158 if self._closing:
1159 self._call_connection_lost(None)
1160 elif self._eof:
1161 self._sock.shutdown(socket.SHUT_WR)
def write_eof(self):
    """Mark the write side as finished.

    Does nothing when the transport is closing or EOF was already
    requested.  With an empty buffer the socket's write side is shut
    down right away; otherwise only the ``_eof`` flag is set here.
    """
    if not (self._closing or self._eof):
        self._eof = True
        if not self._buffer:
            self._sock.shutdown(socket.SHUT_WR)
def writelines(self, list_of_data):
    """Queue every chunk of *list_of_data* and try to flush immediately.

    Raises RuntimeError after write_eof() or while a sendfile is in
    progress.  An empty *list_of_data* is ignored.  Once the connection
    is lost the data is dropped and the attempt is counted, with a
    warning logged past the threshold, exactly as write() does.  If
    data remains buffered after the flush attempt, a write handler is
    registered and the protocol may be paused.
    """
    if self._eof:
        raise RuntimeError('Cannot call writelines() after write_eof()')
    if self._empty_waiter is not None:
        raise RuntimeError('unable to writelines; sendfile is in progress')
    if not list_of_data:
        return

    if self._conn_lost:
        # Mirror write(): do not buffer or register a writer for a
        # connection that is already gone.
        if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
            logger.warning('socket.send() raised exception.')
        self._conn_lost += 1
        return

    self._buffer.extend([memoryview(data) for data in list_of_data])
    if not self._buffer:
        # A truthy iterable that yielded nothing (e.g. an exhausted
        # iterator): the flush routine requires a non-empty buffer.
        return
    self._write_ready()
    # If the entire buffer couldn't be written, register a write handler
    if self._buffer:
        self._loop._add_writer(self._sock_fd, self._write_ready)
        self._maybe_pause_protocol()
def can_write_eof(self):
    """Return True: this transport implements write_eof()."""
    return True
def _call_connection_lost(self, exc):
    """Run the base teardown, then disable writing and fail any waiter.

    Regardless of how the base implementation exits, ``_write_ready``
    is cleared and a pending empty-waiter receives a ConnectionError.
    """
    try:
        super()._call_connection_lost(exc)
    finally:
        self._write_ready = None
        pending = self._empty_waiter
        if pending is not None:
            pending.set_exception(
                ConnectionError("Connection is closed by peer"))
1196 def _make_empty_waiter(self):
1197 if self._empty_waiter is not None: 1197 ↛ 1198line 1197 didn't jump to line 1198 because the condition on line 1197 was never true
1198 raise RuntimeError("Empty waiter is already set")
1199 self._empty_waiter = self._loop.create_future()
1200 if not self._buffer:
1201 self._empty_waiter.set_result(None)
1202 return self._empty_waiter
1204 def _reset_empty_waiter(self):
1205 self._empty_waiter = None
def close(self):
    """Clear the read callback, then run the base class close()."""
    self._read_ready_cb = None
    super().close()
1212class _SelectorDatagramTransport(_SelectorTransport, transports.DatagramTransport):
1214 _buffer_factory = collections.deque
def __init__(self, loop, sock, protocol, address=None,
             waiter=None, extra=None):
    """Set up a datagram transport on *sock* and queue its start-up calls.

    *address* is stored as the default destination.  connection_made(),
    the reader registration and, when *waiter* is given, the waiter
    wake-up are queued with call_soon() in that order.
    """
    super().__init__(loop, sock, protocol, extra)
    self._address = address
    self._buffer_size = 0

    schedule = self._loop.call_soon
    schedule(self._protocol.connection_made, self)
    # only start reading when connection_made() has been called
    schedule(self._add_reader, self._sock_fd, self._read_ready)
    if waiter is not None:
        # only wake up the waiter when connection_made() has been called
        schedule(futures._set_result_unless_cancelled, waiter, None)
def get_write_buffer_size(self):
    """Return the running size counter kept for buffered datagrams."""
    return self._buffer_size
1233 def _read_ready(self):
1234 if self._conn_lost: 1234 ↛ 1235line 1234 didn't jump to line 1235 because the condition on line 1234 was never true
1235 return
1236 try:
1237 data, addr = self._sock.recvfrom(self.max_size)
1238 except (BlockingIOError, InterruptedError):
1239 pass
1240 except OSError as exc:
1241 self._protocol.error_received(exc)
1242 except (SystemExit, KeyboardInterrupt):
1243 raise
1244 except BaseException as exc:
1245 self._fatal_error(exc, 'Fatal read error on datagram transport')
1246 else:
1247 self._protocol.datagram_received(data, addr)
def sendto(self, data, addr=None):
    """Send *data* as one datagram, buffering it if the socket is busy.

    Raises TypeError for non bytes-like *data* and ValueError when the
    transport has a fixed address and *addr* is neither None nor that
    address.  With an empty buffer the datagram is sent right away;
    when that would block, a write handler is registered and an
    immutable copy of the datagram is queued.
    """
    if not isinstance(data, (bytes, bytearray, memoryview)):
        raise TypeError(f'data argument must be a bytes-like object, '
                        f'not {type(data).__name__!r}')

    if self._address:
        if addr not in (None, self._address):
            raise ValueError(
                f'Invalid address: must be None or {self._address}')
        addr = self._address

    if self._conn_lost and self._address:
        if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
            logger.warning('socket.send() raised exception.')
        self._conn_lost += 1
        return

    if not self._buffer:
        # Attempt to send it right away first.
        try:
            if self._extra['peername']:
                self._sock.send(data)
            else:
                self._sock.sendto(data, addr)
        except (BlockingIOError, InterruptedError):
            self._loop._add_writer(self._sock_fd, self._sendto_ready)
        except OSError as err:
            self._protocol.error_received(err)
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as err:
            self._fatal_error(
                err, 'Fatal write error on datagram transport')
            return
        else:
            return

    # Ensure that what we buffer is immutable.
    queued = (bytes(data), addr)
    self._buffer.append(queued)
    self._buffer_size += len(data) + 8  # include header bytes
    self._maybe_pause_protocol()
1291 def _sendto_ready(self):
1292 while self._buffer:
1293 data, addr = self._buffer.popleft()
1294 self._buffer_size -= len(data)
1295 try:
1296 if self._extra['peername']:
1297 self._sock.send(data)
1298 else:
1299 self._sock.sendto(data, addr)
1300 except (BlockingIOError, InterruptedError):
1301 self._buffer.appendleft((data, addr)) # Try again later.
1302 self._buffer_size += len(data)
1303 break
1304 except OSError as exc:
1305 self._protocol.error_received(exc)
1306 return
1307 except (SystemExit, KeyboardInterrupt):
1308 raise
1309 except BaseException as exc:
1310 self._fatal_error(
1311 exc, 'Fatal write error on datagram transport')
1312 return
1314 self._maybe_resume_protocol() # May append to buffer.
1315 if not self._buffer:
1316 self._loop._remove_writer(self._sock_fd)
1317 if self._closing:
1318 self._call_connection_lost(None)