Coverage for Lib/asyncio/unix_events.py: 87%
619 statements
« prev ^ index » next — coverage.py v7.8.0, created at 2025-04-15 02:02 +0000
1"""Selector event loop for Unix with signal handling."""
3import errno
4import io
5import itertools
6import os
7import selectors
8import signal
9import socket
10import stat
11import subprocess
12import sys
13import threading
14import warnings
16from . import base_events
17from . import base_subprocess
18from . import constants
19from . import coroutines
20from . import events
21from . import exceptions
22from . import futures
23from . import selector_events
24from . import tasks
25from . import transports
26from .log import logger
# Public names exported by this module.
__all__ = (
    'SelectorEventLoop',
    '_DefaultEventLoopPolicy',
    'EventLoop',
)


# This module relies on UNIX signal handling; refuse to import on Windows.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
40def _sighandler_noop(signum, frame):
41 """Dummy signal handler."""
42 pass
def waitstatus_to_exitcode(status):
    """Convert a wait status into an exit code.

    Delegates to os.waitstatus_to_exitcode().  When that call rejects the
    status with ValueError, the raw status is handed back unchanged.
    """
    try:
        exitcode = os.waitstatus_to_exitcode(status)
    except ValueError:
        # The child exited, but its status is not understood.  This is not
        # expected to happen; returning the raw status may help debugging.
        exitcode = status
    return exitcode
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps signal number -> events.Handle wrapping the user callback.
        self._signal_handlers = {}
        # Maps listening socket -> inode of its filesystem path, recorded by
        # create_unix_server() and consumed by _stop_serving().
        self._unix_server_sockets = {}
        if can_use_pidfd():
            self._watcher = _PidfdChildWatcher()
        else:
            self._watcher = _ThreadedChildWatcher()

    def close(self):
        """Close the loop and uninstall the signal handlers it registered.

        During interpreter finalization the handlers are not removed; a
        ResourceWarning is emitted instead and the registry is cleared.
        """
        super().close()
        if not sys.is_finalizing():
            for sig in list(self._signal_handlers):
                self.remove_signal_handler(sig)
        else:
            if self._signal_handlers:
                warnings.warn(f"Closing the loop {self!r} "
                              f"on interpreter shutdown "
                              f"stage, skipping signal handlers removal",
                              ResourceWarning,
                              source=self)
                self._signal_handlers.clear()

    def _process_self_data(self, data):
        """Treat every non-zero byte of *data* as a signal number to handle."""
        for signum in data:
            if not signum:
                # ignore null bytes written by _write_to_self()
                continue
            self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise TypeError if callback is a coroutine or coroutine function,
        or if sig is not an int.
        Raise ValueError if the signal number is invalid.
        Raise RuntimeError if there is a problem setting up the handler
        (the wakeup fd cannot be set, or the signal cannot be caught).
        """
        if (coroutines.iscoroutine(callback) or
                coroutines._iscoroutinefunction(callback)):
            raise TypeError("coroutines cannot be used "
                            "with add_signal_handler()")
        self._check_signal(sig)
        self._check_closed()
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc))

        handle = events.Handle(callback, args, self, None)
        self._signal_handlers[sig] = handle

        try:
            # Register a dummy signal handler to ask Python to write the signal
            # number in the wakeup file descriptor. _process_self_data() will
            # read signal numbers from this file descriptor to handle signals.
            signal.signal(sig, _sighandler_noop)

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError(f'sig {sig} cannot be caught')
            else:
                raise

    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.

        Raise TypeError or ValueError if sig is not a valid signal number.
        Raise RuntimeError if the default handler cannot be restored
        because the signal cannot be caught.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # Restore the handler Python installs by default for this signal.
        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError(f'sig {sig} cannot be caught')
            else:
                raise

        if not self._signal_handlers:
            # Last handler gone: stop routing signals to the wakeup fd.
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is not one of signal.valid_signals().
        """
        if not isinstance(sig, int):
            raise TypeError(f'sig must be an int, not {sig!r}')

        if sig not in signal.valid_signals():
            raise ValueError(f'invalid signal number {sig}')

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Return a _UnixReadPipeTransport bound to this loop."""
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Return a _UnixWritePipeTransport bound to this loop."""
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    async def _make_subprocess_transport(self, protocol, args, shell,
                                         stdin, stdout, stderr, bufsize,
                                         extra=None, **kwargs):
        """Create a subprocess transport and wait until it is set up.

        The child's pid is registered with the loop's child watcher.  If
        waiting on the setup future fails (other than with SystemExit or
        KeyboardInterrupt), the transport is closed and awaited before the
        error is re-raised.
        """
        watcher = self._watcher
        waiter = self.create_future()
        transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                          stdin, stdout, stderr, bufsize,
                                          waiter=waiter, extra=extra,
                                          **kwargs)
        watcher.add_child_handler(transp.get_pid(),
                                  self._child_watcher_callback, transp)
        try:
            await waiter
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException:
            transp.close()
            await transp._wait()
            raise

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        """Forward a child's return code to its transport via the loop."""
        self.call_soon_threadsafe(transp._process_exited, returncode)

    async def create_unix_connection(
            self, protocol_factory, path=None, *,
            ssl=None, sock=None,
            server_hostname=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None):
        """Connect to a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  Return a
        (transport, protocol) pair.

        Raise ValueError for inconsistent arguments: ssl-only options
        without ssl, ssl without server_hostname, both or neither of
        path/sock, or a sock that is not an AF_UNIX stream socket.
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        if ssl:
            if server_hostname is None:
                raise ValueError(
                    'you have to pass server_hostname when using ssl')
        else:
            if server_hostname is not None:
                raise ValueError('server_hostname is only meaningful with ssl')
            if ssl_handshake_timeout is not None:
                raise ValueError(
                    'ssl_handshake_timeout is only meaningful with ssl')
            if ssl_shutdown_timeout is not None:
                raise ValueError(
                    'ssl_shutdown_timeout is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                await self.sock_connect(sock, path)
            except BaseException:
                # Includes cancellation: never leak the socket we created.
                sock.close()
                raise

        else:
            if sock is None:
                raise ValueError('no path and sock were specified')
            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')
            sock.setblocking(False)

        transport, protocol = await self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout,
            ssl_shutdown_timeout=ssl_shutdown_timeout)
        return transport, protocol

    async def create_unix_server(
            self, protocol_factory, path=None, *,
            sock=None, backlog=100, ssl=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            start_serving=True, cleanup_socket=True):
        """Create a server listening on a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  When *path* names
        a stale socket file it is removed before binding.  With
        *cleanup_socket* true, the inode of the socket file is remembered
        so that _stop_serving() can unlink it later.

        Raise TypeError if ssl is a bool.  Raise ValueError for ssl
        timeouts without ssl, both or neither of path/sock, or a sock that
        is not an AF_UNIX stream socket.  Raise OSError if binding fails.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if ssl_handshake_timeout is not None and not ssl:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')

        if ssl_shutdown_timeout is not None and not ssl:
            raise ValueError(
                'ssl_shutdown_timeout is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            # Check for abstract socket. `str` and `bytes` paths are supported.
            # An empty path has no first character to inspect and no file
            # to clean up; let bind() decide what to do with it.
            if path and path[0] not in (0, '\x00'):
                try:
                    if stat.S_ISSOCK(os.stat(path).st_mode):
                        os.remove(path)
                except FileNotFoundError:
                    pass
                except OSError as err:
                    # Directory may have permissions only to create socket.
                    logger.error('Unable to check or remove stale UNIX socket '
                                 '%r: %r', path, err)

            try:
                sock.bind(path)
            except OSError as exc:
                sock.close()
                if exc.errno == errno.EADDRINUSE:
                    # Let's improve the error message by adding
                    # with what exact address it occurs.
                    msg = f'Address {path!r} is already in use'
                    raise OSError(errno.EADDRINUSE, msg) from None
                else:
                    raise
            except BaseException:
                sock.close()
                raise
        else:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')

        if cleanup_socket:
            path = sock.getsockname()
            # Check for abstract socket. `str` and `bytes` paths are supported.
            # An unnamed socket reports an empty address: nothing to track.
            if path and path[0] not in (0, '\x00'):
                try:
                    self._unix_server_sockets[sock] = os.stat(path).st_ino
                except FileNotFoundError:
                    pass

        sock.setblocking(False)
        server = base_events.Server(self, [sock], protocol_factory,
                                    ssl, backlog, ssl_handshake_timeout,
                                    ssl_shutdown_timeout)
        if start_serving:
            server._start_serving()
            # Skip one loop iteration so that all 'loop.add_reader'
            # go through.
            await tasks.sleep(0)

        return server

    async def _sock_sendfile_native(self, sock, file, offset, count):
        """Send *file* over *sock* with os.sendfile(); return bytes sent.

        Raise SendfileNotAvailableError if os.sendfile() is missing, if
        *file* has no usable file descriptor, or if it cannot be fstat()ed.
        """
        try:
            os.sendfile
        except AttributeError:
            raise exceptions.SendfileNotAvailableError(
                "os.sendfile() is not available")
        try:
            fileno = file.fileno()
        except (AttributeError, io.UnsupportedOperation):
            raise exceptions.SendfileNotAvailableError("not a regular file")
        try:
            fsize = os.fstat(fileno).st_size
        except OSError:
            raise exceptions.SendfileNotAvailableError("not a regular file")
        blocksize = count if count else fsize
        if not blocksize:
            return 0  # empty file

        fut = self.create_future()
        self._sock_sendfile_native_impl(fut, None, sock, fileno,
                                        offset, count, blocksize, 0)
        return await fut

    def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
                                   offset, count, blocksize, total_sent):
        """Perform one os.sendfile() step and reschedule itself if needed.

        *registered_fd* is None on the first call and the socket's fd on
        calls made from the writer callback.  The outcome is delivered
        through *fut*.
        """
        fd = sock.fileno()
        if registered_fd is not None:
            # Remove the callback early.  It should be rare that the
            # selector says the fd is ready but the call still returns
            # EAGAIN, and I am willing to take a hit in that case in
            # order to simplify the common case.
            self.remove_writer(registered_fd)
        if fut.cancelled():
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            return
        if count:
            blocksize = count - total_sent
            if blocksize <= 0:
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
                return

        # On 32-bit architectures truncate to 1GiB to avoid OverflowError
        blocksize = min(blocksize, sys.maxsize//2 + 1)

        try:
            sent = os.sendfile(fd, fileno, offset, blocksize)
        except (BlockingIOError, InterruptedError):
            if registered_fd is None:
                self._sock_add_cancellation_callback(fut, sock)
            self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                            fd, sock, fileno,
                            offset, count, blocksize, total_sent)
        except OSError as exc:
            if (registered_fd is not None and
                    exc.errno == errno.ENOTCONN and
                    type(exc) is not ConnectionError):
                # If we have an ENOTCONN and this isn't a first call to
                # sendfile(), i.e. the connection was closed in the middle
                # of the operation, normalize the error to ConnectionError
                # to make it consistent across all Posix systems.
                new_exc = ConnectionError(
                    "socket is not connected", errno.ENOTCONN)
                new_exc.__cause__ = exc
                exc = new_exc
            if total_sent == 0:
                # We can get here for different reasons, the main
                # one being 'file' is not a regular mmap(2)-like
                # file, in which case we'll fall back on using
                # plain send().
                err = exceptions.SendfileNotAvailableError(
                    "os.sendfile call failed")
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(err)
            else:
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(exc)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            fut.set_exception(exc)
        else:
            if sent == 0:
                # EOF
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
            else:
                offset += sent
                total_sent += sent
                if registered_fd is None:
                    self._sock_add_cancellation_callback(fut, sock)
                self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                                fd, sock, fileno,
                                offset, count, blocksize, total_sent)

    def _sock_sendfile_update_filepos(self, fileno, offset, total_sent):
        """Seek *fileno* to *offset* if any data was sent."""
        if total_sent > 0:
            os.lseek(fileno, offset, os.SEEK_SET)

    def _sock_add_cancellation_callback(self, fut, sock):
        """Unregister the socket's writer when *fut* gets cancelled."""
        def cb(fut):
            if fut.cancelled():
                fd = sock.fileno()
                if fd != -1:
                    self.remove_writer(fd)
        fut.add_done_callback(cb)

    def _stop_serving(self, sock):
        """Stop serving on *sock* and unlink its socket file if tracked.

        The file is removed only when its current inode still matches the
        one recorded by create_unix_server().
        """
        # Is this a unix socket that needs cleanup?
        if sock in self._unix_server_sockets:
            path = sock.getsockname()
        else:
            path = None

        super()._stop_serving(sock)

        if path is not None:
            prev_ino = self._unix_server_sockets[sock]
            del self._unix_server_sockets[sock]
            try:
                if os.stat(path).st_ino == prev_ino:
                    os.unlink(path)
            except FileNotFoundError:
                pass
            except OSError as err:
                logger.error('Unable to clean up listening UNIX socket '
                             '%r: %r', path, err)
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport over a pipe, socket or character device fd."""

    max_size = 256 * 1024  # max bytes we read in one event loop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap *pipe* for non-blocking reads driven by *loop*.

        Raise ValueError if *pipe* is not a FIFO, socket or character
        device.  connection_made(), the reader registration and the
        wake-up of *waiter* are all scheduled with call_soon(), in that
        order.
        """
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._closing = False
        self._paused = False

        mode = os.fstat(self._fileno).st_mode
        if not (stat.S_ISFIFO(mode) or
                stat.S_ISSOCK(mode) or
                stat.S_ISCHR(mode)):
            # Reset state so __del__ does not warn about or close the pipe.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")

        os.set_blocking(self._fileno, False)

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._add_reader,
                             self._fileno, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def _add_reader(self, fd, callback):
        """Register the reader unless reading was paused or closed meanwhile."""
        if not self.is_reading():
            return
        self._loop._add_reader(fd, callback)

    def is_reading(self):
        """Return True if the transport is neither paused nor closing."""
        return not self._paused and not self._closing

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append(f'fd={self._fileno}')
        selector = getattr(self._loop, '_selector', None)
        if self._pipe is not None and selector is not None:
            polling = selector_events._test_selector_event(
                selector, self._fileno, selectors.EVENT_READ)
            if polling:
                info.append('polling')
            else:
                info.append('idle')
        elif self._pipe is not None:
            info.append('open')
        else:
            info.append('closed')
        return '<{}>'.format(' '.join(info))

    def _read_ready(self):
        """Read available data and pass it to the protocol.

        An empty read means EOF: the reader is removed and eof_received()
        followed by connection_lost(None) are scheduled.
        """
        try:
            data = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            pass
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
        else:
            if data:
                self._protocol.data_received(data)
            else:
                if self._loop.get_debug():
                    logger.info("%r was closed by peer", self)
                self._closing = True
                self._loop._remove_reader(self._fileno)
                self._loop.call_soon(self._protocol.eof_received)
                self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop delivering data until resume_reading(); no-op if not reading."""
        if not self.is_reading():
            return
        self._paused = True
        self._loop._remove_reader(self._fileno)
        if self._loop.get_debug():
            logger.debug("%r pauses reading", self)

    def resume_reading(self):
        """Resume reading after pause_reading(); no-op if closing or not paused."""
        if self._closing or not self._paused:
            return
        self._paused = False
        self._loop._add_reader(self._fileno, self._read_ready)
        if self._loop.get_debug():
            logger.debug("%r resumes reading", self)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        """Close the transport unless it is already closing."""
        if not self._closing:
            self._close(None)

    def __del__(self, _warn=warnings.warn):
        if self._pipe is not None:
            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
            self._pipe.close()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and close the transport.

        An OSError with errno EIO is only logged (in debug mode); any other
        error goes to the loop's exception handler.
        """
        # should be called by exception handler only
        if (isinstance(exc, OSError) and exc.errno == errno.EIO):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc):
        """Mark closing, remove the reader and schedule connection_lost(exc)."""
        self._closing = True
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
class _UnixWritePipeTransport(transports._FlowControlMixin,
                              transports.WriteTransport):
    """Write transport over a pipe, socket or character device fd.

    Data that cannot be written immediately is kept in an internal
    bytearray and flushed from a writer callback.
    """

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap *pipe* for non-blocking writes driven by *loop*.

        Raise ValueError if *pipe* is not a character device, FIFO or
        socket.
        """
        super().__init__(extra, loop)
        self._extra['pipe'] = pipe
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._buffer = bytearray()
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        mode = os.fstat(self._fileno).st_mode
        is_char = stat.S_ISCHR(mode)
        is_fifo = stat.S_ISFIFO(mode)
        is_socket = stat.S_ISSOCK(mode)
        if not (is_char or is_fifo or is_socket):
            # Reset state so __del__ does not warn about or close the pipe.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")

        os.set_blocking(self._fileno, False)
        self._loop.call_soon(self._protocol.connection_made, self)

        # On AIX, the reader trick (to be notified when the read end of the
        # socket is closed) only works for sockets.  On other platforms it
        # works for pipes and sockets.  (Exception: OS X 10.4?  Issue #19294.)
        if is_socket or (is_fifo and not sys.platform.startswith("aix")):
            # only start reading when connection_made() has been called
            self._loop.call_soon(self._loop._add_reader,
                                 self._fileno, self._read_ready)

        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append(f'fd={self._fileno}')
        selector = getattr(self._loop, '_selector', None)
        if self._pipe is not None and selector is not None:
            polling = selector_events._test_selector_event(
                selector, self._fileno, selectors.EVENT_WRITE)
            if polling:
                info.append('polling')
            else:
                info.append('idle')

            bufsize = self.get_write_buffer_size()
            info.append(f'bufsize={bufsize}')
        elif self._pipe is not None:
            info.append('open')
        else:
            info.append('closed')
        return '<{}>'.format(' '.join(info))

    def get_write_buffer_size(self):
        """Return the number of bytes waiting in the write buffer."""
        return len(self._buffer)

    def _read_ready(self):
        """Reader callback: close the transport when the fd becomes readable.

        Unsent buffered data is reported as BrokenPipeError.
        """
        # Pipe was closed by peer.
        if self._loop.get_debug():
            logger.info("%r was closed by peer", self)
        if self._buffer:
            self._close(BrokenPipeError())
        else:
            self._close()

    def write(self, data):
        """Write *data*, buffering whatever cannot be written right away.

        Empty data is ignored.  After the connection is lost or closing,
        data is dropped and only the lost-write counter is advanced.
        """
        assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
        if isinstance(data, bytearray):
            data = memoryview(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                n = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                n = 0
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._conn_lost += 1
                self._fatal_error(exc, 'Fatal write error on pipe transport')
                return
            if n == len(data):
                return
            elif n > 0:
                # Keep only the part that was not written.
                data = memoryview(data)[n:]
            self._loop._add_writer(self._fileno, self._write_ready)

        self._buffer += data
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Writer callback: flush as much of the buffer as the fd accepts."""
        assert self._buffer, 'Data should not be empty'

        try:
            n = os.write(self._fileno, self._buffer)
        except (BlockingIOError, InterruptedError):
            pass
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._buffer.clear()
            self._conn_lost += 1
            # Remove the writer here; _close() (reached via _fatal_error())
            # will not do it because _buffer is now empty.
            self._loop._remove_writer(self._fileno)
            self._fatal_error(exc, 'Fatal write error on pipe transport')
        else:
            if n == len(self._buffer):
                self._buffer.clear()
                self._loop._remove_writer(self._fileno)
                self._maybe_resume_protocol()  # May append to buffer.
                if self._closing:
                    self._loop._remove_reader(self._fileno)
                    self._call_connection_lost(None)
                return
            elif n > 0:
                del self._buffer[:n]

    def can_write_eof(self):
        return True

    def write_eof(self):
        """Start closing; finish immediately if nothing is buffered.

        With data still buffered, the close is completed by _write_ready()
        once the buffer has been flushed.
        """
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if not self._buffer:
            self._loop._remove_reader(self._fileno)
            self._loop.call_soon(self._call_connection_lost, None)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        """Close the transport after buffered data has been written."""
        if self._pipe is not None and not self._closing:
            # write_eof() is all that is needed to close the write pipe
            self.write_eof()

    def __del__(self, _warn=warnings.warn):
        if self._pipe is not None:
            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
            self._pipe.close()

    def abort(self):
        """Close the transport immediately, discarding buffered data."""
        self._close(None)

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and close the transport.

        OSError instances are only logged (in debug mode); other errors go
        to the loop's exception handler.
        """
        # should be called by exception handler only
        if isinstance(exc, OSError):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc=None):
        """Drop buffered data, unregister callbacks, schedule connection_lost."""
        self._closing = True
        if self._buffer:
            # A non-empty buffer means the writer callback is registered.
            self._loop._remove_writer(self._fileno)
        self._buffer.clear()
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that launches the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and store it in self._proc.

        On AIX a piped stdin is replaced by a socket pair: the child gets
        one end and the other end becomes self._proc.stdin.
        """
        parent_end = None
        child_stdin = stdin
        wants_pipe = stdin == subprocess.PIPE
        if wants_pipe and sys.platform.startswith('aix'):
            # Use a socket pair for stdin on AIX, since it does not
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).
            child_stdin, parent_end = socket.socketpair()
        try:
            self._proc = subprocess.Popen(
                args,
                shell=shell,
                stdin=child_stdin,
                stdout=stdout,
                stderr=stderr,
                universal_newlines=False,
                bufsize=bufsize,
                **kwargs)
            if parent_end is not None:
                # The child owns its end now; wrap ours as a binary file.
                child_stdin.close()
                self._proc.stdin = open(
                    parent_end.detach(), 'wb', buffering=bufsize)
                parent_end = None
        finally:
            # Still set only if something above failed: release both ends.
            if parent_end is not None:
                child_stdin.close()
                parent_end.close()
class _PidfdChildWatcher:
    """Child watcher built on Linux pid file descriptors (pidfds).

    Each watched child gets a pidfd that is polled by the running event
    loop until the process terminates.  This needs neither signals nor
    threads, leaves processes started outside the event loop alone, and
    scales linearly with the number of subprocesses the loop launches.
    The drawback is portability: pidfds are Linux-specific and only work
    on recent (5.3+) kernels.
    """

    def add_child_handler(self, pid, callback, *args):
        """Arrange for callback(pid, returncode, *args) once *pid* exits."""
        running_loop = events.get_running_loop()
        fd = os.pidfd_open(pid)
        running_loop._add_reader(
            fd, self._do_wait, pid, fd, callback, args)

    def _do_wait(self, pid, pidfd, callback, args):
        """Reap *pid*, close its pidfd and invoke *callback*."""
        events.get_running_loop()._remove_reader(pidfd)
        try:
            _reaped_pid, wait_status = os.waitpid(pid, 0)
        except ChildProcessError:
            # Someone else already reaped the child
            # (e.g. waitpid() was called elsewhere).
            returncode = 255
            logger.warning(
                "child process pid %d exit status already read: "
                " will report returncode 255",
                pid)
        else:
            returncode = waitstatus_to_exitcode(wait_status)

        os.close(pidfd)
        callback(pid, returncode, *args)
894class _ThreadedChildWatcher:
895 """Threaded child watcher implementation.
897 The watcher uses a thread per process
898 for waiting for the process finish.
900 It doesn't require subscription on POSIX signal
901 but a thread creation is not free.
903 The watcher has O(1) complexity, its performance doesn't depend
904 on amount of spawn processes.
905 """
907 def __init__(self):
908 self._pid_counter = itertools.count(0)
909 self._threads = {}
911 def __del__(self, _warn=warnings.warn):
912 threads = [thread for thread in list(self._threads.values())
913 if thread.is_alive()]
914 if threads: 914 ↛ 915line 914 didn't jump to line 915 because the condition on line 914 was never true
915 _warn(f"{self.__class__} has registered but not finished child processes",
916 ResourceWarning,
917 source=self)
919 def add_child_handler(self, pid, callback, *args):
920 loop = events.get_running_loop()
921 thread = threading.Thread(target=self._do_waitpid,
922 name=f"asyncio-waitpid-{next(self._pid_counter)}",
923 args=(loop, pid, callback, args),
924 daemon=True)
925 self._threads[pid] = thread
926 thread.start()
928 def _do_waitpid(self, loop, expected_pid, callback, args):
929 assert expected_pid > 0
931 try:
932 pid, status = os.waitpid(expected_pid, 0)
933 except ChildProcessError:
934 # The child process is already reaped
935 # (may happen if waitpid() is called elsewhere).
936 pid = expected_pid
937 returncode = 255
938 logger.warning(
939 "Unknown child process pid %d, will report returncode 255",
940 pid)
941 else:
942 returncode = waitstatus_to_exitcode(status)
943 if loop.get_debug(): 943 ↛ 944line 943 didn't jump to line 944 because the condition on line 943 was never true
944 logger.debug('process %s exited with returncode %s',
945 expected_pid, returncode)
947 if loop.is_closed(): 947 ↛ 948line 947 didn't jump to line 948 because the condition on line 947 was never true
948 logger.warning("Loop %r that handles pid %r is closed", loop, pid)
949 else:
950 loop.call_soon_threadsafe(callback, pid, returncode, *args)
952 self._threads.pop(expected_pid)
def can_use_pidfd():
    """Return True if os.pidfd_open() exists and works for this process.

    The check opens (and immediately closes) a pidfd for the current
    process; an OSError from that probe yields False.
    """
    if not hasattr(os, 'pidfd_open'):
        return False
    try:
        probe_fd = os.pidfd_open(os.getpid(), 0)
        os.close(probe_fd)
    except OSError:
        # blocked by security policy like SECCOMP
        return False
    else:
        return True
class _UnixDefaultEventLoopPolicy(events._BaseDefaultEventLoopPolicy):
    """Default event loop policy on UNIX.

    Uses _UnixSelectorEventLoop as its loop factory.
    """

    _loop_factory = _UnixSelectorEventLoop
# Public aliases for the UNIX implementations defined in this module.
SelectorEventLoop = _UnixSelectorEventLoop
_DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
EventLoop = SelectorEventLoop