Coverage for Lib/asyncio/unix_events.py: 87%

619 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-23 01:21 +0000

1"""Selector event loop for Unix with signal handling.""" 

2 

3import errno 

4import io 

5import itertools 

6import os 

7import selectors 

8import signal 

9import socket 

10import stat 

11import subprocess 

12import sys 

13import threading 

14import warnings 

15 

16from . import base_events 

17from . import base_subprocess 

18from . import constants 

19from . import coroutines 

20from . import events 

21from . import exceptions 

22from . import futures 

23from . import selector_events 

24from . import tasks 

25from . import transports 

26from .log import logger 

27 

28 

29__all__ = ( 

30 'SelectorEventLoop', 

31 'EventLoop', 

32) 

33 

34 

35if sys.platform == 'win32': # pragma: no cover 

36 raise ImportError('Signals are not really supported on Windows') 

37 

38 

39def _sighandler_noop(signum, frame): 

40 """Dummy signal handler.""" 

41 pass 

42 

43 

44def waitstatus_to_exitcode(status): 

45 try: 

46 return os.waitstatus_to_exitcode(status) 

47 except ValueError: 

48 # The child exited, but we don't understand its status. 

49 # This shouldn't happen, but if it does, let's just 

50 # return that status; perhaps that helps debug it. 

51 return status 

52 

53 

54class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): 

55 """Unix event loop. 

56 

57 Adds signal handling and UNIX Domain Socket support to SelectorEventLoop. 

58 """ 

59 

60 def __init__(self, selector=None): 

61 super().__init__(selector) 

62 self._signal_handlers = {} 

63 self._unix_server_sockets = {} 

64 if can_use_pidfd(): 

65 self._watcher = _PidfdChildWatcher() 

66 else: 

67 self._watcher = _ThreadedChildWatcher() 

68 

69 def close(self): 

70 super().close() 

71 if not sys.is_finalizing(): 

72 for sig in list(self._signal_handlers): 

73 self.remove_signal_handler(sig) 

74 else: 

75 if self._signal_handlers: 75 ↛ exitline 75 didn't return from function 'close' because the condition on line 75 was always true

76 warnings.warn(f"Closing the loop {self!r} " 

77 f"on interpreter shutdown " 

78 f"stage, skipping signal handlers removal", 

79 ResourceWarning, 

80 source=self) 

81 self._signal_handlers.clear() 

82 

83 def _process_self_data(self, data): 

84 for signum in data: 

85 if not signum: 

86 # ignore null bytes written by _write_to_self() 

87 continue 

88 self._handle_signal(signum) 

89 

90 def add_signal_handler(self, sig, callback, *args): 

91 """Add a handler for a signal. UNIX only. 

92 

93 Raise ValueError if the signal number is invalid or uncatchable. 

94 Raise RuntimeError if there is a problem setting up the handler. 

95 """ 

96 if (coroutines.iscoroutine(callback) or 

97 coroutines._iscoroutinefunction(callback)): 

98 raise TypeError("coroutines cannot be used " 

99 "with add_signal_handler()") 

100 self._check_signal(sig) 

101 self._check_closed() 

102 try: 

103 # set_wakeup_fd() raises ValueError if this is not the 

104 # main thread. By calling it early we ensure that an 

105 # event loop running in another thread cannot add a signal 

106 # handler. 

107 signal.set_wakeup_fd(self._csock.fileno()) 

108 except (ValueError, OSError) as exc: 

109 raise RuntimeError(str(exc)) 

110 

111 handle = events.Handle(callback, args, self, None) 

112 self._signal_handlers[sig] = handle 

113 

114 try: 

115 # Register a dummy signal handler to ask Python to write the signal 

116 # number in the wakeup file descriptor. _process_self_data() will 

117 # read signal numbers from this file descriptor to handle signals. 

118 signal.signal(sig, _sighandler_noop) 

119 

120 # Set SA_RESTART to limit EINTR occurrences. 

121 signal.siginterrupt(sig, False) 

122 except OSError as exc: 

123 del self._signal_handlers[sig] 

124 if not self._signal_handlers: 

125 try: 

126 signal.set_wakeup_fd(-1) 

127 except (ValueError, OSError) as nexc: 

128 logger.info('set_wakeup_fd(-1) failed: %s', nexc) 

129 

130 if exc.errno == errno.EINVAL: 

131 raise RuntimeError(f'sig {sig} cannot be caught') 

132 else: 

133 raise 

134 

135 def _handle_signal(self, sig): 

136 """Internal helper that is the actual signal handler.""" 

137 handle = self._signal_handlers.get(sig) 

138 if handle is None: 

139 return # Assume it's some race condition. 

140 if handle._cancelled: 

141 self.remove_signal_handler(sig) # Remove it properly. 

142 else: 

143 self._add_callback_signalsafe(handle) 

144 

145 def remove_signal_handler(self, sig): 

146 """Remove a handler for a signal. UNIX only. 

147 

148 Return True if a signal handler was removed, False if not. 

149 """ 

150 self._check_signal(sig) 

151 try: 

152 del self._signal_handlers[sig] 

153 except KeyError: 

154 return False 

155 

156 if sig == signal.SIGINT: 

157 handler = signal.default_int_handler 

158 else: 

159 handler = signal.SIG_DFL 

160 

161 try: 

162 signal.signal(sig, handler) 

163 except OSError as exc: 

164 if exc.errno == errno.EINVAL: 

165 raise RuntimeError(f'sig {sig} cannot be caught') 

166 else: 

167 raise 

168 

169 if not self._signal_handlers: 

170 try: 

171 signal.set_wakeup_fd(-1) 

172 except (ValueError, OSError) as exc: 

173 logger.info('set_wakeup_fd(-1) failed: %s', exc) 

174 

175 return True 

176 

177 def _check_signal(self, sig): 

178 """Internal helper to validate a signal. 

179 

180 Raise ValueError if the signal number is invalid or uncatchable. 

181 Raise RuntimeError if there is a problem setting up the handler. 

182 """ 

183 if not isinstance(sig, int): 

184 raise TypeError(f'sig must be an int, not {sig!r}') 

185 

186 if sig not in signal.valid_signals(): 

187 raise ValueError(f'invalid signal number {sig}') 

188 

189 def _make_read_pipe_transport(self, pipe, protocol, waiter=None, 

190 extra=None): 

191 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra) 

192 

193 def _make_write_pipe_transport(self, pipe, protocol, waiter=None, 

194 extra=None): 

195 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra) 

196 

197 async def _make_subprocess_transport(self, protocol, args, shell, 

198 stdin, stdout, stderr, bufsize, 

199 extra=None, **kwargs): 

200 watcher = self._watcher 

201 waiter = self.create_future() 

202 transp = _UnixSubprocessTransport(self, protocol, args, shell, 

203 stdin, stdout, stderr, bufsize, 

204 waiter=waiter, extra=extra, 

205 **kwargs) 

206 watcher.add_child_handler(transp.get_pid(), 

207 self._child_watcher_callback, transp) 

208 try: 

209 await waiter 

210 except (SystemExit, KeyboardInterrupt): 

211 raise 

212 except BaseException: 

213 transp.close() 

214 await transp._wait() 

215 raise 

216 

217 return transp 

218 

219 def _child_watcher_callback(self, pid, returncode, transp): 

220 self.call_soon_threadsafe(transp._process_exited, returncode) 

221 

222 async def create_unix_connection( 

223 self, protocol_factory, path=None, *, 

224 ssl=None, sock=None, 

225 server_hostname=None, 

226 ssl_handshake_timeout=None, 

227 ssl_shutdown_timeout=None): 

228 assert server_hostname is None or isinstance(server_hostname, str) 

229 if ssl: 

230 if server_hostname is None: 

231 raise ValueError( 

232 'you have to pass server_hostname when using ssl') 

233 else: 

234 if server_hostname is not None: 

235 raise ValueError('server_hostname is only meaningful with ssl') 

236 if ssl_handshake_timeout is not None: 

237 raise ValueError( 

238 'ssl_handshake_timeout is only meaningful with ssl') 

239 if ssl_shutdown_timeout is not None: 239 ↛ 240line 239 didn't jump to line 240 because the condition on line 239 was never true

240 raise ValueError( 

241 'ssl_shutdown_timeout is only meaningful with ssl') 

242 

243 if path is not None: 

244 if sock is not None: 

245 raise ValueError( 

246 'path and sock can not be specified at the same time') 

247 

248 path = os.fspath(path) 

249 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) 

250 try: 

251 sock.setblocking(False) 

252 await self.sock_connect(sock, path) 

253 except: 

254 sock.close() 

255 raise 

256 

257 else: 

258 if sock is None: 

259 raise ValueError('no path and sock were specified') 

260 if (sock.family != socket.AF_UNIX or 260 ↛ 264line 260 didn't jump to line 264 because the condition on line 260 was always true

261 sock.type != socket.SOCK_STREAM): 

262 raise ValueError( 

263 f'A UNIX Domain Stream Socket was expected, got {sock!r}') 

264 sock.setblocking(False) 

265 

266 transport, protocol = await self._create_connection_transport( 

267 sock, protocol_factory, ssl, server_hostname, 

268 ssl_handshake_timeout=ssl_handshake_timeout, 

269 ssl_shutdown_timeout=ssl_shutdown_timeout) 

270 return transport, protocol 

271 

272 async def create_unix_server( 

273 self, protocol_factory, path=None, *, 

274 sock=None, backlog=100, ssl=None, 

275 ssl_handshake_timeout=None, 

276 ssl_shutdown_timeout=None, 

277 start_serving=True, cleanup_socket=True): 

278 if isinstance(ssl, bool): 

279 raise TypeError('ssl argument must be an SSLContext or None') 

280 

281 if ssl_handshake_timeout is not None and not ssl: 

282 raise ValueError( 

283 'ssl_handshake_timeout is only meaningful with ssl') 

284 

285 if ssl_shutdown_timeout is not None and not ssl: 285 ↛ 286line 285 didn't jump to line 286 because the condition on line 285 was never true

286 raise ValueError( 

287 'ssl_shutdown_timeout is only meaningful with ssl') 

288 

289 if path is not None: 

290 if sock is not None: 

291 raise ValueError( 

292 'path and sock can not be specified at the same time') 

293 

294 path = os.fspath(path) 

295 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 

296 

297 # Check for abstract socket. `str` and `bytes` paths are supported. 

298 if path[0] not in (0, '\x00'): 298 ↛ 309line 298 didn't jump to line 309 because the condition on line 298 was always true

299 try: 

300 if stat.S_ISSOCK(os.stat(path).st_mode): 

301 os.remove(path) 

302 except FileNotFoundError: 

303 pass 

304 except OSError as err: 

305 # Directory may have permissions only to create socket. 

306 logger.error('Unable to check or remove stale UNIX socket ' 

307 '%r: %r', path, err) 

308 

309 try: 

310 sock.bind(path) 

311 except OSError as exc: 

312 sock.close() 

313 if exc.errno == errno.EADDRINUSE: 

314 # Let's improve the error message by adding 

315 # with what exact address it occurs. 

316 msg = f'Address {path!r} is already in use' 

317 raise OSError(errno.EADDRINUSE, msg) from None 

318 else: 

319 raise 

320 except: 

321 sock.close() 

322 raise 

323 else: 

324 if sock is None: 

325 raise ValueError( 

326 'path was not specified, and no sock specified') 

327 

328 if (sock.family != socket.AF_UNIX or 

329 sock.type != socket.SOCK_STREAM): 

330 raise ValueError( 

331 f'A UNIX Domain Stream Socket was expected, got {sock!r}') 

332 

333 if cleanup_socket: 

334 path = sock.getsockname() 

335 # Check for abstract socket. `str` and `bytes` paths are supported. 

336 if path[0] not in (0, '\x00'): 336 ↛ 342line 336 didn't jump to line 342 because the condition on line 336 was always true

337 try: 

338 self._unix_server_sockets[sock] = os.stat(path).st_ino 

339 except FileNotFoundError: 

340 pass 

341 

342 sock.setblocking(False) 

343 server = base_events.Server(self, [sock], protocol_factory, 

344 ssl, backlog, ssl_handshake_timeout, 

345 ssl_shutdown_timeout) 

346 if start_serving: 

347 server._start_serving() 

348 # Skip one loop iteration so that all 'loop.add_reader' 

349 # go through. 

350 await tasks.sleep(0) 

351 

352 return server 

353 

354 async def _sock_sendfile_native(self, sock, file, offset, count): 

355 try: 

356 os.sendfile 

357 except AttributeError: 

358 raise exceptions.SendfileNotAvailableError( 

359 "os.sendfile() is not available") 

360 try: 

361 fileno = file.fileno() 

362 except (AttributeError, io.UnsupportedOperation) as err: 

363 raise exceptions.SendfileNotAvailableError("not a regular file") 

364 try: 

365 fsize = os.fstat(fileno).st_size 

366 except OSError: 

367 raise exceptions.SendfileNotAvailableError("not a regular file") 

368 blocksize = count if count else fsize 

369 if not blocksize: 

370 return 0 # empty file 

371 

372 fut = self.create_future() 

373 self._sock_sendfile_native_impl(fut, None, sock, fileno, 

374 offset, count, blocksize, 0) 

375 return await fut 

376 

377 def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno, 

378 offset, count, blocksize, total_sent): 

379 fd = sock.fileno() 

380 if registered_fd is not None: 

381 # Remove the callback early. It should be rare that the 

382 # selector says the fd is ready but the call still returns 

383 # EAGAIN, and I am willing to take a hit in that case in 

384 # order to simplify the common case. 

385 self.remove_writer(registered_fd) 

386 if fut.cancelled(): 

387 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 

388 return 

389 if count: 

390 blocksize = count - total_sent 

391 if blocksize <= 0: 

392 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 

393 fut.set_result(total_sent) 

394 return 

395 

396 # On 32-bit architectures truncate to 1GiB to avoid OverflowError 

397 blocksize = min(blocksize, sys.maxsize//2 + 1) 

398 

399 try: 

400 sent = os.sendfile(fd, fileno, offset, blocksize) 

401 except (BlockingIOError, InterruptedError): 

402 if registered_fd is None: 402 ↛ 404line 402 didn't jump to line 404 because the condition on line 402 was always true

403 self._sock_add_cancellation_callback(fut, sock) 

404 self.add_writer(fd, self._sock_sendfile_native_impl, fut, 

405 fd, sock, fileno, 

406 offset, count, blocksize, total_sent) 

407 except OSError as exc: 

408 if (registered_fd is not None and 408 ↛ 415line 408 didn't jump to line 415 because the condition on line 408 was never true

409 exc.errno == errno.ENOTCONN and 

410 type(exc) is not ConnectionError): 

411 # If we have an ENOTCONN and this isn't a first call to 

412 # sendfile(), i.e. the connection was closed in the middle 

413 # of the operation, normalize the error to ConnectionError 

414 # to make it consistent across all Posix systems. 

415 new_exc = ConnectionError( 

416 "socket is not connected", errno.ENOTCONN) 

417 new_exc.__cause__ = exc 

418 exc = new_exc 

419 if total_sent == 0: 

420 # We can get here for different reasons, the main 

421 # one being 'file' is not a regular mmap(2)-like 

422 # file, in which case we'll fall back on using 

423 # plain send(). 

424 err = exceptions.SendfileNotAvailableError( 

425 "os.sendfile call failed") 

426 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 

427 fut.set_exception(err) 

428 else: 

429 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 

430 fut.set_exception(exc) 

431 except (SystemExit, KeyboardInterrupt): 

432 raise 

433 except BaseException as exc: 

434 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 

435 fut.set_exception(exc) 

436 else: 

437 if sent == 0: 

438 # EOF 

439 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 

440 fut.set_result(total_sent) 

441 else: 

442 offset += sent 

443 total_sent += sent 

444 if registered_fd is None: 

445 self._sock_add_cancellation_callback(fut, sock) 

446 self.add_writer(fd, self._sock_sendfile_native_impl, fut, 

447 fd, sock, fileno, 

448 offset, count, blocksize, total_sent) 

449 

450 def _sock_sendfile_update_filepos(self, fileno, offset, total_sent): 

451 if total_sent > 0: 

452 os.lseek(fileno, offset, os.SEEK_SET) 

453 

454 def _sock_add_cancellation_callback(self, fut, sock): 

455 def cb(fut): 

456 if fut.cancelled(): 

457 fd = sock.fileno() 

458 if fd != -1: 458 ↛ exitline 458 didn't return from function 'cb' because the condition on line 458 was always true

459 self.remove_writer(fd) 

460 fut.add_done_callback(cb) 

461 

462 def _stop_serving(self, sock): 

463 # Is this a unix socket that needs cleanup? 

464 if sock in self._unix_server_sockets: 

465 path = sock.getsockname() 

466 else: 

467 path = None 

468 

469 super()._stop_serving(sock) 

470 

471 if path is not None: 

472 prev_ino = self._unix_server_sockets[sock] 

473 del self._unix_server_sockets[sock] 

474 try: 

475 if os.stat(path).st_ino == prev_ino: 

476 os.unlink(path) 

477 except FileNotFoundError: 

478 pass 

479 except OSError as err: 

480 logger.error('Unable to clean up listening UNIX socket ' 

481 '%r: %r', path, err) 

482 

483 

484class _UnixReadPipeTransport(transports.ReadTransport): 

485 

486 max_size = 256 * 1024 # max bytes we read in one event loop iteration 

487 

488 def __init__(self, loop, pipe, protocol, waiter=None, extra=None): 

489 super().__init__(extra) 

490 self._extra['pipe'] = pipe 

491 self._loop = loop 

492 self._pipe = pipe 

493 self._fileno = pipe.fileno() 

494 self._protocol = protocol 

495 self._closing = False 

496 self._paused = False 

497 

498 mode = os.fstat(self._fileno).st_mode 

499 if not (stat.S_ISFIFO(mode) or 499 ↛ 502line 499 didn't jump to line 502 because the condition on line 499 was never true

500 stat.S_ISSOCK(mode) or 

501 stat.S_ISCHR(mode)): 

502 self._pipe = None 

503 self._fileno = None 

504 self._protocol = None 

505 raise ValueError("Pipe transport is for pipes/sockets only.") 

506 

507 os.set_blocking(self._fileno, False) 

508 

509 self._loop.call_soon(self._protocol.connection_made, self) 

510 # only start reading when connection_made() has been called 

511 self._loop.call_soon(self._add_reader, 

512 self._fileno, self._read_ready) 

513 if waiter is not None: 

514 # only wake up the waiter when connection_made() has been called 

515 self._loop.call_soon(futures._set_result_unless_cancelled, 

516 waiter, None) 

517 

518 def _add_reader(self, fd, callback): 

519 if not self.is_reading(): 

520 return 

521 self._loop._add_reader(fd, callback) 

522 

523 def is_reading(self): 

524 return not self._paused and not self._closing 

525 

526 def __repr__(self): 

527 info = [self.__class__.__name__] 

528 if self._pipe is None: 528 ↛ 529line 528 didn't jump to line 529 because the condition on line 528 was never true

529 info.append('closed') 

530 elif self._closing: 530 ↛ 531line 530 didn't jump to line 531 because the condition on line 530 was never true

531 info.append('closing') 

532 info.append(f'fd={self._fileno}') 

533 selector = getattr(self._loop, '_selector', None) 

534 if self._pipe is not None and selector is not None: 

535 polling = selector_events._test_selector_event( 

536 selector, self._fileno, selectors.EVENT_READ) 

537 if polling: 537 ↛ 538line 537 didn't jump to line 538 because the condition on line 537 was never true

538 info.append('polling') 

539 else: 

540 info.append('idle') 

541 elif self._pipe is not None: 541 ↛ 544line 541 didn't jump to line 544 because the condition on line 541 was always true

542 info.append('open') 

543 else: 

544 info.append('closed') 

545 return '<{}>'.format(' '.join(info)) 

546 

547 def _read_ready(self): 

548 try: 

549 data = os.read(self._fileno, self.max_size) 

550 except (BlockingIOError, InterruptedError): 

551 pass 

552 except OSError as exc: 

553 self._fatal_error(exc, 'Fatal read error on pipe transport') 

554 else: 

555 if data: 

556 self._protocol.data_received(data) 

557 else: 

558 if self._loop.get_debug(): 558 ↛ 559line 558 didn't jump to line 559 because the condition on line 558 was never true

559 logger.info("%r was closed by peer", self) 

560 self._closing = True 

561 self._loop._remove_reader(self._fileno) 

562 self._loop.call_soon(self._protocol.eof_received) 

563 self._loop.call_soon(self._call_connection_lost, None) 

564 

565 def pause_reading(self): 

566 if not self.is_reading(): 

567 return 

568 self._paused = True 

569 self._loop._remove_reader(self._fileno) 

570 if self._loop.get_debug(): 570 ↛ 571line 570 didn't jump to line 571 because the condition on line 570 was never true

571 logger.debug("%r pauses reading", self) 

572 

573 def resume_reading(self): 

574 if self._closing or not self._paused: 

575 return 

576 self._paused = False 

577 self._loop._add_reader(self._fileno, self._read_ready) 

578 if self._loop.get_debug(): 578 ↛ 579line 578 didn't jump to line 579 because the condition on line 578 was never true

579 logger.debug("%r resumes reading", self) 

580 

581 def set_protocol(self, protocol): 

582 self._protocol = protocol 

583 

584 def get_protocol(self): 

585 return self._protocol 

586 

587 def is_closing(self): 

588 return self._closing 

589 

590 def close(self): 

591 if not self._closing: 

592 self._close(None) 

593 

594 def __del__(self, _warn=warnings.warn): 

595 if self._pipe is not None: 595 ↛ 596line 595 didn't jump to line 596 because the condition on line 595 was never true

596 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) 

597 self._pipe.close() 

598 

599 def _fatal_error(self, exc, message='Fatal error on pipe transport'): 

600 # should be called by exception handler only 

601 if (isinstance(exc, OSError) and exc.errno == errno.EIO): 601 ↛ 602line 601 didn't jump to line 602 because the condition on line 601 was never true

602 if self._loop.get_debug(): 

603 logger.debug("%r: %s", self, message, exc_info=True) 

604 else: 

605 self._loop.call_exception_handler({ 

606 'message': message, 

607 'exception': exc, 

608 'transport': self, 

609 'protocol': self._protocol, 

610 }) 

611 self._close(exc) 

612 

613 def _close(self, exc): 

614 self._closing = True 

615 self._loop._remove_reader(self._fileno) 

616 self._loop.call_soon(self._call_connection_lost, exc) 

617 

618 def _call_connection_lost(self, exc): 

619 try: 

620 self._protocol.connection_lost(exc) 

621 finally: 

622 self._pipe.close() 

623 self._pipe = None 

624 self._protocol = None 

625 self._loop = None 

626 

627 

628class _UnixWritePipeTransport(transports._FlowControlMixin, 

629 transports.WriteTransport): 

630 

631 def __init__(self, loop, pipe, protocol, waiter=None, extra=None): 

632 super().__init__(extra, loop) 

633 self._extra['pipe'] = pipe 

634 self._pipe = pipe 

635 self._fileno = pipe.fileno() 

636 self._protocol = protocol 

637 self._buffer = bytearray() 

638 self._conn_lost = 0 

639 self._closing = False # Set when close() or write_eof() called. 

640 

641 mode = os.fstat(self._fileno).st_mode 

642 is_char = stat.S_ISCHR(mode) 

643 is_fifo = stat.S_ISFIFO(mode) 

644 is_socket = stat.S_ISSOCK(mode) 

645 if not (is_char or is_fifo or is_socket): 645 ↛ 646line 645 didn't jump to line 646 because the condition on line 645 was never true

646 self._pipe = None 

647 self._fileno = None 

648 self._protocol = None 

649 raise ValueError("Pipe transport is only for " 

650 "pipes, sockets and character devices") 

651 

652 os.set_blocking(self._fileno, False) 

653 self._loop.call_soon(self._protocol.connection_made, self) 

654 

655 # On AIX, the reader trick (to be notified when the read end of the 

656 # socket is closed) only works for sockets. On other platforms it 

657 # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.) 

658 if is_socket or (is_fifo and not sys.platform.startswith("aix")): 

659 # only start reading when connection_made() has been called 

660 self._loop.call_soon(self._loop._add_reader, 

661 self._fileno, self._read_ready) 

662 

663 if waiter is not None: 

664 # only wake up the waiter when connection_made() has been called 

665 self._loop.call_soon(futures._set_result_unless_cancelled, 

666 waiter, None) 

667 

668 def __repr__(self): 

669 info = [self.__class__.__name__] 

670 if self._pipe is None: 670 ↛ 671line 670 didn't jump to line 671 because the condition on line 670 was never true

671 info.append('closed') 

672 elif self._closing: 672 ↛ 673line 672 didn't jump to line 673 because the condition on line 672 was never true

673 info.append('closing') 

674 info.append(f'fd={self._fileno}') 

675 selector = getattr(self._loop, '_selector', None) 

676 if self._pipe is not None and selector is not None: 676 ↛ 677line 676 didn't jump to line 677 because the condition on line 676 was never true

677 polling = selector_events._test_selector_event( 

678 selector, self._fileno, selectors.EVENT_WRITE) 

679 if polling: 

680 info.append('polling') 

681 else: 

682 info.append('idle') 

683 

684 bufsize = self.get_write_buffer_size() 

685 info.append(f'bufsize={bufsize}') 

686 elif self._pipe is not None: 686 ↛ 689line 686 didn't jump to line 689 because the condition on line 686 was always true

687 info.append('open') 

688 else: 

689 info.append('closed') 

690 return '<{}>'.format(' '.join(info)) 

691 

692 def get_write_buffer_size(self): 

693 return len(self._buffer) 

694 

695 def _read_ready(self): 

696 # Pipe was closed by peer. 

697 if self._loop.get_debug(): 697 ↛ 698line 697 didn't jump to line 698 because the condition on line 697 was never true

698 logger.info("%r was closed by peer", self) 

699 if self._buffer: 

700 self._close(BrokenPipeError()) 

701 else: 

702 self._close() 

703 

704 def write(self, data): 

705 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data) 

706 if isinstance(data, bytearray): 706 ↛ 707line 706 didn't jump to line 707 because the condition on line 706 was never true

707 data = memoryview(data) 

708 if not data: 

709 return 

710 

711 if self._conn_lost or self._closing: 

712 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: 

713 logger.warning('pipe closed by peer or ' 

714 'os.write(pipe, data) raised exception.') 

715 self._conn_lost += 1 

716 return 

717 

718 if not self._buffer: 

719 # Attempt to send it right away first. 

720 try: 

721 n = os.write(self._fileno, data) 

722 except (BlockingIOError, InterruptedError): 

723 n = 0 

724 except (SystemExit, KeyboardInterrupt): 

725 raise 

726 except BaseException as exc: 

727 self._conn_lost += 1 

728 self._fatal_error(exc, 'Fatal write error on pipe transport') 

729 return 

730 if n == len(data): 

731 return 

732 elif n > 0: 

733 data = memoryview(data)[n:] 

734 self._loop._add_writer(self._fileno, self._write_ready) 

735 

736 self._buffer += data 

737 self._maybe_pause_protocol() 

738 

739 def _write_ready(self): 

740 assert self._buffer, 'Data should not be empty' 

741 

742 try: 

743 n = os.write(self._fileno, self._buffer) 

744 except (BlockingIOError, InterruptedError): 

745 pass 

746 except (SystemExit, KeyboardInterrupt): 

747 raise 

748 except BaseException as exc: 

749 self._buffer.clear() 

750 self._conn_lost += 1 

751 # Remove writer here, _fatal_error() doesn't it 

752 # because _buffer is empty. 

753 self._loop._remove_writer(self._fileno) 

754 self._fatal_error(exc, 'Fatal write error on pipe transport') 

755 else: 

756 if n == len(self._buffer): 

757 self._buffer.clear() 

758 self._loop._remove_writer(self._fileno) 

759 self._maybe_resume_protocol() # May append to buffer. 

760 if self._closing: 

761 self._loop._remove_reader(self._fileno) 

762 self._call_connection_lost(None) 

763 return 

764 elif n > 0: 

765 del self._buffer[:n] 

766 

767 def can_write_eof(self): 

768 return True 

769 

770 def write_eof(self): 

771 if self._closing: 771 ↛ 772line 771 didn't jump to line 772 because the condition on line 771 was never true

772 return 

773 assert self._pipe 

774 self._closing = True 

775 if not self._buffer: 

776 self._loop._remove_reader(self._fileno) 

777 self._loop.call_soon(self._call_connection_lost, None) 

778 

779 def set_protocol(self, protocol): 

780 self._protocol = protocol 

781 

782 def get_protocol(self): 

783 return self._protocol 

784 

785 def is_closing(self): 

786 return self._closing 

787 

788 def close(self): 

789 if self._pipe is not None and not self._closing: 

790 # write_eof is all what we needed to close the write pipe 

791 self.write_eof() 

792 

793 def __del__(self, _warn=warnings.warn): 

794 if self._pipe is not None: 794 ↛ 795line 794 didn't jump to line 795 because the condition on line 794 was never true

795 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) 

796 self._pipe.close() 

797 

798 def abort(self): 

799 self._close(None) 

800 

801 def _fatal_error(self, exc, message='Fatal error on pipe transport'): 

802 # should be called by exception handler only 

803 if isinstance(exc, OSError): 803 ↛ 807line 803 didn't jump to line 807 because the condition on line 803 was always true

804 if self._loop.get_debug(): 804 ↛ 805line 804 didn't jump to line 805 because the condition on line 804 was never true

805 logger.debug("%r: %s", self, message, exc_info=True) 

806 else: 

807 self._loop.call_exception_handler({ 

808 'message': message, 

809 'exception': exc, 

810 'transport': self, 

811 'protocol': self._protocol, 

812 }) 

813 self._close(exc) 

814 

815 def _close(self, exc=None): 

816 self._closing = True 

817 if self._buffer: 

818 self._loop._remove_writer(self._fileno) 

819 self._buffer.clear() 

820 self._loop._remove_reader(self._fileno) 

821 self._loop.call_soon(self._call_connection_lost, exc) 

822 

823 def _call_connection_lost(self, exc): 

824 try: 

825 self._protocol.connection_lost(exc) 

826 finally: 

827 self._pipe.close() 

828 self._pipe = None 

829 self._protocol = None 

830 self._loop = None 

831 

832 

833class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport): 

834 

835 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs): 

836 stdin_w = None 

837 if stdin == subprocess.PIPE and sys.platform.startswith('aix'): 837 ↛ 842line 837 didn't jump to line 842 because the condition on line 837 was never true

838 # Use a socket pair for stdin on AIX, since it does not 

839 # support selecting read events on the write end of a 

840 # socket (which we use in order to detect closing of the 

841 # other end). 

842 stdin, stdin_w = socket.socketpair() 

843 try: 

844 self._proc = subprocess.Popen( 

845 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, 

846 universal_newlines=False, bufsize=bufsize, **kwargs) 

847 if stdin_w is not None: 847 ↛ 848line 847 didn't jump to line 848 because the condition on line 847 was never true

848 stdin.close() 

849 self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize) 

850 stdin_w = None 

851 finally: 

852 if stdin_w is not None: 852 ↛ 853line 852 didn't jump to line 853 because the condition on line 852 was never true

853 stdin.close() 

854 stdin_w.close() 

855 

856 

857class _PidfdChildWatcher: 

858 """Child watcher implementation using Linux's pid file descriptors. 

859 

860 This child watcher polls process file descriptors (pidfds) to await child 

861 process termination. In some respects, PidfdChildWatcher is a "Goldilocks" 

862 child watcher implementation. It doesn't require signals or threads, doesn't 

863 interfere with any processes launched outside the event loop, and scales 

864 linearly with the number of subprocesses launched by the event loop. The 

865 main disadvantage is that pidfds are specific to Linux, and only work on 

866 recent (5.3+) kernels. 

867 """ 

868 

869 def add_child_handler(self, pid, callback, *args): 

870 loop = events.get_running_loop() 

871 pidfd = os.pidfd_open(pid) 

872 loop._add_reader(pidfd, self._do_wait, pid, pidfd, callback, args) 

873 

874 def _do_wait(self, pid, pidfd, callback, args): 

875 loop = events.get_running_loop() 

876 loop._remove_reader(pidfd) 

877 try: 

878 _, status = os.waitpid(pid, 0) 

879 except ChildProcessError: 

880 # The child process is already reaped 

881 # (may happen if waitpid() is called elsewhere). 

882 returncode = 255 

883 logger.warning( 

884 "child process pid %d exit status already read: " 

885 " will report returncode 255", 

886 pid) 

887 else: 

888 returncode = waitstatus_to_exitcode(status) 

889 

890 os.close(pidfd) 

891 callback(pid, returncode, *args) 

892 

893class _ThreadedChildWatcher: 

894 """Threaded child watcher implementation. 

895 

896 The watcher uses a thread per process 

897 for waiting for the process finish. 

898 

899 It doesn't require subscription on POSIX signal 

900 but a thread creation is not free. 

901 

902 The watcher has O(1) complexity, its performance doesn't depend 

903 on amount of spawn processes. 

904 """ 

905 

906 def __init__(self): 

907 self._pid_counter = itertools.count(0) 

908 self._threads = {} 

909 

910 def __del__(self, _warn=warnings.warn): 

911 threads = [thread for thread in list(self._threads.values()) 

912 if thread.is_alive()] 

913 if threads: 913 ↛ 914line 913 didn't jump to line 914 because the condition on line 913 was never true

914 _warn(f"{self.__class__} has registered but not finished child processes", 

915 ResourceWarning, 

916 source=self) 

917 

918 def add_child_handler(self, pid, callback, *args): 

919 loop = events.get_running_loop() 

920 thread = threading.Thread(target=self._do_waitpid, 

921 name=f"asyncio-waitpid-{next(self._pid_counter)}", 

922 args=(loop, pid, callback, args), 

923 daemon=True) 

924 self._threads[pid] = thread 

925 thread.start() 

926 

927 def _do_waitpid(self, loop, expected_pid, callback, args): 

928 assert expected_pid > 0 

929 

930 try: 

931 pid, status = os.waitpid(expected_pid, 0) 

932 except ChildProcessError: 

933 # The child process is already reaped 

934 # (may happen if waitpid() is called elsewhere). 

935 pid = expected_pid 

936 returncode = 255 

937 logger.warning( 

938 "Unknown child process pid %d, will report returncode 255", 

939 pid) 

940 else: 

941 returncode = waitstatus_to_exitcode(status) 

942 if loop.get_debug(): 942 ↛ 943line 942 didn't jump to line 943 because the condition on line 942 was never true

943 logger.debug('process %s exited with returncode %s', 

944 expected_pid, returncode) 

945 

946 if loop.is_closed(): 946 ↛ 947line 946 didn't jump to line 947 because the condition on line 946 was never true

947 logger.warning("Loop %r that handles pid %r is closed", loop, pid) 

948 else: 

949 loop.call_soon_threadsafe(callback, pid, returncode, *args) 

950 

951 self._threads.pop(expected_pid) 

952 

953def can_use_pidfd(): 

954 if not hasattr(os, 'pidfd_open'): 954 ↛ 955line 954 didn't jump to line 955 because the condition on line 954 was never true

955 return False 

956 try: 

957 pid = os.getpid() 

958 os.close(os.pidfd_open(pid, 0)) 

959 except OSError: 

960 # blocked by security policy like SECCOMP 

961 return False 

962 return True 

963 

964 

965class _UnixDefaultEventLoopPolicy(events._BaseDefaultEventLoopPolicy): 

966 """UNIX event loop policy""" 

967 _loop_factory = _UnixSelectorEventLoop 

968 

969 

970SelectorEventLoop = _UnixSelectorEventLoop 

971_DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy 

972EventLoop = SelectorEventLoop