Coverage for Lib/asyncio/unix_events.py: 87%
619 statements
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-23 01:21 +0000
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-23 01:21 +0000
1"""Selector event loop for Unix with signal handling."""
3import errno
4import io
5import itertools
6import os
7import selectors
8import signal
9import socket
10import stat
11import subprocess
12import sys
13import threading
14import warnings
16from . import base_events
17from . import base_subprocess
18from . import constants
19from . import coroutines
20from . import events
21from . import exceptions
22from . import futures
23from . import selector_events
24from . import tasks
25from . import transports
26from .log import logger
29__all__ = (
30 'SelectorEventLoop',
31 'EventLoop',
32)
35if sys.platform == 'win32': # pragma: no cover
36 raise ImportError('Signals are not really supported on Windows')
39def _sighandler_noop(signum, frame):
40 """Dummy signal handler."""
41 pass
44def waitstatus_to_exitcode(status):
45 try:
46 return os.waitstatus_to_exitcode(status)
47 except ValueError:
48 # The child exited, but we don't understand its status.
49 # This shouldn't happen, but if it does, let's just
50 # return that status; perhaps that helps debug it.
51 return status
54class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
55 """Unix event loop.
57 Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
58 """
60 def __init__(self, selector=None):
61 super().__init__(selector)
62 self._signal_handlers = {}
63 self._unix_server_sockets = {}
64 if can_use_pidfd():
65 self._watcher = _PidfdChildWatcher()
66 else:
67 self._watcher = _ThreadedChildWatcher()
69 def close(self):
70 super().close()
71 if not sys.is_finalizing():
72 for sig in list(self._signal_handlers):
73 self.remove_signal_handler(sig)
74 else:
75 if self._signal_handlers: 75 ↛ exitline 75 didn't return from function 'close' because the condition on line 75 was always true
76 warnings.warn(f"Closing the loop {self!r} "
77 f"on interpreter shutdown "
78 f"stage, skipping signal handlers removal",
79 ResourceWarning,
80 source=self)
81 self._signal_handlers.clear()
83 def _process_self_data(self, data):
84 for signum in data:
85 if not signum:
86 # ignore null bytes written by _write_to_self()
87 continue
88 self._handle_signal(signum)
90 def add_signal_handler(self, sig, callback, *args):
91 """Add a handler for a signal. UNIX only.
93 Raise ValueError if the signal number is invalid or uncatchable.
94 Raise RuntimeError if there is a problem setting up the handler.
95 """
96 if (coroutines.iscoroutine(callback) or
97 coroutines._iscoroutinefunction(callback)):
98 raise TypeError("coroutines cannot be used "
99 "with add_signal_handler()")
100 self._check_signal(sig)
101 self._check_closed()
102 try:
103 # set_wakeup_fd() raises ValueError if this is not the
104 # main thread. By calling it early we ensure that an
105 # event loop running in another thread cannot add a signal
106 # handler.
107 signal.set_wakeup_fd(self._csock.fileno())
108 except (ValueError, OSError) as exc:
109 raise RuntimeError(str(exc))
111 handle = events.Handle(callback, args, self, None)
112 self._signal_handlers[sig] = handle
114 try:
115 # Register a dummy signal handler to ask Python to write the signal
116 # number in the wakeup file descriptor. _process_self_data() will
117 # read signal numbers from this file descriptor to handle signals.
118 signal.signal(sig, _sighandler_noop)
120 # Set SA_RESTART to limit EINTR occurrences.
121 signal.siginterrupt(sig, False)
122 except OSError as exc:
123 del self._signal_handlers[sig]
124 if not self._signal_handlers:
125 try:
126 signal.set_wakeup_fd(-1)
127 except (ValueError, OSError) as nexc:
128 logger.info('set_wakeup_fd(-1) failed: %s', nexc)
130 if exc.errno == errno.EINVAL:
131 raise RuntimeError(f'sig {sig} cannot be caught')
132 else:
133 raise
135 def _handle_signal(self, sig):
136 """Internal helper that is the actual signal handler."""
137 handle = self._signal_handlers.get(sig)
138 if handle is None:
139 return # Assume it's some race condition.
140 if handle._cancelled:
141 self.remove_signal_handler(sig) # Remove it properly.
142 else:
143 self._add_callback_signalsafe(handle)
145 def remove_signal_handler(self, sig):
146 """Remove a handler for a signal. UNIX only.
148 Return True if a signal handler was removed, False if not.
149 """
150 self._check_signal(sig)
151 try:
152 del self._signal_handlers[sig]
153 except KeyError:
154 return False
156 if sig == signal.SIGINT:
157 handler = signal.default_int_handler
158 else:
159 handler = signal.SIG_DFL
161 try:
162 signal.signal(sig, handler)
163 except OSError as exc:
164 if exc.errno == errno.EINVAL:
165 raise RuntimeError(f'sig {sig} cannot be caught')
166 else:
167 raise
169 if not self._signal_handlers:
170 try:
171 signal.set_wakeup_fd(-1)
172 except (ValueError, OSError) as exc:
173 logger.info('set_wakeup_fd(-1) failed: %s', exc)
175 return True
177 def _check_signal(self, sig):
178 """Internal helper to validate a signal.
180 Raise ValueError if the signal number is invalid or uncatchable.
181 Raise RuntimeError if there is a problem setting up the handler.
182 """
183 if not isinstance(sig, int):
184 raise TypeError(f'sig must be an int, not {sig!r}')
186 if sig not in signal.valid_signals():
187 raise ValueError(f'invalid signal number {sig}')
189 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
190 extra=None):
191 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
193 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
194 extra=None):
195 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
197 async def _make_subprocess_transport(self, protocol, args, shell,
198 stdin, stdout, stderr, bufsize,
199 extra=None, **kwargs):
200 watcher = self._watcher
201 waiter = self.create_future()
202 transp = _UnixSubprocessTransport(self, protocol, args, shell,
203 stdin, stdout, stderr, bufsize,
204 waiter=waiter, extra=extra,
205 **kwargs)
206 watcher.add_child_handler(transp.get_pid(),
207 self._child_watcher_callback, transp)
208 try:
209 await waiter
210 except (SystemExit, KeyboardInterrupt):
211 raise
212 except BaseException:
213 transp.close()
214 await transp._wait()
215 raise
217 return transp
219 def _child_watcher_callback(self, pid, returncode, transp):
220 self.call_soon_threadsafe(transp._process_exited, returncode)
222 async def create_unix_connection(
223 self, protocol_factory, path=None, *,
224 ssl=None, sock=None,
225 server_hostname=None,
226 ssl_handshake_timeout=None,
227 ssl_shutdown_timeout=None):
228 assert server_hostname is None or isinstance(server_hostname, str)
229 if ssl:
230 if server_hostname is None:
231 raise ValueError(
232 'you have to pass server_hostname when using ssl')
233 else:
234 if server_hostname is not None:
235 raise ValueError('server_hostname is only meaningful with ssl')
236 if ssl_handshake_timeout is not None:
237 raise ValueError(
238 'ssl_handshake_timeout is only meaningful with ssl')
239 if ssl_shutdown_timeout is not None: 239 ↛ 240line 239 didn't jump to line 240 because the condition on line 239 was never true
240 raise ValueError(
241 'ssl_shutdown_timeout is only meaningful with ssl')
243 if path is not None:
244 if sock is not None:
245 raise ValueError(
246 'path and sock can not be specified at the same time')
248 path = os.fspath(path)
249 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
250 try:
251 sock.setblocking(False)
252 await self.sock_connect(sock, path)
253 except:
254 sock.close()
255 raise
257 else:
258 if sock is None:
259 raise ValueError('no path and sock were specified')
260 if (sock.family != socket.AF_UNIX or 260 ↛ 264line 260 didn't jump to line 264 because the condition on line 260 was always true
261 sock.type != socket.SOCK_STREAM):
262 raise ValueError(
263 f'A UNIX Domain Stream Socket was expected, got {sock!r}')
264 sock.setblocking(False)
266 transport, protocol = await self._create_connection_transport(
267 sock, protocol_factory, ssl, server_hostname,
268 ssl_handshake_timeout=ssl_handshake_timeout,
269 ssl_shutdown_timeout=ssl_shutdown_timeout)
270 return transport, protocol
272 async def create_unix_server(
273 self, protocol_factory, path=None, *,
274 sock=None, backlog=100, ssl=None,
275 ssl_handshake_timeout=None,
276 ssl_shutdown_timeout=None,
277 start_serving=True, cleanup_socket=True):
278 if isinstance(ssl, bool):
279 raise TypeError('ssl argument must be an SSLContext or None')
281 if ssl_handshake_timeout is not None and not ssl:
282 raise ValueError(
283 'ssl_handshake_timeout is only meaningful with ssl')
285 if ssl_shutdown_timeout is not None and not ssl: 285 ↛ 286line 285 didn't jump to line 286 because the condition on line 285 was never true
286 raise ValueError(
287 'ssl_shutdown_timeout is only meaningful with ssl')
289 if path is not None:
290 if sock is not None:
291 raise ValueError(
292 'path and sock can not be specified at the same time')
294 path = os.fspath(path)
295 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
297 # Check for abstract socket. `str` and `bytes` paths are supported.
298 if path[0] not in (0, '\x00'): 298 ↛ 309line 298 didn't jump to line 309 because the condition on line 298 was always true
299 try:
300 if stat.S_ISSOCK(os.stat(path).st_mode):
301 os.remove(path)
302 except FileNotFoundError:
303 pass
304 except OSError as err:
305 # Directory may have permissions only to create socket.
306 logger.error('Unable to check or remove stale UNIX socket '
307 '%r: %r', path, err)
309 try:
310 sock.bind(path)
311 except OSError as exc:
312 sock.close()
313 if exc.errno == errno.EADDRINUSE:
314 # Let's improve the error message by adding
315 # with what exact address it occurs.
316 msg = f'Address {path!r} is already in use'
317 raise OSError(errno.EADDRINUSE, msg) from None
318 else:
319 raise
320 except:
321 sock.close()
322 raise
323 else:
324 if sock is None:
325 raise ValueError(
326 'path was not specified, and no sock specified')
328 if (sock.family != socket.AF_UNIX or
329 sock.type != socket.SOCK_STREAM):
330 raise ValueError(
331 f'A UNIX Domain Stream Socket was expected, got {sock!r}')
333 if cleanup_socket:
334 path = sock.getsockname()
335 # Check for abstract socket. `str` and `bytes` paths are supported.
336 if path[0] not in (0, '\x00'): 336 ↛ 342line 336 didn't jump to line 342 because the condition on line 336 was always true
337 try:
338 self._unix_server_sockets[sock] = os.stat(path).st_ino
339 except FileNotFoundError:
340 pass
342 sock.setblocking(False)
343 server = base_events.Server(self, [sock], protocol_factory,
344 ssl, backlog, ssl_handshake_timeout,
345 ssl_shutdown_timeout)
346 if start_serving:
347 server._start_serving()
348 # Skip one loop iteration so that all 'loop.add_reader'
349 # go through.
350 await tasks.sleep(0)
352 return server
354 async def _sock_sendfile_native(self, sock, file, offset, count):
355 try:
356 os.sendfile
357 except AttributeError:
358 raise exceptions.SendfileNotAvailableError(
359 "os.sendfile() is not available")
360 try:
361 fileno = file.fileno()
362 except (AttributeError, io.UnsupportedOperation) as err:
363 raise exceptions.SendfileNotAvailableError("not a regular file")
364 try:
365 fsize = os.fstat(fileno).st_size
366 except OSError:
367 raise exceptions.SendfileNotAvailableError("not a regular file")
368 blocksize = count if count else fsize
369 if not blocksize:
370 return 0 # empty file
372 fut = self.create_future()
373 self._sock_sendfile_native_impl(fut, None, sock, fileno,
374 offset, count, blocksize, 0)
375 return await fut
377 def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
378 offset, count, blocksize, total_sent):
379 fd = sock.fileno()
380 if registered_fd is not None:
381 # Remove the callback early. It should be rare that the
382 # selector says the fd is ready but the call still returns
383 # EAGAIN, and I am willing to take a hit in that case in
384 # order to simplify the common case.
385 self.remove_writer(registered_fd)
386 if fut.cancelled():
387 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
388 return
389 if count:
390 blocksize = count - total_sent
391 if blocksize <= 0:
392 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
393 fut.set_result(total_sent)
394 return
396 # On 32-bit architectures truncate to 1GiB to avoid OverflowError
397 blocksize = min(blocksize, sys.maxsize//2 + 1)
399 try:
400 sent = os.sendfile(fd, fileno, offset, blocksize)
401 except (BlockingIOError, InterruptedError):
402 if registered_fd is None: 402 ↛ 404line 402 didn't jump to line 404 because the condition on line 402 was always true
403 self._sock_add_cancellation_callback(fut, sock)
404 self.add_writer(fd, self._sock_sendfile_native_impl, fut,
405 fd, sock, fileno,
406 offset, count, blocksize, total_sent)
407 except OSError as exc:
408 if (registered_fd is not None and 408 ↛ 415line 408 didn't jump to line 415 because the condition on line 408 was never true
409 exc.errno == errno.ENOTCONN and
410 type(exc) is not ConnectionError):
411 # If we have an ENOTCONN and this isn't a first call to
412 # sendfile(), i.e. the connection was closed in the middle
413 # of the operation, normalize the error to ConnectionError
414 # to make it consistent across all Posix systems.
415 new_exc = ConnectionError(
416 "socket is not connected", errno.ENOTCONN)
417 new_exc.__cause__ = exc
418 exc = new_exc
419 if total_sent == 0:
420 # We can get here for different reasons, the main
421 # one being 'file' is not a regular mmap(2)-like
422 # file, in which case we'll fall back on using
423 # plain send().
424 err = exceptions.SendfileNotAvailableError(
425 "os.sendfile call failed")
426 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
427 fut.set_exception(err)
428 else:
429 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
430 fut.set_exception(exc)
431 except (SystemExit, KeyboardInterrupt):
432 raise
433 except BaseException as exc:
434 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
435 fut.set_exception(exc)
436 else:
437 if sent == 0:
438 # EOF
439 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
440 fut.set_result(total_sent)
441 else:
442 offset += sent
443 total_sent += sent
444 if registered_fd is None:
445 self._sock_add_cancellation_callback(fut, sock)
446 self.add_writer(fd, self._sock_sendfile_native_impl, fut,
447 fd, sock, fileno,
448 offset, count, blocksize, total_sent)
450 def _sock_sendfile_update_filepos(self, fileno, offset, total_sent):
451 if total_sent > 0:
452 os.lseek(fileno, offset, os.SEEK_SET)
454 def _sock_add_cancellation_callback(self, fut, sock):
455 def cb(fut):
456 if fut.cancelled():
457 fd = sock.fileno()
458 if fd != -1: 458 ↛ exitline 458 didn't return from function 'cb' because the condition on line 458 was always true
459 self.remove_writer(fd)
460 fut.add_done_callback(cb)
462 def _stop_serving(self, sock):
463 # Is this a unix socket that needs cleanup?
464 if sock in self._unix_server_sockets:
465 path = sock.getsockname()
466 else:
467 path = None
469 super()._stop_serving(sock)
471 if path is not None:
472 prev_ino = self._unix_server_sockets[sock]
473 del self._unix_server_sockets[sock]
474 try:
475 if os.stat(path).st_ino == prev_ino:
476 os.unlink(path)
477 except FileNotFoundError:
478 pass
479 except OSError as err:
480 logger.error('Unable to clean up listening UNIX socket '
481 '%r: %r', path, err)
484class _UnixReadPipeTransport(transports.ReadTransport):
486 max_size = 256 * 1024 # max bytes we read in one event loop iteration
488 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
489 super().__init__(extra)
490 self._extra['pipe'] = pipe
491 self._loop = loop
492 self._pipe = pipe
493 self._fileno = pipe.fileno()
494 self._protocol = protocol
495 self._closing = False
496 self._paused = False
498 mode = os.fstat(self._fileno).st_mode
499 if not (stat.S_ISFIFO(mode) or 499 ↛ 502line 499 didn't jump to line 502 because the condition on line 499 was never true
500 stat.S_ISSOCK(mode) or
501 stat.S_ISCHR(mode)):
502 self._pipe = None
503 self._fileno = None
504 self._protocol = None
505 raise ValueError("Pipe transport is for pipes/sockets only.")
507 os.set_blocking(self._fileno, False)
509 self._loop.call_soon(self._protocol.connection_made, self)
510 # only start reading when connection_made() has been called
511 self._loop.call_soon(self._add_reader,
512 self._fileno, self._read_ready)
513 if waiter is not None:
514 # only wake up the waiter when connection_made() has been called
515 self._loop.call_soon(futures._set_result_unless_cancelled,
516 waiter, None)
518 def _add_reader(self, fd, callback):
519 if not self.is_reading():
520 return
521 self._loop._add_reader(fd, callback)
523 def is_reading(self):
524 return not self._paused and not self._closing
526 def __repr__(self):
527 info = [self.__class__.__name__]
528 if self._pipe is None: 528 ↛ 529line 528 didn't jump to line 529 because the condition on line 528 was never true
529 info.append('closed')
530 elif self._closing: 530 ↛ 531line 530 didn't jump to line 531 because the condition on line 530 was never true
531 info.append('closing')
532 info.append(f'fd={self._fileno}')
533 selector = getattr(self._loop, '_selector', None)
534 if self._pipe is not None and selector is not None:
535 polling = selector_events._test_selector_event(
536 selector, self._fileno, selectors.EVENT_READ)
537 if polling: 537 ↛ 538line 537 didn't jump to line 538 because the condition on line 537 was never true
538 info.append('polling')
539 else:
540 info.append('idle')
541 elif self._pipe is not None: 541 ↛ 544line 541 didn't jump to line 544 because the condition on line 541 was always true
542 info.append('open')
543 else:
544 info.append('closed')
545 return '<{}>'.format(' '.join(info))
547 def _read_ready(self):
548 try:
549 data = os.read(self._fileno, self.max_size)
550 except (BlockingIOError, InterruptedError):
551 pass
552 except OSError as exc:
553 self._fatal_error(exc, 'Fatal read error on pipe transport')
554 else:
555 if data:
556 self._protocol.data_received(data)
557 else:
558 if self._loop.get_debug(): 558 ↛ 559line 558 didn't jump to line 559 because the condition on line 558 was never true
559 logger.info("%r was closed by peer", self)
560 self._closing = True
561 self._loop._remove_reader(self._fileno)
562 self._loop.call_soon(self._protocol.eof_received)
563 self._loop.call_soon(self._call_connection_lost, None)
565 def pause_reading(self):
566 if not self.is_reading():
567 return
568 self._paused = True
569 self._loop._remove_reader(self._fileno)
570 if self._loop.get_debug(): 570 ↛ 571line 570 didn't jump to line 571 because the condition on line 570 was never true
571 logger.debug("%r pauses reading", self)
573 def resume_reading(self):
574 if self._closing or not self._paused:
575 return
576 self._paused = False
577 self._loop._add_reader(self._fileno, self._read_ready)
578 if self._loop.get_debug(): 578 ↛ 579line 578 didn't jump to line 579 because the condition on line 578 was never true
579 logger.debug("%r resumes reading", self)
581 def set_protocol(self, protocol):
582 self._protocol = protocol
584 def get_protocol(self):
585 return self._protocol
587 def is_closing(self):
588 return self._closing
590 def close(self):
591 if not self._closing:
592 self._close(None)
594 def __del__(self, _warn=warnings.warn):
595 if self._pipe is not None: 595 ↛ 596line 595 didn't jump to line 596 because the condition on line 595 was never true
596 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
597 self._pipe.close()
599 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
600 # should be called by exception handler only
601 if (isinstance(exc, OSError) and exc.errno == errno.EIO): 601 ↛ 602line 601 didn't jump to line 602 because the condition on line 601 was never true
602 if self._loop.get_debug():
603 logger.debug("%r: %s", self, message, exc_info=True)
604 else:
605 self._loop.call_exception_handler({
606 'message': message,
607 'exception': exc,
608 'transport': self,
609 'protocol': self._protocol,
610 })
611 self._close(exc)
613 def _close(self, exc):
614 self._closing = True
615 self._loop._remove_reader(self._fileno)
616 self._loop.call_soon(self._call_connection_lost, exc)
618 def _call_connection_lost(self, exc):
619 try:
620 self._protocol.connection_lost(exc)
621 finally:
622 self._pipe.close()
623 self._pipe = None
624 self._protocol = None
625 self._loop = None
628class _UnixWritePipeTransport(transports._FlowControlMixin,
629 transports.WriteTransport):
631 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
632 super().__init__(extra, loop)
633 self._extra['pipe'] = pipe
634 self._pipe = pipe
635 self._fileno = pipe.fileno()
636 self._protocol = protocol
637 self._buffer = bytearray()
638 self._conn_lost = 0
639 self._closing = False # Set when close() or write_eof() called.
641 mode = os.fstat(self._fileno).st_mode
642 is_char = stat.S_ISCHR(mode)
643 is_fifo = stat.S_ISFIFO(mode)
644 is_socket = stat.S_ISSOCK(mode)
645 if not (is_char or is_fifo or is_socket): 645 ↛ 646line 645 didn't jump to line 646 because the condition on line 645 was never true
646 self._pipe = None
647 self._fileno = None
648 self._protocol = None
649 raise ValueError("Pipe transport is only for "
650 "pipes, sockets and character devices")
652 os.set_blocking(self._fileno, False)
653 self._loop.call_soon(self._protocol.connection_made, self)
655 # On AIX, the reader trick (to be notified when the read end of the
656 # socket is closed) only works for sockets. On other platforms it
657 # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
658 if is_socket or (is_fifo and not sys.platform.startswith("aix")):
659 # only start reading when connection_made() has been called
660 self._loop.call_soon(self._loop._add_reader,
661 self._fileno, self._read_ready)
663 if waiter is not None:
664 # only wake up the waiter when connection_made() has been called
665 self._loop.call_soon(futures._set_result_unless_cancelled,
666 waiter, None)
668 def __repr__(self):
669 info = [self.__class__.__name__]
670 if self._pipe is None: 670 ↛ 671line 670 didn't jump to line 671 because the condition on line 670 was never true
671 info.append('closed')
672 elif self._closing: 672 ↛ 673line 672 didn't jump to line 673 because the condition on line 672 was never true
673 info.append('closing')
674 info.append(f'fd={self._fileno}')
675 selector = getattr(self._loop, '_selector', None)
676 if self._pipe is not None and selector is not None: 676 ↛ 677line 676 didn't jump to line 677 because the condition on line 676 was never true
677 polling = selector_events._test_selector_event(
678 selector, self._fileno, selectors.EVENT_WRITE)
679 if polling:
680 info.append('polling')
681 else:
682 info.append('idle')
684 bufsize = self.get_write_buffer_size()
685 info.append(f'bufsize={bufsize}')
686 elif self._pipe is not None: 686 ↛ 689line 686 didn't jump to line 689 because the condition on line 686 was always true
687 info.append('open')
688 else:
689 info.append('closed')
690 return '<{}>'.format(' '.join(info))
692 def get_write_buffer_size(self):
693 return len(self._buffer)
695 def _read_ready(self):
696 # Pipe was closed by peer.
697 if self._loop.get_debug(): 697 ↛ 698line 697 didn't jump to line 698 because the condition on line 697 was never true
698 logger.info("%r was closed by peer", self)
699 if self._buffer:
700 self._close(BrokenPipeError())
701 else:
702 self._close()
704 def write(self, data):
705 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
706 if isinstance(data, bytearray): 706 ↛ 707line 706 didn't jump to line 707 because the condition on line 706 was never true
707 data = memoryview(data)
708 if not data:
709 return
711 if self._conn_lost or self._closing:
712 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
713 logger.warning('pipe closed by peer or '
714 'os.write(pipe, data) raised exception.')
715 self._conn_lost += 1
716 return
718 if not self._buffer:
719 # Attempt to send it right away first.
720 try:
721 n = os.write(self._fileno, data)
722 except (BlockingIOError, InterruptedError):
723 n = 0
724 except (SystemExit, KeyboardInterrupt):
725 raise
726 except BaseException as exc:
727 self._conn_lost += 1
728 self._fatal_error(exc, 'Fatal write error on pipe transport')
729 return
730 if n == len(data):
731 return
732 elif n > 0:
733 data = memoryview(data)[n:]
734 self._loop._add_writer(self._fileno, self._write_ready)
736 self._buffer += data
737 self._maybe_pause_protocol()
739 def _write_ready(self):
740 assert self._buffer, 'Data should not be empty'
742 try:
743 n = os.write(self._fileno, self._buffer)
744 except (BlockingIOError, InterruptedError):
745 pass
746 except (SystemExit, KeyboardInterrupt):
747 raise
748 except BaseException as exc:
749 self._buffer.clear()
750 self._conn_lost += 1
751 # Remove writer here, _fatal_error() doesn't it
752 # because _buffer is empty.
753 self._loop._remove_writer(self._fileno)
754 self._fatal_error(exc, 'Fatal write error on pipe transport')
755 else:
756 if n == len(self._buffer):
757 self._buffer.clear()
758 self._loop._remove_writer(self._fileno)
759 self._maybe_resume_protocol() # May append to buffer.
760 if self._closing:
761 self._loop._remove_reader(self._fileno)
762 self._call_connection_lost(None)
763 return
764 elif n > 0:
765 del self._buffer[:n]
767 def can_write_eof(self):
768 return True
770 def write_eof(self):
771 if self._closing: 771 ↛ 772line 771 didn't jump to line 772 because the condition on line 771 was never true
772 return
773 assert self._pipe
774 self._closing = True
775 if not self._buffer:
776 self._loop._remove_reader(self._fileno)
777 self._loop.call_soon(self._call_connection_lost, None)
779 def set_protocol(self, protocol):
780 self._protocol = protocol
782 def get_protocol(self):
783 return self._protocol
785 def is_closing(self):
786 return self._closing
788 def close(self):
789 if self._pipe is not None and not self._closing:
790 # write_eof is all what we needed to close the write pipe
791 self.write_eof()
793 def __del__(self, _warn=warnings.warn):
794 if self._pipe is not None: 794 ↛ 795line 794 didn't jump to line 795 because the condition on line 794 was never true
795 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
796 self._pipe.close()
798 def abort(self):
799 self._close(None)
801 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
802 # should be called by exception handler only
803 if isinstance(exc, OSError): 803 ↛ 807line 803 didn't jump to line 807 because the condition on line 803 was always true
804 if self._loop.get_debug(): 804 ↛ 805line 804 didn't jump to line 805 because the condition on line 804 was never true
805 logger.debug("%r: %s", self, message, exc_info=True)
806 else:
807 self._loop.call_exception_handler({
808 'message': message,
809 'exception': exc,
810 'transport': self,
811 'protocol': self._protocol,
812 })
813 self._close(exc)
815 def _close(self, exc=None):
816 self._closing = True
817 if self._buffer:
818 self._loop._remove_writer(self._fileno)
819 self._buffer.clear()
820 self._loop._remove_reader(self._fileno)
821 self._loop.call_soon(self._call_connection_lost, exc)
823 def _call_connection_lost(self, exc):
824 try:
825 self._protocol.connection_lost(exc)
826 finally:
827 self._pipe.close()
828 self._pipe = None
829 self._protocol = None
830 self._loop = None
833class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
835 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
836 stdin_w = None
837 if stdin == subprocess.PIPE and sys.platform.startswith('aix'): 837 ↛ 842line 837 didn't jump to line 842 because the condition on line 837 was never true
838 # Use a socket pair for stdin on AIX, since it does not
839 # support selecting read events on the write end of a
840 # socket (which we use in order to detect closing of the
841 # other end).
842 stdin, stdin_w = socket.socketpair()
843 try:
844 self._proc = subprocess.Popen(
845 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
846 universal_newlines=False, bufsize=bufsize, **kwargs)
847 if stdin_w is not None: 847 ↛ 848line 847 didn't jump to line 848 because the condition on line 847 was never true
848 stdin.close()
849 self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
850 stdin_w = None
851 finally:
852 if stdin_w is not None: 852 ↛ 853line 852 didn't jump to line 853 because the condition on line 852 was never true
853 stdin.close()
854 stdin_w.close()
857class _PidfdChildWatcher:
858 """Child watcher implementation using Linux's pid file descriptors.
860 This child watcher polls process file descriptors (pidfds) to await child
861 process termination. In some respects, PidfdChildWatcher is a "Goldilocks"
862 child watcher implementation. It doesn't require signals or threads, doesn't
863 interfere with any processes launched outside the event loop, and scales
864 linearly with the number of subprocesses launched by the event loop. The
865 main disadvantage is that pidfds are specific to Linux, and only work on
866 recent (5.3+) kernels.
867 """
869 def add_child_handler(self, pid, callback, *args):
870 loop = events.get_running_loop()
871 pidfd = os.pidfd_open(pid)
872 loop._add_reader(pidfd, self._do_wait, pid, pidfd, callback, args)
874 def _do_wait(self, pid, pidfd, callback, args):
875 loop = events.get_running_loop()
876 loop._remove_reader(pidfd)
877 try:
878 _, status = os.waitpid(pid, 0)
879 except ChildProcessError:
880 # The child process is already reaped
881 # (may happen if waitpid() is called elsewhere).
882 returncode = 255
883 logger.warning(
884 "child process pid %d exit status already read: "
885 " will report returncode 255",
886 pid)
887 else:
888 returncode = waitstatus_to_exitcode(status)
890 os.close(pidfd)
891 callback(pid, returncode, *args)
893class _ThreadedChildWatcher:
894 """Threaded child watcher implementation.
896 The watcher uses a thread per process
897 for waiting for the process finish.
899 It doesn't require subscription on POSIX signal
900 but a thread creation is not free.
902 The watcher has O(1) complexity, its performance doesn't depend
903 on amount of spawn processes.
904 """
906 def __init__(self):
907 self._pid_counter = itertools.count(0)
908 self._threads = {}
910 def __del__(self, _warn=warnings.warn):
911 threads = [thread for thread in list(self._threads.values())
912 if thread.is_alive()]
913 if threads: 913 ↛ 914line 913 didn't jump to line 914 because the condition on line 913 was never true
914 _warn(f"{self.__class__} has registered but not finished child processes",
915 ResourceWarning,
916 source=self)
918 def add_child_handler(self, pid, callback, *args):
919 loop = events.get_running_loop()
920 thread = threading.Thread(target=self._do_waitpid,
921 name=f"asyncio-waitpid-{next(self._pid_counter)}",
922 args=(loop, pid, callback, args),
923 daemon=True)
924 self._threads[pid] = thread
925 thread.start()
927 def _do_waitpid(self, loop, expected_pid, callback, args):
928 assert expected_pid > 0
930 try:
931 pid, status = os.waitpid(expected_pid, 0)
932 except ChildProcessError:
933 # The child process is already reaped
934 # (may happen if waitpid() is called elsewhere).
935 pid = expected_pid
936 returncode = 255
937 logger.warning(
938 "Unknown child process pid %d, will report returncode 255",
939 pid)
940 else:
941 returncode = waitstatus_to_exitcode(status)
942 if loop.get_debug(): 942 ↛ 943line 942 didn't jump to line 943 because the condition on line 942 was never true
943 logger.debug('process %s exited with returncode %s',
944 expected_pid, returncode)
946 if loop.is_closed(): 946 ↛ 947line 946 didn't jump to line 947 because the condition on line 946 was never true
947 logger.warning("Loop %r that handles pid %r is closed", loop, pid)
948 else:
949 loop.call_soon_threadsafe(callback, pid, returncode, *args)
951 self._threads.pop(expected_pid)
953def can_use_pidfd():
954 if not hasattr(os, 'pidfd_open'): 954 ↛ 955line 954 didn't jump to line 955 because the condition on line 954 was never true
955 return False
956 try:
957 pid = os.getpid()
958 os.close(os.pidfd_open(pid, 0))
959 except OSError:
960 # blocked by security policy like SECCOMP
961 return False
962 return True
965class _UnixDefaultEventLoopPolicy(events._BaseDefaultEventLoopPolicy):
966 """UNIX event loop policy"""
967 _loop_factory = _UnixSelectorEventLoop
970SelectorEventLoop = _UnixSelectorEventLoop
971_DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
972EventLoop = SelectorEventLoop