Coverage for Lib/asyncio/unix_events.py: 87%

619 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-06-24 03:28 +0000

1"""Selector event loop for Unix with signal handling.""" 

2 

3import errno 

4import io 

5import itertools 

6import os 

7import selectors 

8import signal 

9import socket 

10import stat 

11import subprocess 

12import sys 

13import threading 

14import warnings 

15import inspect 

16 

17from . import base_events 

18from . import base_subprocess 

19from . import constants 

20from . import coroutines 

21from . import events 

22from . import exceptions 

23from . import futures 

24from . import selector_events 

25from . import tasks 

26from . import transports 

27from .log import logger 

28 

29 

30__all__ = ( 

31 'SelectorEventLoop', 

32 'EventLoop', 

33) 

34 

35 

36if sys.platform == 'win32': # pragma: no cover 

37 raise ImportError('Signals are not really supported on Windows') 

38 

39 

40def _sighandler_noop(signum, frame): 

41 """Dummy signal handler.""" 

42 pass 

43 

44 

45def waitstatus_to_exitcode(status): 

46 try: 

47 return os.waitstatus_to_exitcode(status) 

48 except ValueError: 

49 # The child exited, but we don't understand its status. 

50 # This shouldn't happen, but if it does, let's just 

51 # return that status; perhaps that helps debug it. 

52 return status 

53 

54 

55class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): 

56 """Unix event loop. 

57 

58 Adds signal handling and UNIX Domain Socket support to 

59 SelectorEventLoop. 

60 """ 

61 

62 def __init__(self, selector=None): 

63 super().__init__(selector) 

64 self._signal_handlers = {} 

65 self._unix_server_sockets = {} 

66 if can_use_pidfd(): 

67 self._watcher = _PidfdChildWatcher() 

68 else: 

69 self._watcher = _ThreadedChildWatcher() 

70 

71 def close(self): 

72 super().close() 

73 if not sys.is_finalizing(): 

74 for sig in list(self._signal_handlers): 

75 self.remove_signal_handler(sig) 

76 else: 

77 if self._signal_handlers: 77 ↛ exitline 77 didn't return from function 'close' because the condition on line 77 was always true

78 warnings.warn(f"Closing the loop {self!r} " 

79 f"on interpreter shutdown " 

80 f"stage, skipping signal handlers removal", 

81 ResourceWarning, 

82 source=self) 

83 self._signal_handlers.clear() 

84 

85 def _process_self_data(self, data): 

86 for signum in data: 

87 if not signum: 

88 # ignore null bytes written by _write_to_self() 

89 continue 

90 self._handle_signal(signum) 

91 

92 def add_signal_handler(self, sig, callback, *args): 

93 """Add a handler for a signal. UNIX only. 

94 

95 Raise ValueError if the signal number is invalid or uncatchable. 

96 Raise RuntimeError if there is a problem setting up the handler. 

97 """ 

98 if (coroutines.iscoroutine(callback) or 

99 inspect.iscoroutinefunction(callback)): 

100 raise TypeError("coroutines cannot be used " 

101 "with add_signal_handler()") 

102 self._check_signal(sig) 

103 self._check_closed() 

104 try: 

105 # set_wakeup_fd() raises ValueError if this is not the 

106 # main thread. By calling it early we ensure that an 

107 # event loop running in another thread cannot add a signal 

108 # handler. 

109 signal.set_wakeup_fd(self._csock.fileno()) 

110 except (ValueError, OSError) as exc: 

111 raise RuntimeError(str(exc)) 

112 

113 handle = events.Handle(callback, args, self, None) 

114 self._signal_handlers[sig] = handle 

115 

116 try: 

117 # Register a dummy signal handler to ask Python to write the signal 

118 # number in the wakeup file descriptor. _process_self_data() will 

119 # read signal numbers from this file descriptor to handle signals. 

120 signal.signal(sig, _sighandler_noop) 

121 

122 # Set SA_RESTART to limit EINTR occurrences. 

123 signal.siginterrupt(sig, False) 

124 except OSError as exc: 

125 del self._signal_handlers[sig] 

126 if not self._signal_handlers: 

127 try: 

128 signal.set_wakeup_fd(-1) 

129 except (ValueError, OSError) as nexc: 

130 logger.info('set_wakeup_fd(-1) failed: %s', nexc) 

131 

132 if exc.errno == errno.EINVAL: 

133 raise RuntimeError(f'sig {sig} cannot be caught') 

134 else: 

135 raise 

136 

137 def _handle_signal(self, sig): 

138 """Internal helper that is the actual signal handler.""" 

139 handle = self._signal_handlers.get(sig) 

140 if handle is None: 

141 return # Assume it's some race condition. 

142 if handle._cancelled: 

143 self.remove_signal_handler(sig) # Remove it properly. 

144 else: 

145 self._add_callback_signalsafe(handle) 

146 

147 def remove_signal_handler(self, sig): 

148 """Remove a handler for a signal. UNIX only. 

149 

150 Return True if a signal handler was removed, False if not. 

151 """ 

152 self._check_signal(sig) 

153 try: 

154 del self._signal_handlers[sig] 

155 except KeyError: 

156 return False 

157 

158 if sig == signal.SIGINT: 

159 handler = signal.default_int_handler 

160 else: 

161 handler = signal.SIG_DFL 

162 

163 try: 

164 signal.signal(sig, handler) 

165 except OSError as exc: 

166 if exc.errno == errno.EINVAL: 

167 raise RuntimeError(f'sig {sig} cannot be caught') 

168 else: 

169 raise 

170 

171 if not self._signal_handlers: 

172 try: 

173 signal.set_wakeup_fd(-1) 

174 except (ValueError, OSError) as exc: 

175 logger.info('set_wakeup_fd(-1) failed: %s', exc) 

176 

177 return True 

178 

179 def _check_signal(self, sig): 

180 """Internal helper to validate a signal. 

181 

182 Raise ValueError if the signal number is invalid or uncatchable. 

183 Raise RuntimeError if there is a problem setting up the handler. 

184 """ 

185 if not isinstance(sig, int): 

186 raise TypeError(f'sig must be an int, not {sig!r}') 

187 

188 if sig not in signal.valid_signals(): 

189 raise ValueError(f'invalid signal number {sig}') 

190 

191 def _make_read_pipe_transport(self, pipe, protocol, waiter=None, 

192 extra=None): 

193 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra) 

194 

195 def _make_write_pipe_transport(self, pipe, protocol, waiter=None, 

196 extra=None): 

197 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra) 

198 

199 async def _make_subprocess_transport(self, protocol, args, shell, 

200 stdin, stdout, stderr, bufsize, 

201 extra=None, **kwargs): 

202 watcher = self._watcher 

203 waiter = self.create_future() 

204 transp = _UnixSubprocessTransport(self, protocol, args, shell, 

205 stdin, stdout, stderr, bufsize, 

206 waiter=waiter, extra=extra, 

207 **kwargs) 

208 watcher.add_child_handler(transp.get_pid(), 

209 self._child_watcher_callback, transp) 

210 try: 

211 await waiter 

212 except (SystemExit, KeyboardInterrupt): 

213 raise 

214 except BaseException: 

215 transp.close() 

216 await transp._wait() 

217 raise 

218 

219 return transp 

220 

221 def _child_watcher_callback(self, pid, returncode, transp): 

222 self.call_soon_threadsafe(transp._process_exited, returncode) 

223 

224 async def create_unix_connection( 

225 self, protocol_factory, path=None, *, 

226 ssl=None, sock=None, 

227 server_hostname=None, 

228 ssl_handshake_timeout=None, 

229 ssl_shutdown_timeout=None): 

230 assert server_hostname is None or isinstance(server_hostname, str) 

231 if ssl: 

232 if server_hostname is None: 

233 raise ValueError( 

234 'you have to pass server_hostname when using ssl') 

235 else: 

236 if server_hostname is not None: 

237 raise ValueError('server_hostname is only meaningful with ssl') 

238 if ssl_handshake_timeout is not None: 

239 raise ValueError( 

240 'ssl_handshake_timeout is only meaningful with ssl') 

241 if ssl_shutdown_timeout is not None: 241 ↛ 242line 241 didn't jump to line 242 because the condition on line 241 was never true

242 raise ValueError( 

243 'ssl_shutdown_timeout is only meaningful with ssl') 

244 

245 if path is not None: 

246 if sock is not None: 

247 raise ValueError( 

248 'path and sock can not be specified at the same time') 

249 

250 path = os.fspath(path) 

251 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) 

252 try: 

253 sock.setblocking(False) 

254 await self.sock_connect(sock, path) 

255 except: 

256 sock.close() 

257 raise 

258 

259 else: 

260 if sock is None: 

261 raise ValueError('no path and sock were specified') 

262 if (sock.family != socket.AF_UNIX or 262 ↛ 266line 262 didn't jump to line 266 because the condition on line 262 was always true

263 sock.type != socket.SOCK_STREAM): 

264 raise ValueError( 

265 f'A UNIX Domain Stream Socket was expected, got {sock!r}') 

266 sock.setblocking(False) 

267 

268 transport, protocol = await self._create_connection_transport( 

269 sock, protocol_factory, ssl, server_hostname, 

270 ssl_handshake_timeout=ssl_handshake_timeout, 

271 ssl_shutdown_timeout=ssl_shutdown_timeout) 

272 return transport, protocol 

273 

274 async def create_unix_server( 

275 self, protocol_factory, path=None, *, 

276 sock=None, backlog=100, ssl=None, 

277 ssl_handshake_timeout=None, 

278 ssl_shutdown_timeout=None, 

279 start_serving=True, cleanup_socket=True): 

280 if isinstance(ssl, bool): 

281 raise TypeError('ssl argument must be an SSLContext or None') 

282 

283 if ssl_handshake_timeout is not None and not ssl: 

284 raise ValueError( 

285 'ssl_handshake_timeout is only meaningful with ssl') 

286 

287 if ssl_shutdown_timeout is not None and not ssl: 287 ↛ 288line 287 didn't jump to line 288 because the condition on line 287 was never true

288 raise ValueError( 

289 'ssl_shutdown_timeout is only meaningful with ssl') 

290 

291 if path is not None: 

292 if sock is not None: 

293 raise ValueError( 

294 'path and sock can not be specified at the same time') 

295 

296 path = os.fspath(path) 

297 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 

298 

299 # Check for abstract socket. `str` and `bytes` paths are supported. 

300 if path[0] not in (0, '\x00'): 300 ↛ 311line 300 didn't jump to line 311 because the condition on line 300 was always true

301 try: 

302 if stat.S_ISSOCK(os.stat(path).st_mode): 

303 os.remove(path) 

304 except FileNotFoundError: 

305 pass 

306 except OSError as err: 

307 # Directory may have permissions only to create socket. 

308 logger.error('Unable to check or remove stale UNIX socket ' 

309 '%r: %r', path, err) 

310 

311 try: 

312 sock.bind(path) 

313 except OSError as exc: 

314 sock.close() 

315 if exc.errno == errno.EADDRINUSE: 

316 # Let's improve the error message by adding 

317 # with what exact address it occurs. 

318 msg = f'Address {path!r} is already in use' 

319 raise OSError(errno.EADDRINUSE, msg) from None 

320 else: 

321 raise 

322 except: 

323 sock.close() 

324 raise 

325 else: 

326 if sock is None: 

327 raise ValueError( 

328 'path was not specified, and no sock specified') 

329 

330 if (sock.family != socket.AF_UNIX or 

331 sock.type != socket.SOCK_STREAM): 

332 raise ValueError( 

333 f'A UNIX Domain Stream Socket was expected, got {sock!r}') 

334 

335 if cleanup_socket: 

336 path = sock.getsockname() 

337 # Check for abstract socket. `str` and `bytes` paths are supported. 

338 if path[0] not in (0, '\x00'): 338 ↛ 344line 338 didn't jump to line 344 because the condition on line 338 was always true

339 try: 

340 self._unix_server_sockets[sock] = os.stat(path).st_ino 

341 except FileNotFoundError: 

342 pass 

343 

344 sock.setblocking(False) 

345 server = base_events.Server(self, [sock], protocol_factory, 

346 ssl, backlog, ssl_handshake_timeout, 

347 ssl_shutdown_timeout) 

348 if start_serving: 

349 server._start_serving() 

350 # Skip one loop iteration so that all 'loop.add_reader' 

351 # go through. 

352 await tasks.sleep(0) 

353 

354 return server 

355 

356 async def _sock_sendfile_native(self, sock, file, offset, count): 

357 try: 

358 os.sendfile 

359 except AttributeError: 

360 raise exceptions.SendfileNotAvailableError( 

361 "os.sendfile() is not available") 

362 try: 

363 fileno = file.fileno() 

364 except (AttributeError, io.UnsupportedOperation): 

365 raise exceptions.SendfileNotAvailableError("not a regular file") 

366 try: 

367 fsize = os.fstat(fileno).st_size 

368 except OSError: 

369 raise exceptions.SendfileNotAvailableError("not a regular file") 

370 blocksize = count if count else fsize 

371 if not blocksize: 

372 return 0 # empty file 

373 

374 fut = self.create_future() 

375 self._sock_sendfile_native_impl(fut, None, sock, fileno, 

376 offset, count, blocksize, 0) 

377 return await fut 

378 

379 def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno, 

380 offset, count, blocksize, total_sent): 

381 fd = sock.fileno() 

382 if registered_fd is not None: 

383 # Remove the callback early. It should be rare that the 

384 # selector says the fd is ready but the call still returns 

385 # EAGAIN, and I am willing to take a hit in that case in 

386 # order to simplify the common case. 

387 self.remove_writer(registered_fd) 

388 if fut.cancelled(): 

389 self._sock_sendfile_update_filepos(fileno, offset) 

390 return 

391 if count: 

392 blocksize = count - total_sent 

393 if blocksize <= 0: 

394 self._sock_sendfile_update_filepos(fileno, offset) 

395 fut.set_result(total_sent) 

396 return 

397 

398 # On 32-bit architectures truncate to 1GiB to avoid OverflowError 

399 blocksize = min(blocksize, sys.maxsize//2 + 1) 

400 

401 try: 

402 sent = os.sendfile(fd, fileno, offset, blocksize) 

403 except (BlockingIOError, InterruptedError): 

404 if registered_fd is None: 404 ↛ 406line 404 didn't jump to line 406 because the condition on line 404 was always true

405 self._sock_add_cancellation_callback(fut, sock) 

406 self.add_writer(fd, self._sock_sendfile_native_impl, fut, 

407 fd, sock, fileno, 

408 offset, count, blocksize, total_sent) 

409 except OSError as exc: 

410 if (registered_fd is not None and 410 ↛ 417line 410 didn't jump to line 417 because the condition on line 410 was never true

411 exc.errno == errno.ENOTCONN and 

412 type(exc) is not ConnectionError): 

413 # If we have an ENOTCONN and this isn't a first call to 

414 # sendfile(), i.e. the connection was closed in the middle 

415 # of the operation, normalize the error to ConnectionError 

416 # to make it consistent across all Posix systems. 

417 new_exc = ConnectionError( 

418 "socket is not connected", errno.ENOTCONN) 

419 new_exc.__cause__ = exc 

420 exc = new_exc 

421 if total_sent == 0: 

422 # We can get here for different reasons, the main 

423 # one being 'file' is not a regular mmap(2)-like 

424 # file, in which case we'll fall back on using 

425 # plain send(). 

426 err = exceptions.SendfileNotAvailableError( 

427 "os.sendfile call failed") 

428 self._sock_sendfile_update_filepos(fileno, offset) 

429 fut.set_exception(err) 

430 else: 

431 self._sock_sendfile_update_filepos(fileno, offset) 

432 fut.set_exception(exc) 

433 except (SystemExit, KeyboardInterrupt): 

434 raise 

435 except BaseException as exc: 

436 self._sock_sendfile_update_filepos(fileno, offset) 

437 fut.set_exception(exc) 

438 else: 

439 if sent == 0: 

440 # EOF 

441 self._sock_sendfile_update_filepos(fileno, offset) 

442 fut.set_result(total_sent) 

443 else: 

444 offset += sent 

445 total_sent += sent 

446 if registered_fd is None: 

447 self._sock_add_cancellation_callback(fut, sock) 

448 self.add_writer(fd, self._sock_sendfile_native_impl, fut, 

449 fd, sock, fileno, 

450 offset, count, blocksize, total_sent) 

451 

452 def _sock_sendfile_update_filepos(self, fileno, offset): 

453 # After this helper runs, the source fd's lseek pointer is at offset." 

454 os.lseek(fileno, offset, os.SEEK_SET) 

455 

456 def _sock_add_cancellation_callback(self, fut, sock): 

457 def cb(fut): 

458 if fut.cancelled(): 

459 fd = sock.fileno() 

460 if fd != -1: 460 ↛ exitline 460 didn't return from function 'cb' because the condition on line 460 was always true

461 self.remove_writer(fd) 

462 fut.add_done_callback(cb) 

463 

464 def _stop_serving(self, sock): 

465 # Is this a unix socket that needs cleanup? 

466 if sock in self._unix_server_sockets: 

467 path = sock.getsockname() 

468 else: 

469 path = None 

470 

471 super()._stop_serving(sock) 

472 

473 if path is not None: 

474 prev_ino = self._unix_server_sockets[sock] 

475 del self._unix_server_sockets[sock] 

476 try: 

477 if os.stat(path).st_ino == prev_ino: 

478 os.unlink(path) 

479 except FileNotFoundError: 

480 pass 

481 except OSError as err: 

482 logger.error('Unable to clean up listening UNIX socket ' 

483 '%r: %r', path, err) 

484 

485 

486class _UnixReadPipeTransport(transports.ReadTransport): 

487 

488 max_size = 256 * 1024 # max bytes we read in one event loop iteration 

489 

490 def __init__(self, loop, pipe, protocol, waiter=None, extra=None): 

491 super().__init__(extra) 

492 self._extra['pipe'] = pipe 

493 self._loop = loop 

494 self._pipe = pipe 

495 self._fileno = pipe.fileno() 

496 self._protocol = protocol 

497 self._closing = False 

498 self._paused = False 

499 

500 mode = os.fstat(self._fileno).st_mode 

501 if not (stat.S_ISFIFO(mode) or 501 ↛ 504line 501 didn't jump to line 504 because the condition on line 501 was never true

502 stat.S_ISSOCK(mode) or 

503 stat.S_ISCHR(mode)): 

504 self._pipe = None 

505 self._fileno = None 

506 self._protocol = None 

507 raise ValueError("Pipe transport is for pipes/sockets only.") 

508 

509 os.set_blocking(self._fileno, False) 

510 

511 self._loop.call_soon(self._protocol.connection_made, self) 

512 # only start reading when connection_made() has been called 

513 self._loop.call_soon(self._add_reader, 

514 self._fileno, self._read_ready) 

515 if waiter is not None: 

516 # only wake up the waiter when connection_made() has been called 

517 self._loop.call_soon(futures._set_result_unless_cancelled, 

518 waiter, None) 

519 

520 def _add_reader(self, fd, callback): 

521 if not self.is_reading(): 

522 return 

523 self._loop._add_reader(fd, callback) 

524 

525 def is_reading(self): 

526 return not self._paused and not self._closing 

527 

528 def __repr__(self): 

529 info = [self.__class__.__name__] 

530 if self._pipe is None: 530 ↛ 531line 530 didn't jump to line 531 because the condition on line 530 was never true

531 info.append('closed') 

532 elif self._closing: 532 ↛ 533line 532 didn't jump to line 533 because the condition on line 532 was never true

533 info.append('closing') 

534 info.append(f'fd={self._fileno}') 

535 selector = getattr(self._loop, '_selector', None) 

536 if self._pipe is not None and selector is not None: 

537 polling = selector_events._test_selector_event( 

538 selector, self._fileno, selectors.EVENT_READ) 

539 if polling: 539 ↛ 540line 539 didn't jump to line 540 because the condition on line 539 was never true

540 info.append('polling') 

541 else: 

542 info.append('idle') 

543 elif self._pipe is not None: 543 ↛ 546line 543 didn't jump to line 546 because the condition on line 543 was always true

544 info.append('open') 

545 else: 

546 info.append('closed') 

547 return '<{}>'.format(' '.join(info)) 

548 

549 def _read_ready(self): 

550 try: 

551 data = os.read(self._fileno, self.max_size) 

552 except (BlockingIOError, InterruptedError): 

553 pass 

554 except OSError as exc: 

555 self._fatal_error(exc, 'Fatal read error on pipe transport') 

556 else: 

557 if data: 

558 self._protocol.data_received(data) 

559 else: 

560 if self._loop.get_debug(): 560 ↛ 561line 560 didn't jump to line 561 because the condition on line 560 was never true

561 logger.info("%r was closed by peer", self) 

562 self._closing = True 

563 self._loop._remove_reader(self._fileno) 

564 self._loop.call_soon(self._protocol.eof_received) 

565 self._loop.call_soon(self._call_connection_lost, None) 

566 

567 def pause_reading(self): 

568 if not self.is_reading(): 

569 return 

570 self._paused = True 

571 self._loop._remove_reader(self._fileno) 

572 if self._loop.get_debug(): 572 ↛ 573line 572 didn't jump to line 573 because the condition on line 572 was never true

573 logger.debug("%r pauses reading", self) 

574 

575 def resume_reading(self): 

576 if self._closing or not self._paused: 

577 return 

578 self._paused = False 

579 self._loop._add_reader(self._fileno, self._read_ready) 

580 if self._loop.get_debug(): 580 ↛ 581line 580 didn't jump to line 581 because the condition on line 580 was never true

581 logger.debug("%r resumes reading", self) 

582 

583 def set_protocol(self, protocol): 

584 self._protocol = protocol 

585 

586 def get_protocol(self): 

587 return self._protocol 

588 

589 def is_closing(self): 

590 return self._closing 

591 

592 def close(self): 

593 if not self._closing: 

594 self._close(None) 

595 

596 def __del__(self, _warn=warnings.warn): 

597 if self._pipe is not None: 597 ↛ 598line 597 didn't jump to line 598 because the condition on line 597 was never true

598 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) 

599 self._pipe.close() 

600 

601 def _fatal_error(self, exc, message='Fatal error on pipe transport'): 

602 # should be called by exception handler only 

603 if (isinstance(exc, OSError) and exc.errno == errno.EIO): 603 ↛ 604line 603 didn't jump to line 604 because the condition on line 603 was never true

604 if self._loop.get_debug(): 

605 logger.debug("%r: %s", self, message, exc_info=True) 

606 else: 

607 self._loop.call_exception_handler({ 

608 'message': message, 

609 'exception': exc, 

610 'transport': self, 

611 'protocol': self._protocol, 

612 }) 

613 self._close(exc) 

614 

615 def _close(self, exc): 

616 self._closing = True 

617 self._loop._remove_reader(self._fileno) 

618 self._loop.call_soon(self._call_connection_lost, exc) 

619 

620 def _call_connection_lost(self, exc): 

621 try: 

622 self._protocol.connection_lost(exc) 

623 finally: 

624 self._pipe.close() 

625 self._pipe = None 

626 self._protocol = None 

627 self._loop = None 

628 

629 

630class _UnixWritePipeTransport(transports._FlowControlMixin, 

631 transports.WriteTransport): 

632 

633 def __init__(self, loop, pipe, protocol, waiter=None, extra=None): 

634 super().__init__(extra, loop) 

635 self._extra['pipe'] = pipe 

636 self._pipe = pipe 

637 self._fileno = pipe.fileno() 

638 self._protocol = protocol 

639 self._buffer = bytearray() 

640 self._conn_lost = 0 

641 self._closing = False # Set when close() or write_eof() called. 

642 

643 mode = os.fstat(self._fileno).st_mode 

644 is_char = stat.S_ISCHR(mode) 

645 is_fifo = stat.S_ISFIFO(mode) 

646 is_socket = stat.S_ISSOCK(mode) 

647 if not (is_char or is_fifo or is_socket): 647 ↛ 648line 647 didn't jump to line 648 because the condition on line 647 was never true

648 self._pipe = None 

649 self._fileno = None 

650 self._protocol = None 

651 raise ValueError("Pipe transport is only for " 

652 "pipes, sockets and character devices") 

653 

654 os.set_blocking(self._fileno, False) 

655 self._loop.call_soon(self._protocol.connection_made, self) 

656 

657 # On AIX, the reader trick (to be notified when the read end of the 

658 # socket is closed) only works for sockets. On other platforms it 

659 # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.) 

660 if is_socket or (is_fifo and not sys.platform.startswith("aix")): 

661 # only start reading when connection_made() has been called 

662 self._loop.call_soon(self._loop._add_reader, 

663 self._fileno, self._read_ready) 

664 

665 if waiter is not None: 

666 # only wake up the waiter when connection_made() has been called 

667 self._loop.call_soon(futures._set_result_unless_cancelled, 

668 waiter, None) 

669 

670 def __repr__(self): 

671 info = [self.__class__.__name__] 

672 if self._pipe is None: 672 ↛ 673line 672 didn't jump to line 673 because the condition on line 672 was never true

673 info.append('closed') 

674 elif self._closing: 674 ↛ 675line 674 didn't jump to line 675 because the condition on line 674 was never true

675 info.append('closing') 

676 info.append(f'fd={self._fileno}') 

677 selector = getattr(self._loop, '_selector', None) 

678 if self._pipe is not None and selector is not None: 678 ↛ 679line 678 didn't jump to line 679 because the condition on line 678 was never true

679 polling = selector_events._test_selector_event( 

680 selector, self._fileno, selectors.EVENT_WRITE) 

681 if polling: 

682 info.append('polling') 

683 else: 

684 info.append('idle') 

685 

686 bufsize = self.get_write_buffer_size() 

687 info.append(f'bufsize={bufsize}') 

688 elif self._pipe is not None: 688 ↛ 691line 688 didn't jump to line 691 because the condition on line 688 was always true

689 info.append('open') 

690 else: 

691 info.append('closed') 

692 return '<{}>'.format(' '.join(info)) 

693 

694 def get_write_buffer_size(self): 

695 return len(self._buffer) 

696 

697 def _read_ready(self): 

698 # Pipe was closed by peer. 

699 if self._loop.get_debug(): 699 ↛ 700line 699 didn't jump to line 700 because the condition on line 699 was never true

700 logger.info("%r was closed by peer", self) 

701 if self._buffer: 

702 self._close(BrokenPipeError()) 

703 else: 

704 self._close() 

705 

706 def write(self, data): 

707 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data) 

708 if isinstance(data, bytearray): 708 ↛ 709line 708 didn't jump to line 709 because the condition on line 708 was never true

709 data = memoryview(data) 

710 if not data: 

711 return 

712 

713 if self._conn_lost or self._closing: 

714 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: 

715 logger.warning('pipe closed by peer or ' 

716 'os.write(pipe, data) raised exception.') 

717 self._conn_lost += 1 

718 return 

719 

720 if not self._buffer: 

721 # Attempt to send it right away first. 

722 try: 

723 n = os.write(self._fileno, data) 

724 except (BlockingIOError, InterruptedError): 

725 n = 0 

726 except (SystemExit, KeyboardInterrupt): 

727 raise 

728 except BaseException as exc: 

729 self._conn_lost += 1 

730 self._fatal_error(exc, 'Fatal write error on pipe transport') 

731 return 

732 if n == len(data): 

733 return 

734 elif n > 0: 

735 data = memoryview(data)[n:] 

736 self._loop._add_writer(self._fileno, self._write_ready) 

737 

738 self._buffer += data 

739 self._maybe_pause_protocol() 

740 

741 def _write_ready(self): 

742 assert self._buffer, 'Data should not be empty' 

743 

744 try: 

745 n = os.write(self._fileno, self._buffer) 

746 except (BlockingIOError, InterruptedError): 

747 pass 

748 except (SystemExit, KeyboardInterrupt): 

749 raise 

750 except BaseException as exc: 

751 self._buffer.clear() 

752 self._conn_lost += 1 

753 # Remove writer here, _fatal_error() doesn't it 

754 # because _buffer is empty. 

755 self._loop._remove_writer(self._fileno) 

756 self._fatal_error(exc, 'Fatal write error on pipe transport') 

757 else: 

758 if n == len(self._buffer): 

759 self._buffer.clear() 

760 self._loop._remove_writer(self._fileno) 

761 self._maybe_resume_protocol() # May append to buffer. 

762 if self._closing: 

763 self._loop._remove_reader(self._fileno) 

764 self._call_connection_lost(None) 

765 return 

766 elif n > 0: 

767 del self._buffer[:n] 

768 

769 def can_write_eof(self): 

770 return True 

771 

772 def write_eof(self): 

773 if self._closing: 773 ↛ 774line 773 didn't jump to line 774 because the condition on line 773 was never true

774 return 

775 assert self._pipe 

776 self._closing = True 

777 if not self._buffer: 

778 self._loop._remove_reader(self._fileno) 

779 self._loop.call_soon(self._call_connection_lost, None) 

780 

781 def set_protocol(self, protocol): 

782 self._protocol = protocol 

783 

784 def get_protocol(self): 

785 return self._protocol 

786 

787 def is_closing(self): 

788 return self._closing 

789 

790 def close(self): 

791 if self._pipe is not None and not self._closing: 

792 # write_eof is all what we needed to close the write pipe 

793 self.write_eof() 

794 

795 def __del__(self, _warn=warnings.warn): 

796 if self._pipe is not None: 796 ↛ 797line 796 didn't jump to line 797 because the condition on line 796 was never true

797 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) 

798 self._pipe.close() 

799 

800 def abort(self): 

801 self._close(None) 

802 

803 def _fatal_error(self, exc, message='Fatal error on pipe transport'): 

804 # should be called by exception handler only 

805 if isinstance(exc, OSError): 805 ↛ 809line 805 didn't jump to line 809 because the condition on line 805 was always true

806 if self._loop.get_debug(): 806 ↛ 807line 806 didn't jump to line 807 because the condition on line 806 was never true

807 logger.debug("%r: %s", self, message, exc_info=True) 

808 else: 

809 self._loop.call_exception_handler({ 

810 'message': message, 

811 'exception': exc, 

812 'transport': self, 

813 'protocol': self._protocol, 

814 }) 

815 self._close(exc) 

816 

817 def _close(self, exc=None): 

818 self._closing = True 

819 if self._buffer: 

820 self._loop._remove_writer(self._fileno) 

821 self._buffer.clear() 

822 self._loop._remove_reader(self._fileno) 

823 self._loop.call_soon(self._call_connection_lost, exc) 

824 

825 def _call_connection_lost(self, exc): 

826 try: 

827 self._protocol.connection_lost(exc) 

828 finally: 

829 self._pipe.close() 

830 self._pipe = None 

831 self._protocol = None 

832 self._loop = None 

833 

834 

835class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport): 

836 

837 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs): 

838 stdin_w = None 

839 if (stdin == subprocess.PIPE 839 ↛ 845line 839 didn't jump to line 845 because the condition on line 839 was never true

840 and (sys.platform.startswith('aix') or sys.platform == 'cygwin')): 

841 # Use a socket pair for stdin on AIX, since it does not 

842 # support selecting read events on the write end of a 

843 # socket (which we use in order to detect closing of the 

844 # other end). 

845 stdin, stdin_w = socket.socketpair() 

846 try: 

847 self._proc = subprocess.Popen( 

848 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, 

849 universal_newlines=False, bufsize=bufsize, **kwargs) 

850 if stdin_w is not None: 850 ↛ 851line 850 didn't jump to line 851 because the condition on line 850 was never true

851 stdin.close() 

852 self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize) 

853 stdin_w = None 

854 finally: 

855 if stdin_w is not None: 855 ↛ 856line 855 didn't jump to line 856 because the condition on line 855 was never true

856 stdin.close() 

857 stdin_w.close() 

858 

859 

860class _PidfdChildWatcher: 

861 """Child watcher implementation using Linux's pid file descriptors. 

862 

863 This child watcher polls process file descriptors (pidfds) to await child 

864 process termination. In some respects, PidfdChildWatcher is a "Goldilocks" 

865 child watcher implementation. It doesn't require signals or threads, doesn't 

866 interfere with any processes launched outside the event loop, and scales 

867 linearly with the number of subprocesses launched by the event loop. The 

868 main disadvantage is that pidfds are specific to Linux, and only work on 

869 recent (5.3+) kernels. 

870 """ 

871 

872 def add_child_handler(self, pid, callback, *args): 

873 loop = events.get_running_loop() 

874 pidfd = os.pidfd_open(pid) 

875 loop._add_reader(pidfd, self._do_wait, pid, pidfd, callback, args) 

876 

877 def _do_wait(self, pid, pidfd, callback, args): 

878 loop = events.get_running_loop() 

879 loop._remove_reader(pidfd) 

880 try: 

881 _, status = os.waitpid(pid, 0) 

882 except ChildProcessError: 

883 # The child process is already reaped 

884 # (may happen if waitpid() is called elsewhere). 

885 returncode = 255 

886 logger.warning( 

887 "child process pid %d exit status already read: " 

888 " will report returncode 255", 

889 pid) 

890 else: 

891 returncode = waitstatus_to_exitcode(status) 

892 finally: 

893 os.close(pidfd) 

894 callback(pid, returncode, *args) 

895 

896class _ThreadedChildWatcher: 

897 """Threaded child watcher implementation. 

898 

899 The watcher uses a thread per process 

900 for waiting for the process finish. 

901 

902 It doesn't require subscription on POSIX signal 

903 but a thread creation is not free. 

904 

905 The watcher has O(1) complexity, its performance doesn't depend 

906 on amount of spawn processes. 

907 """ 

908 

909 def __init__(self): 

910 self._pid_counter = itertools.count(0) 

911 self._threads = {} 

912 

913 def __del__(self, _warn=warnings.warn): 

914 threads = [thread for thread in list(self._threads.values()) 

915 if thread.is_alive()] 

916 if threads: 916 ↛ 917line 916 didn't jump to line 917 because the condition on line 916 was never true

917 _warn(f"{self.__class__} has registered but not finished child processes", 

918 ResourceWarning, 

919 source=self) 

920 

921 def add_child_handler(self, pid, callback, *args): 

922 loop = events.get_running_loop() 

923 thread = threading.Thread(target=self._do_waitpid, 

924 name=f"asyncio-waitpid-{next(self._pid_counter)}", 

925 args=(loop, pid, callback, args), 

926 daemon=True) 

927 self._threads[pid] = thread 

928 thread.start() 

929 

930 def _do_waitpid(self, loop, expected_pid, callback, args): 

931 assert expected_pid > 0 

932 

933 try: 

934 pid, status = os.waitpid(expected_pid, 0) 

935 except ChildProcessError: 

936 # The child process is already reaped 

937 # (may happen if waitpid() is called elsewhere). 

938 pid = expected_pid 

939 returncode = 255 

940 logger.warning( 

941 "Unknown child process pid %d, will report returncode 255", 

942 pid) 

943 else: 

944 returncode = waitstatus_to_exitcode(status) 

945 if loop.get_debug(): 945 ↛ 946line 945 didn't jump to line 946 because the condition on line 945 was never true

946 logger.debug('process %s exited with returncode %s', 

947 expected_pid, returncode) 

948 

949 if loop.is_closed(): 949 ↛ 950line 949 didn't jump to line 950 because the condition on line 949 was never true

950 logger.warning("Loop %r that handles pid %r is closed", loop, pid) 

951 else: 

952 loop.call_soon_threadsafe(callback, pid, returncode, *args) 

953 

954 self._threads.pop(expected_pid) 

955 

956def can_use_pidfd(): 

957 if not hasattr(os, 'pidfd_open'): 957 ↛ 958line 957 didn't jump to line 958 because the condition on line 957 was never true

958 return False 

959 try: 

960 pid = os.getpid() 

961 os.close(os.pidfd_open(pid, 0)) 

962 except OSError: 

963 # blocked by security policy like SECCOMP 

964 return False 

965 return True 

966 

967 

968class _UnixDefaultEventLoopPolicy(events._BaseDefaultEventLoopPolicy): 

969 """UNIX event loop policy""" 

970 _loop_factory = _UnixSelectorEventLoop 

971 

972 

973SelectorEventLoop = _UnixSelectorEventLoop 

974_DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy 

975EventLoop = SelectorEventLoop