Coverage for Lib/asyncio/unix_events.py: 87%
619 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-24 03:28 +0000
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-24 03:28 +0000
1"""Selector event loop for Unix with signal handling."""
3import errno
4import io
5import itertools
6import os
7import selectors
8import signal
9import socket
10import stat
11import subprocess
12import sys
13import threading
14import warnings
15import inspect
17from . import base_events
18from . import base_subprocess
19from . import constants
20from . import coroutines
21from . import events
22from . import exceptions
23from . import futures
24from . import selector_events
25from . import tasks
26from . import transports
27from .log import logger
30__all__ = (
31 'SelectorEventLoop',
32 'EventLoop',
33)
36if sys.platform == 'win32': # pragma: no cover
37 raise ImportError('Signals are not really supported on Windows')
40def _sighandler_noop(signum, frame):
41 """Dummy signal handler."""
42 pass
45def waitstatus_to_exitcode(status):
46 try:
47 return os.waitstatus_to_exitcode(status)
48 except ValueError:
49 # The child exited, but we don't understand its status.
50 # This shouldn't happen, but if it does, let's just
51 # return that status; perhaps that helps debug it.
52 return status
55class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
56 """Unix event loop.
58 Adds signal handling and UNIX Domain Socket support to
59 SelectorEventLoop.
60 """
62 def __init__(self, selector=None):
63 super().__init__(selector)
64 self._signal_handlers = {}
65 self._unix_server_sockets = {}
66 if can_use_pidfd():
67 self._watcher = _PidfdChildWatcher()
68 else:
69 self._watcher = _ThreadedChildWatcher()
71 def close(self):
72 super().close()
73 if not sys.is_finalizing():
74 for sig in list(self._signal_handlers):
75 self.remove_signal_handler(sig)
76 else:
77 if self._signal_handlers: 77 ↛ exitline 77 didn't return from function 'close' because the condition on line 77 was always true
78 warnings.warn(f"Closing the loop {self!r} "
79 f"on interpreter shutdown "
80 f"stage, skipping signal handlers removal",
81 ResourceWarning,
82 source=self)
83 self._signal_handlers.clear()
85 def _process_self_data(self, data):
86 for signum in data:
87 if not signum:
88 # ignore null bytes written by _write_to_self()
89 continue
90 self._handle_signal(signum)
92 def add_signal_handler(self, sig, callback, *args):
93 """Add a handler for a signal. UNIX only.
95 Raise ValueError if the signal number is invalid or uncatchable.
96 Raise RuntimeError if there is a problem setting up the handler.
97 """
98 if (coroutines.iscoroutine(callback) or
99 inspect.iscoroutinefunction(callback)):
100 raise TypeError("coroutines cannot be used "
101 "with add_signal_handler()")
102 self._check_signal(sig)
103 self._check_closed()
104 try:
105 # set_wakeup_fd() raises ValueError if this is not the
106 # main thread. By calling it early we ensure that an
107 # event loop running in another thread cannot add a signal
108 # handler.
109 signal.set_wakeup_fd(self._csock.fileno())
110 except (ValueError, OSError) as exc:
111 raise RuntimeError(str(exc))
113 handle = events.Handle(callback, args, self, None)
114 self._signal_handlers[sig] = handle
116 try:
117 # Register a dummy signal handler to ask Python to write the signal
118 # number in the wakeup file descriptor. _process_self_data() will
119 # read signal numbers from this file descriptor to handle signals.
120 signal.signal(sig, _sighandler_noop)
122 # Set SA_RESTART to limit EINTR occurrences.
123 signal.siginterrupt(sig, False)
124 except OSError as exc:
125 del self._signal_handlers[sig]
126 if not self._signal_handlers:
127 try:
128 signal.set_wakeup_fd(-1)
129 except (ValueError, OSError) as nexc:
130 logger.info('set_wakeup_fd(-1) failed: %s', nexc)
132 if exc.errno == errno.EINVAL:
133 raise RuntimeError(f'sig {sig} cannot be caught')
134 else:
135 raise
137 def _handle_signal(self, sig):
138 """Internal helper that is the actual signal handler."""
139 handle = self._signal_handlers.get(sig)
140 if handle is None:
141 return # Assume it's some race condition.
142 if handle._cancelled:
143 self.remove_signal_handler(sig) # Remove it properly.
144 else:
145 self._add_callback_signalsafe(handle)
147 def remove_signal_handler(self, sig):
148 """Remove a handler for a signal. UNIX only.
150 Return True if a signal handler was removed, False if not.
151 """
152 self._check_signal(sig)
153 try:
154 del self._signal_handlers[sig]
155 except KeyError:
156 return False
158 if sig == signal.SIGINT:
159 handler = signal.default_int_handler
160 else:
161 handler = signal.SIG_DFL
163 try:
164 signal.signal(sig, handler)
165 except OSError as exc:
166 if exc.errno == errno.EINVAL:
167 raise RuntimeError(f'sig {sig} cannot be caught')
168 else:
169 raise
171 if not self._signal_handlers:
172 try:
173 signal.set_wakeup_fd(-1)
174 except (ValueError, OSError) as exc:
175 logger.info('set_wakeup_fd(-1) failed: %s', exc)
177 return True
179 def _check_signal(self, sig):
180 """Internal helper to validate a signal.
182 Raise ValueError if the signal number is invalid or uncatchable.
183 Raise RuntimeError if there is a problem setting up the handler.
184 """
185 if not isinstance(sig, int):
186 raise TypeError(f'sig must be an int, not {sig!r}')
188 if sig not in signal.valid_signals():
189 raise ValueError(f'invalid signal number {sig}')
191 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
192 extra=None):
193 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
195 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
196 extra=None):
197 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
199 async def _make_subprocess_transport(self, protocol, args, shell,
200 stdin, stdout, stderr, bufsize,
201 extra=None, **kwargs):
202 watcher = self._watcher
203 waiter = self.create_future()
204 transp = _UnixSubprocessTransport(self, protocol, args, shell,
205 stdin, stdout, stderr, bufsize,
206 waiter=waiter, extra=extra,
207 **kwargs)
208 watcher.add_child_handler(transp.get_pid(),
209 self._child_watcher_callback, transp)
210 try:
211 await waiter
212 except (SystemExit, KeyboardInterrupt):
213 raise
214 except BaseException:
215 transp.close()
216 await transp._wait()
217 raise
219 return transp
221 def _child_watcher_callback(self, pid, returncode, transp):
222 self.call_soon_threadsafe(transp._process_exited, returncode)
224 async def create_unix_connection(
225 self, protocol_factory, path=None, *,
226 ssl=None, sock=None,
227 server_hostname=None,
228 ssl_handshake_timeout=None,
229 ssl_shutdown_timeout=None):
230 assert server_hostname is None or isinstance(server_hostname, str)
231 if ssl:
232 if server_hostname is None:
233 raise ValueError(
234 'you have to pass server_hostname when using ssl')
235 else:
236 if server_hostname is not None:
237 raise ValueError('server_hostname is only meaningful with ssl')
238 if ssl_handshake_timeout is not None:
239 raise ValueError(
240 'ssl_handshake_timeout is only meaningful with ssl')
241 if ssl_shutdown_timeout is not None: 241 ↛ 242line 241 didn't jump to line 242 because the condition on line 241 was never true
242 raise ValueError(
243 'ssl_shutdown_timeout is only meaningful with ssl')
245 if path is not None:
246 if sock is not None:
247 raise ValueError(
248 'path and sock can not be specified at the same time')
250 path = os.fspath(path)
251 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
252 try:
253 sock.setblocking(False)
254 await self.sock_connect(sock, path)
255 except:
256 sock.close()
257 raise
259 else:
260 if sock is None:
261 raise ValueError('no path and sock were specified')
262 if (sock.family != socket.AF_UNIX or 262 ↛ 266line 262 didn't jump to line 266 because the condition on line 262 was always true
263 sock.type != socket.SOCK_STREAM):
264 raise ValueError(
265 f'A UNIX Domain Stream Socket was expected, got {sock!r}')
266 sock.setblocking(False)
268 transport, protocol = await self._create_connection_transport(
269 sock, protocol_factory, ssl, server_hostname,
270 ssl_handshake_timeout=ssl_handshake_timeout,
271 ssl_shutdown_timeout=ssl_shutdown_timeout)
272 return transport, protocol
274 async def create_unix_server(
275 self, protocol_factory, path=None, *,
276 sock=None, backlog=100, ssl=None,
277 ssl_handshake_timeout=None,
278 ssl_shutdown_timeout=None,
279 start_serving=True, cleanup_socket=True):
280 if isinstance(ssl, bool):
281 raise TypeError('ssl argument must be an SSLContext or None')
283 if ssl_handshake_timeout is not None and not ssl:
284 raise ValueError(
285 'ssl_handshake_timeout is only meaningful with ssl')
287 if ssl_shutdown_timeout is not None and not ssl: 287 ↛ 288line 287 didn't jump to line 288 because the condition on line 287 was never true
288 raise ValueError(
289 'ssl_shutdown_timeout is only meaningful with ssl')
291 if path is not None:
292 if sock is not None:
293 raise ValueError(
294 'path and sock can not be specified at the same time')
296 path = os.fspath(path)
297 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
299 # Check for abstract socket. `str` and `bytes` paths are supported.
300 if path[0] not in (0, '\x00'): 300 ↛ 311line 300 didn't jump to line 311 because the condition on line 300 was always true
301 try:
302 if stat.S_ISSOCK(os.stat(path).st_mode):
303 os.remove(path)
304 except FileNotFoundError:
305 pass
306 except OSError as err:
307 # Directory may have permissions only to create socket.
308 logger.error('Unable to check or remove stale UNIX socket '
309 '%r: %r', path, err)
311 try:
312 sock.bind(path)
313 except OSError as exc:
314 sock.close()
315 if exc.errno == errno.EADDRINUSE:
316 # Let's improve the error message by adding
317 # with what exact address it occurs.
318 msg = f'Address {path!r} is already in use'
319 raise OSError(errno.EADDRINUSE, msg) from None
320 else:
321 raise
322 except:
323 sock.close()
324 raise
325 else:
326 if sock is None:
327 raise ValueError(
328 'path was not specified, and no sock specified')
330 if (sock.family != socket.AF_UNIX or
331 sock.type != socket.SOCK_STREAM):
332 raise ValueError(
333 f'A UNIX Domain Stream Socket was expected, got {sock!r}')
335 if cleanup_socket:
336 path = sock.getsockname()
337 # Check for abstract socket. `str` and `bytes` paths are supported.
338 if path[0] not in (0, '\x00'): 338 ↛ 344line 338 didn't jump to line 344 because the condition on line 338 was always true
339 try:
340 self._unix_server_sockets[sock] = os.stat(path).st_ino
341 except FileNotFoundError:
342 pass
344 sock.setblocking(False)
345 server = base_events.Server(self, [sock], protocol_factory,
346 ssl, backlog, ssl_handshake_timeout,
347 ssl_shutdown_timeout)
348 if start_serving:
349 server._start_serving()
350 # Skip one loop iteration so that all 'loop.add_reader'
351 # go through.
352 await tasks.sleep(0)
354 return server
356 async def _sock_sendfile_native(self, sock, file, offset, count):
357 try:
358 os.sendfile
359 except AttributeError:
360 raise exceptions.SendfileNotAvailableError(
361 "os.sendfile() is not available")
362 try:
363 fileno = file.fileno()
364 except (AttributeError, io.UnsupportedOperation):
365 raise exceptions.SendfileNotAvailableError("not a regular file")
366 try:
367 fsize = os.fstat(fileno).st_size
368 except OSError:
369 raise exceptions.SendfileNotAvailableError("not a regular file")
370 blocksize = count if count else fsize
371 if not blocksize:
372 return 0 # empty file
374 fut = self.create_future()
375 self._sock_sendfile_native_impl(fut, None, sock, fileno,
376 offset, count, blocksize, 0)
377 return await fut
379 def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
380 offset, count, blocksize, total_sent):
381 fd = sock.fileno()
382 if registered_fd is not None:
383 # Remove the callback early. It should be rare that the
384 # selector says the fd is ready but the call still returns
385 # EAGAIN, and I am willing to take a hit in that case in
386 # order to simplify the common case.
387 self.remove_writer(registered_fd)
388 if fut.cancelled():
389 self._sock_sendfile_update_filepos(fileno, offset)
390 return
391 if count:
392 blocksize = count - total_sent
393 if blocksize <= 0:
394 self._sock_sendfile_update_filepos(fileno, offset)
395 fut.set_result(total_sent)
396 return
398 # On 32-bit architectures truncate to 1GiB to avoid OverflowError
399 blocksize = min(blocksize, sys.maxsize//2 + 1)
401 try:
402 sent = os.sendfile(fd, fileno, offset, blocksize)
403 except (BlockingIOError, InterruptedError):
404 if registered_fd is None: 404 ↛ 406line 404 didn't jump to line 406 because the condition on line 404 was always true
405 self._sock_add_cancellation_callback(fut, sock)
406 self.add_writer(fd, self._sock_sendfile_native_impl, fut,
407 fd, sock, fileno,
408 offset, count, blocksize, total_sent)
409 except OSError as exc:
410 if (registered_fd is not None and 410 ↛ 417line 410 didn't jump to line 417 because the condition on line 410 was never true
411 exc.errno == errno.ENOTCONN and
412 type(exc) is not ConnectionError):
413 # If we have an ENOTCONN and this isn't a first call to
414 # sendfile(), i.e. the connection was closed in the middle
415 # of the operation, normalize the error to ConnectionError
416 # to make it consistent across all Posix systems.
417 new_exc = ConnectionError(
418 "socket is not connected", errno.ENOTCONN)
419 new_exc.__cause__ = exc
420 exc = new_exc
421 if total_sent == 0:
422 # We can get here for different reasons, the main
423 # one being 'file' is not a regular mmap(2)-like
424 # file, in which case we'll fall back on using
425 # plain send().
426 err = exceptions.SendfileNotAvailableError(
427 "os.sendfile call failed")
428 self._sock_sendfile_update_filepos(fileno, offset)
429 fut.set_exception(err)
430 else:
431 self._sock_sendfile_update_filepos(fileno, offset)
432 fut.set_exception(exc)
433 except (SystemExit, KeyboardInterrupt):
434 raise
435 except BaseException as exc:
436 self._sock_sendfile_update_filepos(fileno, offset)
437 fut.set_exception(exc)
438 else:
439 if sent == 0:
440 # EOF
441 self._sock_sendfile_update_filepos(fileno, offset)
442 fut.set_result(total_sent)
443 else:
444 offset += sent
445 total_sent += sent
446 if registered_fd is None:
447 self._sock_add_cancellation_callback(fut, sock)
448 self.add_writer(fd, self._sock_sendfile_native_impl, fut,
449 fd, sock, fileno,
450 offset, count, blocksize, total_sent)
452 def _sock_sendfile_update_filepos(self, fileno, offset):
453 # After this helper runs, the source fd's lseek pointer is at offset."
454 os.lseek(fileno, offset, os.SEEK_SET)
456 def _sock_add_cancellation_callback(self, fut, sock):
457 def cb(fut):
458 if fut.cancelled():
459 fd = sock.fileno()
460 if fd != -1: 460 ↛ exitline 460 didn't return from function 'cb' because the condition on line 460 was always true
461 self.remove_writer(fd)
462 fut.add_done_callback(cb)
464 def _stop_serving(self, sock):
465 # Is this a unix socket that needs cleanup?
466 if sock in self._unix_server_sockets:
467 path = sock.getsockname()
468 else:
469 path = None
471 super()._stop_serving(sock)
473 if path is not None:
474 prev_ino = self._unix_server_sockets[sock]
475 del self._unix_server_sockets[sock]
476 try:
477 if os.stat(path).st_ino == prev_ino:
478 os.unlink(path)
479 except FileNotFoundError:
480 pass
481 except OSError as err:
482 logger.error('Unable to clean up listening UNIX socket '
483 '%r: %r', path, err)
486class _UnixReadPipeTransport(transports.ReadTransport):
488 max_size = 256 * 1024 # max bytes we read in one event loop iteration
490 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
491 super().__init__(extra)
492 self._extra['pipe'] = pipe
493 self._loop = loop
494 self._pipe = pipe
495 self._fileno = pipe.fileno()
496 self._protocol = protocol
497 self._closing = False
498 self._paused = False
500 mode = os.fstat(self._fileno).st_mode
501 if not (stat.S_ISFIFO(mode) or 501 ↛ 504line 501 didn't jump to line 504 because the condition on line 501 was never true
502 stat.S_ISSOCK(mode) or
503 stat.S_ISCHR(mode)):
504 self._pipe = None
505 self._fileno = None
506 self._protocol = None
507 raise ValueError("Pipe transport is for pipes/sockets only.")
509 os.set_blocking(self._fileno, False)
511 self._loop.call_soon(self._protocol.connection_made, self)
512 # only start reading when connection_made() has been called
513 self._loop.call_soon(self._add_reader,
514 self._fileno, self._read_ready)
515 if waiter is not None:
516 # only wake up the waiter when connection_made() has been called
517 self._loop.call_soon(futures._set_result_unless_cancelled,
518 waiter, None)
520 def _add_reader(self, fd, callback):
521 if not self.is_reading():
522 return
523 self._loop._add_reader(fd, callback)
525 def is_reading(self):
526 return not self._paused and not self._closing
528 def __repr__(self):
529 info = [self.__class__.__name__]
530 if self._pipe is None: 530 ↛ 531line 530 didn't jump to line 531 because the condition on line 530 was never true
531 info.append('closed')
532 elif self._closing: 532 ↛ 533line 532 didn't jump to line 533 because the condition on line 532 was never true
533 info.append('closing')
534 info.append(f'fd={self._fileno}')
535 selector = getattr(self._loop, '_selector', None)
536 if self._pipe is not None and selector is not None:
537 polling = selector_events._test_selector_event(
538 selector, self._fileno, selectors.EVENT_READ)
539 if polling: 539 ↛ 540line 539 didn't jump to line 540 because the condition on line 539 was never true
540 info.append('polling')
541 else:
542 info.append('idle')
543 elif self._pipe is not None: 543 ↛ 546line 543 didn't jump to line 546 because the condition on line 543 was always true
544 info.append('open')
545 else:
546 info.append('closed')
547 return '<{}>'.format(' '.join(info))
549 def _read_ready(self):
550 try:
551 data = os.read(self._fileno, self.max_size)
552 except (BlockingIOError, InterruptedError):
553 pass
554 except OSError as exc:
555 self._fatal_error(exc, 'Fatal read error on pipe transport')
556 else:
557 if data:
558 self._protocol.data_received(data)
559 else:
560 if self._loop.get_debug(): 560 ↛ 561line 560 didn't jump to line 561 because the condition on line 560 was never true
561 logger.info("%r was closed by peer", self)
562 self._closing = True
563 self._loop._remove_reader(self._fileno)
564 self._loop.call_soon(self._protocol.eof_received)
565 self._loop.call_soon(self._call_connection_lost, None)
567 def pause_reading(self):
568 if not self.is_reading():
569 return
570 self._paused = True
571 self._loop._remove_reader(self._fileno)
572 if self._loop.get_debug(): 572 ↛ 573line 572 didn't jump to line 573 because the condition on line 572 was never true
573 logger.debug("%r pauses reading", self)
575 def resume_reading(self):
576 if self._closing or not self._paused:
577 return
578 self._paused = False
579 self._loop._add_reader(self._fileno, self._read_ready)
580 if self._loop.get_debug(): 580 ↛ 581line 580 didn't jump to line 581 because the condition on line 580 was never true
581 logger.debug("%r resumes reading", self)
583 def set_protocol(self, protocol):
584 self._protocol = protocol
586 def get_protocol(self):
587 return self._protocol
589 def is_closing(self):
590 return self._closing
592 def close(self):
593 if not self._closing:
594 self._close(None)
596 def __del__(self, _warn=warnings.warn):
597 if self._pipe is not None: 597 ↛ 598line 597 didn't jump to line 598 because the condition on line 597 was never true
598 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
599 self._pipe.close()
601 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
602 # should be called by exception handler only
603 if (isinstance(exc, OSError) and exc.errno == errno.EIO): 603 ↛ 604line 603 didn't jump to line 604 because the condition on line 603 was never true
604 if self._loop.get_debug():
605 logger.debug("%r: %s", self, message, exc_info=True)
606 else:
607 self._loop.call_exception_handler({
608 'message': message,
609 'exception': exc,
610 'transport': self,
611 'protocol': self._protocol,
612 })
613 self._close(exc)
615 def _close(self, exc):
616 self._closing = True
617 self._loop._remove_reader(self._fileno)
618 self._loop.call_soon(self._call_connection_lost, exc)
620 def _call_connection_lost(self, exc):
621 try:
622 self._protocol.connection_lost(exc)
623 finally:
624 self._pipe.close()
625 self._pipe = None
626 self._protocol = None
627 self._loop = None
630class _UnixWritePipeTransport(transports._FlowControlMixin,
631 transports.WriteTransport):
633 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
634 super().__init__(extra, loop)
635 self._extra['pipe'] = pipe
636 self._pipe = pipe
637 self._fileno = pipe.fileno()
638 self._protocol = protocol
639 self._buffer = bytearray()
640 self._conn_lost = 0
641 self._closing = False # Set when close() or write_eof() called.
643 mode = os.fstat(self._fileno).st_mode
644 is_char = stat.S_ISCHR(mode)
645 is_fifo = stat.S_ISFIFO(mode)
646 is_socket = stat.S_ISSOCK(mode)
647 if not (is_char or is_fifo or is_socket): 647 ↛ 648line 647 didn't jump to line 648 because the condition on line 647 was never true
648 self._pipe = None
649 self._fileno = None
650 self._protocol = None
651 raise ValueError("Pipe transport is only for "
652 "pipes, sockets and character devices")
654 os.set_blocking(self._fileno, False)
655 self._loop.call_soon(self._protocol.connection_made, self)
657 # On AIX, the reader trick (to be notified when the read end of the
658 # socket is closed) only works for sockets. On other platforms it
659 # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
660 if is_socket or (is_fifo and not sys.platform.startswith("aix")):
661 # only start reading when connection_made() has been called
662 self._loop.call_soon(self._loop._add_reader,
663 self._fileno, self._read_ready)
665 if waiter is not None:
666 # only wake up the waiter when connection_made() has been called
667 self._loop.call_soon(futures._set_result_unless_cancelled,
668 waiter, None)
670 def __repr__(self):
671 info = [self.__class__.__name__]
672 if self._pipe is None: 672 ↛ 673line 672 didn't jump to line 673 because the condition on line 672 was never true
673 info.append('closed')
674 elif self._closing: 674 ↛ 675line 674 didn't jump to line 675 because the condition on line 674 was never true
675 info.append('closing')
676 info.append(f'fd={self._fileno}')
677 selector = getattr(self._loop, '_selector', None)
678 if self._pipe is not None and selector is not None: 678 ↛ 679line 678 didn't jump to line 679 because the condition on line 678 was never true
679 polling = selector_events._test_selector_event(
680 selector, self._fileno, selectors.EVENT_WRITE)
681 if polling:
682 info.append('polling')
683 else:
684 info.append('idle')
686 bufsize = self.get_write_buffer_size()
687 info.append(f'bufsize={bufsize}')
688 elif self._pipe is not None: 688 ↛ 691line 688 didn't jump to line 691 because the condition on line 688 was always true
689 info.append('open')
690 else:
691 info.append('closed')
692 return '<{}>'.format(' '.join(info))
694 def get_write_buffer_size(self):
695 return len(self._buffer)
697 def _read_ready(self):
698 # Pipe was closed by peer.
699 if self._loop.get_debug(): 699 ↛ 700line 699 didn't jump to line 700 because the condition on line 699 was never true
700 logger.info("%r was closed by peer", self)
701 if self._buffer:
702 self._close(BrokenPipeError())
703 else:
704 self._close()
706 def write(self, data):
707 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
708 if isinstance(data, bytearray): 708 ↛ 709line 708 didn't jump to line 709 because the condition on line 708 was never true
709 data = memoryview(data)
710 if not data:
711 return
713 if self._conn_lost or self._closing:
714 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
715 logger.warning('pipe closed by peer or '
716 'os.write(pipe, data) raised exception.')
717 self._conn_lost += 1
718 return
720 if not self._buffer:
721 # Attempt to send it right away first.
722 try:
723 n = os.write(self._fileno, data)
724 except (BlockingIOError, InterruptedError):
725 n = 0
726 except (SystemExit, KeyboardInterrupt):
727 raise
728 except BaseException as exc:
729 self._conn_lost += 1
730 self._fatal_error(exc, 'Fatal write error on pipe transport')
731 return
732 if n == len(data):
733 return
734 elif n > 0:
735 data = memoryview(data)[n:]
736 self._loop._add_writer(self._fileno, self._write_ready)
738 self._buffer += data
739 self._maybe_pause_protocol()
741 def _write_ready(self):
742 assert self._buffer, 'Data should not be empty'
744 try:
745 n = os.write(self._fileno, self._buffer)
746 except (BlockingIOError, InterruptedError):
747 pass
748 except (SystemExit, KeyboardInterrupt):
749 raise
750 except BaseException as exc:
751 self._buffer.clear()
752 self._conn_lost += 1
753 # Remove writer here, _fatal_error() doesn't it
754 # because _buffer is empty.
755 self._loop._remove_writer(self._fileno)
756 self._fatal_error(exc, 'Fatal write error on pipe transport')
757 else:
758 if n == len(self._buffer):
759 self._buffer.clear()
760 self._loop._remove_writer(self._fileno)
761 self._maybe_resume_protocol() # May append to buffer.
762 if self._closing:
763 self._loop._remove_reader(self._fileno)
764 self._call_connection_lost(None)
765 return
766 elif n > 0:
767 del self._buffer[:n]
769 def can_write_eof(self):
770 return True
772 def write_eof(self):
773 if self._closing: 773 ↛ 774line 773 didn't jump to line 774 because the condition on line 773 was never true
774 return
775 assert self._pipe
776 self._closing = True
777 if not self._buffer:
778 self._loop._remove_reader(self._fileno)
779 self._loop.call_soon(self._call_connection_lost, None)
781 def set_protocol(self, protocol):
782 self._protocol = protocol
784 def get_protocol(self):
785 return self._protocol
787 def is_closing(self):
788 return self._closing
790 def close(self):
791 if self._pipe is not None and not self._closing:
792 # write_eof is all what we needed to close the write pipe
793 self.write_eof()
795 def __del__(self, _warn=warnings.warn):
796 if self._pipe is not None: 796 ↛ 797line 796 didn't jump to line 797 because the condition on line 796 was never true
797 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
798 self._pipe.close()
800 def abort(self):
801 self._close(None)
803 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
804 # should be called by exception handler only
805 if isinstance(exc, OSError): 805 ↛ 809line 805 didn't jump to line 809 because the condition on line 805 was always true
806 if self._loop.get_debug(): 806 ↛ 807line 806 didn't jump to line 807 because the condition on line 806 was never true
807 logger.debug("%r: %s", self, message, exc_info=True)
808 else:
809 self._loop.call_exception_handler({
810 'message': message,
811 'exception': exc,
812 'transport': self,
813 'protocol': self._protocol,
814 })
815 self._close(exc)
817 def _close(self, exc=None):
818 self._closing = True
819 if self._buffer:
820 self._loop._remove_writer(self._fileno)
821 self._buffer.clear()
822 self._loop._remove_reader(self._fileno)
823 self._loop.call_soon(self._call_connection_lost, exc)
825 def _call_connection_lost(self, exc):
826 try:
827 self._protocol.connection_lost(exc)
828 finally:
829 self._pipe.close()
830 self._pipe = None
831 self._protocol = None
832 self._loop = None
835class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
837 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
838 stdin_w = None
839 if (stdin == subprocess.PIPE 839 ↛ 845line 839 didn't jump to line 845 because the condition on line 839 was never true
840 and (sys.platform.startswith('aix') or sys.platform == 'cygwin')):
841 # Use a socket pair for stdin on AIX, since it does not
842 # support selecting read events on the write end of a
843 # socket (which we use in order to detect closing of the
844 # other end).
845 stdin, stdin_w = socket.socketpair()
846 try:
847 self._proc = subprocess.Popen(
848 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
849 universal_newlines=False, bufsize=bufsize, **kwargs)
850 if stdin_w is not None: 850 ↛ 851line 850 didn't jump to line 851 because the condition on line 850 was never true
851 stdin.close()
852 self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
853 stdin_w = None
854 finally:
855 if stdin_w is not None: 855 ↛ 856line 855 didn't jump to line 856 because the condition on line 855 was never true
856 stdin.close()
857 stdin_w.close()
860class _PidfdChildWatcher:
861 """Child watcher implementation using Linux's pid file descriptors.
863 This child watcher polls process file descriptors (pidfds) to await child
864 process termination. In some respects, PidfdChildWatcher is a "Goldilocks"
865 child watcher implementation. It doesn't require signals or threads, doesn't
866 interfere with any processes launched outside the event loop, and scales
867 linearly with the number of subprocesses launched by the event loop. The
868 main disadvantage is that pidfds are specific to Linux, and only work on
869 recent (5.3+) kernels.
870 """
872 def add_child_handler(self, pid, callback, *args):
873 loop = events.get_running_loop()
874 pidfd = os.pidfd_open(pid)
875 loop._add_reader(pidfd, self._do_wait, pid, pidfd, callback, args)
877 def _do_wait(self, pid, pidfd, callback, args):
878 loop = events.get_running_loop()
879 loop._remove_reader(pidfd)
880 try:
881 _, status = os.waitpid(pid, 0)
882 except ChildProcessError:
883 # The child process is already reaped
884 # (may happen if waitpid() is called elsewhere).
885 returncode = 255
886 logger.warning(
887 "child process pid %d exit status already read: "
888 " will report returncode 255",
889 pid)
890 else:
891 returncode = waitstatus_to_exitcode(status)
892 finally:
893 os.close(pidfd)
894 callback(pid, returncode, *args)
896class _ThreadedChildWatcher:
897 """Threaded child watcher implementation.
899 The watcher uses a thread per process
900 for waiting for the process finish.
902 It doesn't require subscription on POSIX signal
903 but a thread creation is not free.
905 The watcher has O(1) complexity, its performance doesn't depend
906 on amount of spawn processes.
907 """
909 def __init__(self):
910 self._pid_counter = itertools.count(0)
911 self._threads = {}
913 def __del__(self, _warn=warnings.warn):
914 threads = [thread for thread in list(self._threads.values())
915 if thread.is_alive()]
916 if threads: 916 ↛ 917line 916 didn't jump to line 917 because the condition on line 916 was never true
917 _warn(f"{self.__class__} has registered but not finished child processes",
918 ResourceWarning,
919 source=self)
921 def add_child_handler(self, pid, callback, *args):
922 loop = events.get_running_loop()
923 thread = threading.Thread(target=self._do_waitpid,
924 name=f"asyncio-waitpid-{next(self._pid_counter)}",
925 args=(loop, pid, callback, args),
926 daemon=True)
927 self._threads[pid] = thread
928 thread.start()
930 def _do_waitpid(self, loop, expected_pid, callback, args):
931 assert expected_pid > 0
933 try:
934 pid, status = os.waitpid(expected_pid, 0)
935 except ChildProcessError:
936 # The child process is already reaped
937 # (may happen if waitpid() is called elsewhere).
938 pid = expected_pid
939 returncode = 255
940 logger.warning(
941 "Unknown child process pid %d, will report returncode 255",
942 pid)
943 else:
944 returncode = waitstatus_to_exitcode(status)
945 if loop.get_debug(): 945 ↛ 946line 945 didn't jump to line 946 because the condition on line 945 was never true
946 logger.debug('process %s exited with returncode %s',
947 expected_pid, returncode)
949 if loop.is_closed(): 949 ↛ 950line 949 didn't jump to line 950 because the condition on line 949 was never true
950 logger.warning("Loop %r that handles pid %r is closed", loop, pid)
951 else:
952 loop.call_soon_threadsafe(callback, pid, returncode, *args)
954 self._threads.pop(expected_pid)
956def can_use_pidfd():
957 if not hasattr(os, 'pidfd_open'): 957 ↛ 958line 957 didn't jump to line 958 because the condition on line 957 was never true
958 return False
959 try:
960 pid = os.getpid()
961 os.close(os.pidfd_open(pid, 0))
962 except OSError:
963 # blocked by security policy like SECCOMP
964 return False
965 return True
968class _UnixDefaultEventLoopPolicy(events._BaseDefaultEventLoopPolicy):
969 """UNIX event loop policy"""
970 _loop_factory = _UnixSelectorEventLoop
973SelectorEventLoop = _UnixSelectorEventLoop
974_DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
975EventLoop = SelectorEventLoop