Coverage for Lib/asyncio/unix_events.py: 87%

619 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-15 02:02 +0000

1"""Selector event loop for Unix with signal handling.""" 

2 

3import errno 

4import io 

5import itertools 

6import os 

7import selectors 

8import signal 

9import socket 

10import stat 

11import subprocess 

12import sys 

13import threading 

14import warnings 

15 

16from . import base_events 

17from . import base_subprocess 

18from . import constants 

19from . import coroutines 

20from . import events 

21from . import exceptions 

22from . import futures 

23from . import selector_events 

24from . import tasks 

25from . import transports 

26from .log import logger 

27 

28 

29__all__ = ( 

30 'SelectorEventLoop', 

31 '_DefaultEventLoopPolicy', 

32 'EventLoop', 

33) 

34 

35 

36if sys.platform == 'win32': # pragma: no cover 

37 raise ImportError('Signals are not really supported on Windows') 

38 

39 

40def _sighandler_noop(signum, frame): 

41 """Dummy signal handler.""" 

42 pass 

43 

44 

45def waitstatus_to_exitcode(status): 

46 try: 

47 return os.waitstatus_to_exitcode(status) 

48 except ValueError: 

49 # The child exited, but we don't understand its status. 

50 # This shouldn't happen, but if it does, let's just 

51 # return that status; perhaps that helps debug it. 

52 return status 

53 

54 

55class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): 

56 """Unix event loop. 

57 

58 Adds signal handling and UNIX Domain Socket support to SelectorEventLoop. 

59 """ 

60 

61 def __init__(self, selector=None): 

62 super().__init__(selector) 

63 self._signal_handlers = {} 

64 self._unix_server_sockets = {} 

65 if can_use_pidfd(): 

66 self._watcher = _PidfdChildWatcher() 

67 else: 

68 self._watcher = _ThreadedChildWatcher() 

69 

70 def close(self): 

71 super().close() 

72 if not sys.is_finalizing(): 

73 for sig in list(self._signal_handlers): 

74 self.remove_signal_handler(sig) 

75 else: 

76 if self._signal_handlers: 76 ↛ exitline 76 didn't return from function 'close' because the condition on line 76 was always true

77 warnings.warn(f"Closing the loop {self!r} " 

78 f"on interpreter shutdown " 

79 f"stage, skipping signal handlers removal", 

80 ResourceWarning, 

81 source=self) 

82 self._signal_handlers.clear() 

83 

84 def _process_self_data(self, data): 

85 for signum in data: 

86 if not signum: 

87 # ignore null bytes written by _write_to_self() 

88 continue 

89 self._handle_signal(signum) 

90 

91 def add_signal_handler(self, sig, callback, *args): 

92 """Add a handler for a signal. UNIX only. 

93 

94 Raise ValueError if the signal number is invalid or uncatchable. 

95 Raise RuntimeError if there is a problem setting up the handler. 

96 """ 

97 if (coroutines.iscoroutine(callback) or 

98 coroutines._iscoroutinefunction(callback)): 

99 raise TypeError("coroutines cannot be used " 

100 "with add_signal_handler()") 

101 self._check_signal(sig) 

102 self._check_closed() 

103 try: 

104 # set_wakeup_fd() raises ValueError if this is not the 

105 # main thread. By calling it early we ensure that an 

106 # event loop running in another thread cannot add a signal 

107 # handler. 

108 signal.set_wakeup_fd(self._csock.fileno()) 

109 except (ValueError, OSError) as exc: 

110 raise RuntimeError(str(exc)) 

111 

112 handle = events.Handle(callback, args, self, None) 

113 self._signal_handlers[sig] = handle 

114 

115 try: 

116 # Register a dummy signal handler to ask Python to write the signal 

117 # number in the wakeup file descriptor. _process_self_data() will 

118 # read signal numbers from this file descriptor to handle signals. 

119 signal.signal(sig, _sighandler_noop) 

120 

121 # Set SA_RESTART to limit EINTR occurrences. 

122 signal.siginterrupt(sig, False) 

123 except OSError as exc: 

124 del self._signal_handlers[sig] 

125 if not self._signal_handlers: 

126 try: 

127 signal.set_wakeup_fd(-1) 

128 except (ValueError, OSError) as nexc: 

129 logger.info('set_wakeup_fd(-1) failed: %s', nexc) 

130 

131 if exc.errno == errno.EINVAL: 

132 raise RuntimeError(f'sig {sig} cannot be caught') 

133 else: 

134 raise 

135 

136 def _handle_signal(self, sig): 

137 """Internal helper that is the actual signal handler.""" 

138 handle = self._signal_handlers.get(sig) 

139 if handle is None: 

140 return # Assume it's some race condition. 

141 if handle._cancelled: 

142 self.remove_signal_handler(sig) # Remove it properly. 

143 else: 

144 self._add_callback_signalsafe(handle) 

145 

146 def remove_signal_handler(self, sig): 

147 """Remove a handler for a signal. UNIX only. 

148 

149 Return True if a signal handler was removed, False if not. 

150 """ 

151 self._check_signal(sig) 

152 try: 

153 del self._signal_handlers[sig] 

154 except KeyError: 

155 return False 

156 

157 if sig == signal.SIGINT: 

158 handler = signal.default_int_handler 

159 else: 

160 handler = signal.SIG_DFL 

161 

162 try: 

163 signal.signal(sig, handler) 

164 except OSError as exc: 

165 if exc.errno == errno.EINVAL: 

166 raise RuntimeError(f'sig {sig} cannot be caught') 

167 else: 

168 raise 

169 

170 if not self._signal_handlers: 

171 try: 

172 signal.set_wakeup_fd(-1) 

173 except (ValueError, OSError) as exc: 

174 logger.info('set_wakeup_fd(-1) failed: %s', exc) 

175 

176 return True 

177 

178 def _check_signal(self, sig): 

179 """Internal helper to validate a signal. 

180 

181 Raise ValueError if the signal number is invalid or uncatchable. 

182 Raise RuntimeError if there is a problem setting up the handler. 

183 """ 

184 if not isinstance(sig, int): 

185 raise TypeError(f'sig must be an int, not {sig!r}') 

186 

187 if sig not in signal.valid_signals(): 

188 raise ValueError(f'invalid signal number {sig}') 

189 

190 def _make_read_pipe_transport(self, pipe, protocol, waiter=None, 

191 extra=None): 

192 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra) 

193 

194 def _make_write_pipe_transport(self, pipe, protocol, waiter=None, 

195 extra=None): 

196 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra) 

197 

198 async def _make_subprocess_transport(self, protocol, args, shell, 

199 stdin, stdout, stderr, bufsize, 

200 extra=None, **kwargs): 

201 watcher = self._watcher 

202 waiter = self.create_future() 

203 transp = _UnixSubprocessTransport(self, protocol, args, shell, 

204 stdin, stdout, stderr, bufsize, 

205 waiter=waiter, extra=extra, 

206 **kwargs) 

207 watcher.add_child_handler(transp.get_pid(), 

208 self._child_watcher_callback, transp) 

209 try: 

210 await waiter 

211 except (SystemExit, KeyboardInterrupt): 

212 raise 

213 except BaseException: 

214 transp.close() 

215 await transp._wait() 

216 raise 

217 

218 return transp 

219 

220 def _child_watcher_callback(self, pid, returncode, transp): 

221 self.call_soon_threadsafe(transp._process_exited, returncode) 

222 

223 async def create_unix_connection( 

224 self, protocol_factory, path=None, *, 

225 ssl=None, sock=None, 

226 server_hostname=None, 

227 ssl_handshake_timeout=None, 

228 ssl_shutdown_timeout=None): 

229 assert server_hostname is None or isinstance(server_hostname, str) 

230 if ssl: 

231 if server_hostname is None: 

232 raise ValueError( 

233 'you have to pass server_hostname when using ssl') 

234 else: 

235 if server_hostname is not None: 

236 raise ValueError('server_hostname is only meaningful with ssl') 

237 if ssl_handshake_timeout is not None: 

238 raise ValueError( 

239 'ssl_handshake_timeout is only meaningful with ssl') 

240 if ssl_shutdown_timeout is not None: 240 ↛ 241line 240 didn't jump to line 241 because the condition on line 240 was never true

241 raise ValueError( 

242 'ssl_shutdown_timeout is only meaningful with ssl') 

243 

244 if path is not None: 

245 if sock is not None: 

246 raise ValueError( 

247 'path and sock can not be specified at the same time') 

248 

249 path = os.fspath(path) 

250 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) 

251 try: 

252 sock.setblocking(False) 

253 await self.sock_connect(sock, path) 

254 except: 

255 sock.close() 

256 raise 

257 

258 else: 

259 if sock is None: 

260 raise ValueError('no path and sock were specified') 

261 if (sock.family != socket.AF_UNIX or 261 ↛ 265line 261 didn't jump to line 265 because the condition on line 261 was always true

262 sock.type != socket.SOCK_STREAM): 

263 raise ValueError( 

264 f'A UNIX Domain Stream Socket was expected, got {sock!r}') 

265 sock.setblocking(False) 

266 

267 transport, protocol = await self._create_connection_transport( 

268 sock, protocol_factory, ssl, server_hostname, 

269 ssl_handshake_timeout=ssl_handshake_timeout, 

270 ssl_shutdown_timeout=ssl_shutdown_timeout) 

271 return transport, protocol 

272 

273 async def create_unix_server( 

274 self, protocol_factory, path=None, *, 

275 sock=None, backlog=100, ssl=None, 

276 ssl_handshake_timeout=None, 

277 ssl_shutdown_timeout=None, 

278 start_serving=True, cleanup_socket=True): 

279 if isinstance(ssl, bool): 

280 raise TypeError('ssl argument must be an SSLContext or None') 

281 

282 if ssl_handshake_timeout is not None and not ssl: 

283 raise ValueError( 

284 'ssl_handshake_timeout is only meaningful with ssl') 

285 

286 if ssl_shutdown_timeout is not None and not ssl: 286 ↛ 287line 286 didn't jump to line 287 because the condition on line 286 was never true

287 raise ValueError( 

288 'ssl_shutdown_timeout is only meaningful with ssl') 

289 

290 if path is not None: 

291 if sock is not None: 

292 raise ValueError( 

293 'path and sock can not be specified at the same time') 

294 

295 path = os.fspath(path) 

296 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 

297 

298 # Check for abstract socket. `str` and `bytes` paths are supported. 

299 if path[0] not in (0, '\x00'): 299 ↛ 310line 299 didn't jump to line 310 because the condition on line 299 was always true

300 try: 

301 if stat.S_ISSOCK(os.stat(path).st_mode): 

302 os.remove(path) 

303 except FileNotFoundError: 

304 pass 

305 except OSError as err: 

306 # Directory may have permissions only to create socket. 

307 logger.error('Unable to check or remove stale UNIX socket ' 

308 '%r: %r', path, err) 

309 

310 try: 

311 sock.bind(path) 

312 except OSError as exc: 

313 sock.close() 

314 if exc.errno == errno.EADDRINUSE: 

315 # Let's improve the error message by adding 

316 # with what exact address it occurs. 

317 msg = f'Address {path!r} is already in use' 

318 raise OSError(errno.EADDRINUSE, msg) from None 

319 else: 

320 raise 

321 except: 

322 sock.close() 

323 raise 

324 else: 

325 if sock is None: 

326 raise ValueError( 

327 'path was not specified, and no sock specified') 

328 

329 if (sock.family != socket.AF_UNIX or 

330 sock.type != socket.SOCK_STREAM): 

331 raise ValueError( 

332 f'A UNIX Domain Stream Socket was expected, got {sock!r}') 

333 

334 if cleanup_socket: 

335 path = sock.getsockname() 

336 # Check for abstract socket. `str` and `bytes` paths are supported. 

337 if path[0] not in (0, '\x00'): 337 ↛ 343line 337 didn't jump to line 343 because the condition on line 337 was always true

338 try: 

339 self._unix_server_sockets[sock] = os.stat(path).st_ino 

340 except FileNotFoundError: 

341 pass 

342 

343 sock.setblocking(False) 

344 server = base_events.Server(self, [sock], protocol_factory, 

345 ssl, backlog, ssl_handshake_timeout, 

346 ssl_shutdown_timeout) 

347 if start_serving: 

348 server._start_serving() 

349 # Skip one loop iteration so that all 'loop.add_reader' 

350 # go through. 

351 await tasks.sleep(0) 

352 

353 return server 

354 

355 async def _sock_sendfile_native(self, sock, file, offset, count): 

356 try: 

357 os.sendfile 

358 except AttributeError: 

359 raise exceptions.SendfileNotAvailableError( 

360 "os.sendfile() is not available") 

361 try: 

362 fileno = file.fileno() 

363 except (AttributeError, io.UnsupportedOperation) as err: 

364 raise exceptions.SendfileNotAvailableError("not a regular file") 

365 try: 

366 fsize = os.fstat(fileno).st_size 

367 except OSError: 

368 raise exceptions.SendfileNotAvailableError("not a regular file") 

369 blocksize = count if count else fsize 

370 if not blocksize: 

371 return 0 # empty file 

372 

373 fut = self.create_future() 

374 self._sock_sendfile_native_impl(fut, None, sock, fileno, 

375 offset, count, blocksize, 0) 

376 return await fut 

377 

378 def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno, 

379 offset, count, blocksize, total_sent): 

380 fd = sock.fileno() 

381 if registered_fd is not None: 

382 # Remove the callback early. It should be rare that the 

383 # selector says the fd is ready but the call still returns 

384 # EAGAIN, and I am willing to take a hit in that case in 

385 # order to simplify the common case. 

386 self.remove_writer(registered_fd) 

387 if fut.cancelled(): 

388 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 

389 return 

390 if count: 

391 blocksize = count - total_sent 

392 if blocksize <= 0: 

393 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 

394 fut.set_result(total_sent) 

395 return 

396 

397 # On 32-bit architectures truncate to 1GiB to avoid OverflowError 

398 blocksize = min(blocksize, sys.maxsize//2 + 1) 

399 

400 try: 

401 sent = os.sendfile(fd, fileno, offset, blocksize) 

402 except (BlockingIOError, InterruptedError): 

403 if registered_fd is None: 403 ↛ 405line 403 didn't jump to line 405 because the condition on line 403 was always true

404 self._sock_add_cancellation_callback(fut, sock) 

405 self.add_writer(fd, self._sock_sendfile_native_impl, fut, 

406 fd, sock, fileno, 

407 offset, count, blocksize, total_sent) 

408 except OSError as exc: 

409 if (registered_fd is not None and 409 ↛ 416line 409 didn't jump to line 416 because the condition on line 409 was never true

410 exc.errno == errno.ENOTCONN and 

411 type(exc) is not ConnectionError): 

412 # If we have an ENOTCONN and this isn't a first call to 

413 # sendfile(), i.e. the connection was closed in the middle 

414 # of the operation, normalize the error to ConnectionError 

415 # to make it consistent across all Posix systems. 

416 new_exc = ConnectionError( 

417 "socket is not connected", errno.ENOTCONN) 

418 new_exc.__cause__ = exc 

419 exc = new_exc 

420 if total_sent == 0: 

421 # We can get here for different reasons, the main 

422 # one being 'file' is not a regular mmap(2)-like 

423 # file, in which case we'll fall back on using 

424 # plain send(). 

425 err = exceptions.SendfileNotAvailableError( 

426 "os.sendfile call failed") 

427 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 

428 fut.set_exception(err) 

429 else: 

430 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 

431 fut.set_exception(exc) 

432 except (SystemExit, KeyboardInterrupt): 

433 raise 

434 except BaseException as exc: 

435 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 

436 fut.set_exception(exc) 

437 else: 

438 if sent == 0: 

439 # EOF 

440 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 

441 fut.set_result(total_sent) 

442 else: 

443 offset += sent 

444 total_sent += sent 

445 if registered_fd is None: 

446 self._sock_add_cancellation_callback(fut, sock) 

447 self.add_writer(fd, self._sock_sendfile_native_impl, fut, 

448 fd, sock, fileno, 

449 offset, count, blocksize, total_sent) 

450 

451 def _sock_sendfile_update_filepos(self, fileno, offset, total_sent): 

452 if total_sent > 0: 

453 os.lseek(fileno, offset, os.SEEK_SET) 

454 

455 def _sock_add_cancellation_callback(self, fut, sock): 

456 def cb(fut): 

457 if fut.cancelled(): 

458 fd = sock.fileno() 

459 if fd != -1: 459 ↛ exitline 459 didn't return from function 'cb' because the condition on line 459 was always true

460 self.remove_writer(fd) 

461 fut.add_done_callback(cb) 

462 

463 def _stop_serving(self, sock): 

464 # Is this a unix socket that needs cleanup? 

465 if sock in self._unix_server_sockets: 

466 path = sock.getsockname() 

467 else: 

468 path = None 

469 

470 super()._stop_serving(sock) 

471 

472 if path is not None: 

473 prev_ino = self._unix_server_sockets[sock] 

474 del self._unix_server_sockets[sock] 

475 try: 

476 if os.stat(path).st_ino == prev_ino: 

477 os.unlink(path) 

478 except FileNotFoundError: 

479 pass 

480 except OSError as err: 

481 logger.error('Unable to clean up listening UNIX socket ' 

482 '%r: %r', path, err) 

483 

484 

485class _UnixReadPipeTransport(transports.ReadTransport): 

486 

487 max_size = 256 * 1024 # max bytes we read in one event loop iteration 

488 

489 def __init__(self, loop, pipe, protocol, waiter=None, extra=None): 

490 super().__init__(extra) 

491 self._extra['pipe'] = pipe 

492 self._loop = loop 

493 self._pipe = pipe 

494 self._fileno = pipe.fileno() 

495 self._protocol = protocol 

496 self._closing = False 

497 self._paused = False 

498 

499 mode = os.fstat(self._fileno).st_mode 

500 if not (stat.S_ISFIFO(mode) or 500 ↛ 503line 500 didn't jump to line 503 because the condition on line 500 was never true

501 stat.S_ISSOCK(mode) or 

502 stat.S_ISCHR(mode)): 

503 self._pipe = None 

504 self._fileno = None 

505 self._protocol = None 

506 raise ValueError("Pipe transport is for pipes/sockets only.") 

507 

508 os.set_blocking(self._fileno, False) 

509 

510 self._loop.call_soon(self._protocol.connection_made, self) 

511 # only start reading when connection_made() has been called 

512 self._loop.call_soon(self._add_reader, 

513 self._fileno, self._read_ready) 

514 if waiter is not None: 

515 # only wake up the waiter when connection_made() has been called 

516 self._loop.call_soon(futures._set_result_unless_cancelled, 

517 waiter, None) 

518 

519 def _add_reader(self, fd, callback): 

520 if not self.is_reading(): 

521 return 

522 self._loop._add_reader(fd, callback) 

523 

524 def is_reading(self): 

525 return not self._paused and not self._closing 

526 

527 def __repr__(self): 

528 info = [self.__class__.__name__] 

529 if self._pipe is None: 529 ↛ 530line 529 didn't jump to line 530 because the condition on line 529 was never true

530 info.append('closed') 

531 elif self._closing: 531 ↛ 532line 531 didn't jump to line 532 because the condition on line 531 was never true

532 info.append('closing') 

533 info.append(f'fd={self._fileno}') 

534 selector = getattr(self._loop, '_selector', None) 

535 if self._pipe is not None and selector is not None: 

536 polling = selector_events._test_selector_event( 

537 selector, self._fileno, selectors.EVENT_READ) 

538 if polling: 538 ↛ 539line 538 didn't jump to line 539 because the condition on line 538 was never true

539 info.append('polling') 

540 else: 

541 info.append('idle') 

542 elif self._pipe is not None: 542 ↛ 545line 542 didn't jump to line 545 because the condition on line 542 was always true

543 info.append('open') 

544 else: 

545 info.append('closed') 

546 return '<{}>'.format(' '.join(info)) 

547 

548 def _read_ready(self): 

549 try: 

550 data = os.read(self._fileno, self.max_size) 

551 except (BlockingIOError, InterruptedError): 

552 pass 

553 except OSError as exc: 

554 self._fatal_error(exc, 'Fatal read error on pipe transport') 

555 else: 

556 if data: 

557 self._protocol.data_received(data) 

558 else: 

559 if self._loop.get_debug(): 559 ↛ 560line 559 didn't jump to line 560 because the condition on line 559 was never true

560 logger.info("%r was closed by peer", self) 

561 self._closing = True 

562 self._loop._remove_reader(self._fileno) 

563 self._loop.call_soon(self._protocol.eof_received) 

564 self._loop.call_soon(self._call_connection_lost, None) 

565 

566 def pause_reading(self): 

567 if not self.is_reading(): 

568 return 

569 self._paused = True 

570 self._loop._remove_reader(self._fileno) 

571 if self._loop.get_debug(): 571 ↛ 572line 571 didn't jump to line 572 because the condition on line 571 was never true

572 logger.debug("%r pauses reading", self) 

573 

574 def resume_reading(self): 

575 if self._closing or not self._paused: 

576 return 

577 self._paused = False 

578 self._loop._add_reader(self._fileno, self._read_ready) 

579 if self._loop.get_debug(): 579 ↛ 580line 579 didn't jump to line 580 because the condition on line 579 was never true

580 logger.debug("%r resumes reading", self) 

581 

582 def set_protocol(self, protocol): 

583 self._protocol = protocol 

584 

585 def get_protocol(self): 

586 return self._protocol 

587 

588 def is_closing(self): 

589 return self._closing 

590 

591 def close(self): 

592 if not self._closing: 

593 self._close(None) 

594 

595 def __del__(self, _warn=warnings.warn): 

596 if self._pipe is not None: 596 ↛ 597line 596 didn't jump to line 597 because the condition on line 596 was never true

597 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) 

598 self._pipe.close() 

599 

600 def _fatal_error(self, exc, message='Fatal error on pipe transport'): 

601 # should be called by exception handler only 

602 if (isinstance(exc, OSError) and exc.errno == errno.EIO): 602 ↛ 603line 602 didn't jump to line 603 because the condition on line 602 was never true

603 if self._loop.get_debug(): 

604 logger.debug("%r: %s", self, message, exc_info=True) 

605 else: 

606 self._loop.call_exception_handler({ 

607 'message': message, 

608 'exception': exc, 

609 'transport': self, 

610 'protocol': self._protocol, 

611 }) 

612 self._close(exc) 

613 

614 def _close(self, exc): 

615 self._closing = True 

616 self._loop._remove_reader(self._fileno) 

617 self._loop.call_soon(self._call_connection_lost, exc) 

618 

619 def _call_connection_lost(self, exc): 

620 try: 

621 self._protocol.connection_lost(exc) 

622 finally: 

623 self._pipe.close() 

624 self._pipe = None 

625 self._protocol = None 

626 self._loop = None 

627 

628 

629class _UnixWritePipeTransport(transports._FlowControlMixin, 

630 transports.WriteTransport): 

631 

632 def __init__(self, loop, pipe, protocol, waiter=None, extra=None): 

633 super().__init__(extra, loop) 

634 self._extra['pipe'] = pipe 

635 self._pipe = pipe 

636 self._fileno = pipe.fileno() 

637 self._protocol = protocol 

638 self._buffer = bytearray() 

639 self._conn_lost = 0 

640 self._closing = False # Set when close() or write_eof() called. 

641 

642 mode = os.fstat(self._fileno).st_mode 

643 is_char = stat.S_ISCHR(mode) 

644 is_fifo = stat.S_ISFIFO(mode) 

645 is_socket = stat.S_ISSOCK(mode) 

646 if not (is_char or is_fifo or is_socket): 646 ↛ 647line 646 didn't jump to line 647 because the condition on line 646 was never true

647 self._pipe = None 

648 self._fileno = None 

649 self._protocol = None 

650 raise ValueError("Pipe transport is only for " 

651 "pipes, sockets and character devices") 

652 

653 os.set_blocking(self._fileno, False) 

654 self._loop.call_soon(self._protocol.connection_made, self) 

655 

656 # On AIX, the reader trick (to be notified when the read end of the 

657 # socket is closed) only works for sockets. On other platforms it 

658 # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.) 

659 if is_socket or (is_fifo and not sys.platform.startswith("aix")): 

660 # only start reading when connection_made() has been called 

661 self._loop.call_soon(self._loop._add_reader, 

662 self._fileno, self._read_ready) 

663 

664 if waiter is not None: 

665 # only wake up the waiter when connection_made() has been called 

666 self._loop.call_soon(futures._set_result_unless_cancelled, 

667 waiter, None) 

668 

669 def __repr__(self): 

670 info = [self.__class__.__name__] 

671 if self._pipe is None: 671 ↛ 672line 671 didn't jump to line 672 because the condition on line 671 was never true

672 info.append('closed') 

673 elif self._closing: 673 ↛ 674line 673 didn't jump to line 674 because the condition on line 673 was never true

674 info.append('closing') 

675 info.append(f'fd={self._fileno}') 

676 selector = getattr(self._loop, '_selector', None) 

677 if self._pipe is not None and selector is not None: 677 ↛ 678line 677 didn't jump to line 678 because the condition on line 677 was never true

678 polling = selector_events._test_selector_event( 

679 selector, self._fileno, selectors.EVENT_WRITE) 

680 if polling: 

681 info.append('polling') 

682 else: 

683 info.append('idle') 

684 

685 bufsize = self.get_write_buffer_size() 

686 info.append(f'bufsize={bufsize}') 

687 elif self._pipe is not None: 687 ↛ 690line 687 didn't jump to line 690 because the condition on line 687 was always true

688 info.append('open') 

689 else: 

690 info.append('closed') 

691 return '<{}>'.format(' '.join(info)) 

692 

693 def get_write_buffer_size(self): 

694 return len(self._buffer) 

695 

696 def _read_ready(self): 

697 # Pipe was closed by peer. 

698 if self._loop.get_debug(): 698 ↛ 699line 698 didn't jump to line 699 because the condition on line 698 was never true

699 logger.info("%r was closed by peer", self) 

700 if self._buffer: 

701 self._close(BrokenPipeError()) 

702 else: 

703 self._close() 

704 

705 def write(self, data): 

706 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data) 

707 if isinstance(data, bytearray): 707 ↛ 708line 707 didn't jump to line 708 because the condition on line 707 was never true

708 data = memoryview(data) 

709 if not data: 

710 return 

711 

712 if self._conn_lost or self._closing: 

713 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: 

714 logger.warning('pipe closed by peer or ' 

715 'os.write(pipe, data) raised exception.') 

716 self._conn_lost += 1 

717 return 

718 

719 if not self._buffer: 

720 # Attempt to send it right away first. 

721 try: 

722 n = os.write(self._fileno, data) 

723 except (BlockingIOError, InterruptedError): 

724 n = 0 

725 except (SystemExit, KeyboardInterrupt): 

726 raise 

727 except BaseException as exc: 

728 self._conn_lost += 1 

729 self._fatal_error(exc, 'Fatal write error on pipe transport') 

730 return 

731 if n == len(data): 

732 return 

733 elif n > 0: 

734 data = memoryview(data)[n:] 

735 self._loop._add_writer(self._fileno, self._write_ready) 

736 

737 self._buffer += data 

738 self._maybe_pause_protocol() 

739 

740 def _write_ready(self): 

741 assert self._buffer, 'Data should not be empty' 

742 

743 try: 

744 n = os.write(self._fileno, self._buffer) 

745 except (BlockingIOError, InterruptedError): 

746 pass 

747 except (SystemExit, KeyboardInterrupt): 

748 raise 

749 except BaseException as exc: 

750 self._buffer.clear() 

751 self._conn_lost += 1 

752 # Remove writer here, _fatal_error() doesn't it 

753 # because _buffer is empty. 

754 self._loop._remove_writer(self._fileno) 

755 self._fatal_error(exc, 'Fatal write error on pipe transport') 

756 else: 

757 if n == len(self._buffer): 

758 self._buffer.clear() 

759 self._loop._remove_writer(self._fileno) 

760 self._maybe_resume_protocol() # May append to buffer. 

761 if self._closing: 

762 self._loop._remove_reader(self._fileno) 

763 self._call_connection_lost(None) 

764 return 

765 elif n > 0: 

766 del self._buffer[:n] 

767 

768 def can_write_eof(self): 

769 return True 

770 

771 def write_eof(self): 

772 if self._closing: 772 ↛ 773line 772 didn't jump to line 773 because the condition on line 772 was never true

773 return 

774 assert self._pipe 

775 self._closing = True 

776 if not self._buffer: 

777 self._loop._remove_reader(self._fileno) 

778 self._loop.call_soon(self._call_connection_lost, None) 

779 

780 def set_protocol(self, protocol): 

781 self._protocol = protocol 

782 

783 def get_protocol(self): 

784 return self._protocol 

785 

786 def is_closing(self): 

787 return self._closing 

788 

789 def close(self): 

790 if self._pipe is not None and not self._closing: 

791 # write_eof is all what we needed to close the write pipe 

792 self.write_eof() 

793 

794 def __del__(self, _warn=warnings.warn): 

795 if self._pipe is not None: 795 ↛ 796line 795 didn't jump to line 796 because the condition on line 795 was never true

796 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) 

797 self._pipe.close() 

798 

799 def abort(self): 

800 self._close(None) 

801 

802 def _fatal_error(self, exc, message='Fatal error on pipe transport'): 

803 # should be called by exception handler only 

804 if isinstance(exc, OSError): 804 ↛ 808line 804 didn't jump to line 808 because the condition on line 804 was always true

805 if self._loop.get_debug(): 805 ↛ 806line 805 didn't jump to line 806 because the condition on line 805 was never true

806 logger.debug("%r: %s", self, message, exc_info=True) 

807 else: 

808 self._loop.call_exception_handler({ 

809 'message': message, 

810 'exception': exc, 

811 'transport': self, 

812 'protocol': self._protocol, 

813 }) 

814 self._close(exc) 

815 

816 def _close(self, exc=None): 

817 self._closing = True 

818 if self._buffer: 

819 self._loop._remove_writer(self._fileno) 

820 self._buffer.clear() 

821 self._loop._remove_reader(self._fileno) 

822 self._loop.call_soon(self._call_connection_lost, exc) 

823 

824 def _call_connection_lost(self, exc): 

825 try: 

826 self._protocol.connection_lost(exc) 

827 finally: 

828 self._pipe.close() 

829 self._pipe = None 

830 self._protocol = None 

831 self._loop = None 

832 

833 

834class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport): 

835 

836 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs): 

837 stdin_w = None 

838 if stdin == subprocess.PIPE and sys.platform.startswith('aix'): 838 ↛ 843line 838 didn't jump to line 843 because the condition on line 838 was never true

839 # Use a socket pair for stdin on AIX, since it does not 

840 # support selecting read events on the write end of a 

841 # socket (which we use in order to detect closing of the 

842 # other end). 

843 stdin, stdin_w = socket.socketpair() 

844 try: 

845 self._proc = subprocess.Popen( 

846 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, 

847 universal_newlines=False, bufsize=bufsize, **kwargs) 

848 if stdin_w is not None: 848 ↛ 849line 848 didn't jump to line 849 because the condition on line 848 was never true

849 stdin.close() 

850 self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize) 

851 stdin_w = None 

852 finally: 

853 if stdin_w is not None: 853 ↛ 854line 853 didn't jump to line 854 because the condition on line 853 was never true

854 stdin.close() 

855 stdin_w.close() 

856 

857 

858class _PidfdChildWatcher: 

859 """Child watcher implementation using Linux's pid file descriptors. 

860 

861 This child watcher polls process file descriptors (pidfds) to await child 

862 process termination. In some respects, PidfdChildWatcher is a "Goldilocks" 

863 child watcher implementation. It doesn't require signals or threads, doesn't 

864 interfere with any processes launched outside the event loop, and scales 

865 linearly with the number of subprocesses launched by the event loop. The 

866 main disadvantage is that pidfds are specific to Linux, and only work on 

867 recent (5.3+) kernels. 

868 """ 

869 

870 def add_child_handler(self, pid, callback, *args): 

871 loop = events.get_running_loop() 

872 pidfd = os.pidfd_open(pid) 

873 loop._add_reader(pidfd, self._do_wait, pid, pidfd, callback, args) 

874 

875 def _do_wait(self, pid, pidfd, callback, args): 

876 loop = events.get_running_loop() 

877 loop._remove_reader(pidfd) 

878 try: 

879 _, status = os.waitpid(pid, 0) 

880 except ChildProcessError: 

881 # The child process is already reaped 

882 # (may happen if waitpid() is called elsewhere). 

883 returncode = 255 

884 logger.warning( 

885 "child process pid %d exit status already read: " 

886 " will report returncode 255", 

887 pid) 

888 else: 

889 returncode = waitstatus_to_exitcode(status) 

890 

891 os.close(pidfd) 

892 callback(pid, returncode, *args) 

893 

894class _ThreadedChildWatcher: 

895 """Threaded child watcher implementation. 

896 

897 The watcher uses a thread per process 

898 for waiting for the process finish. 

899 

900 It doesn't require subscription on POSIX signal 

901 but a thread creation is not free. 

902 

903 The watcher has O(1) complexity, its performance doesn't depend 

904 on amount of spawn processes. 

905 """ 

906 

907 def __init__(self): 

908 self._pid_counter = itertools.count(0) 

909 self._threads = {} 

910 

911 def __del__(self, _warn=warnings.warn): 

912 threads = [thread for thread in list(self._threads.values()) 

913 if thread.is_alive()] 

914 if threads: 914 ↛ 915line 914 didn't jump to line 915 because the condition on line 914 was never true

915 _warn(f"{self.__class__} has registered but not finished child processes", 

916 ResourceWarning, 

917 source=self) 

918 

919 def add_child_handler(self, pid, callback, *args): 

920 loop = events.get_running_loop() 

921 thread = threading.Thread(target=self._do_waitpid, 

922 name=f"asyncio-waitpid-{next(self._pid_counter)}", 

923 args=(loop, pid, callback, args), 

924 daemon=True) 

925 self._threads[pid] = thread 

926 thread.start() 

927 

928 def _do_waitpid(self, loop, expected_pid, callback, args): 

929 assert expected_pid > 0 

930 

931 try: 

932 pid, status = os.waitpid(expected_pid, 0) 

933 except ChildProcessError: 

934 # The child process is already reaped 

935 # (may happen if waitpid() is called elsewhere). 

936 pid = expected_pid 

937 returncode = 255 

938 logger.warning( 

939 "Unknown child process pid %d, will report returncode 255", 

940 pid) 

941 else: 

942 returncode = waitstatus_to_exitcode(status) 

943 if loop.get_debug(): 943 ↛ 944line 943 didn't jump to line 944 because the condition on line 943 was never true

944 logger.debug('process %s exited with returncode %s', 

945 expected_pid, returncode) 

946 

947 if loop.is_closed(): 947 ↛ 948line 947 didn't jump to line 948 because the condition on line 947 was never true

948 logger.warning("Loop %r that handles pid %r is closed", loop, pid) 

949 else: 

950 loop.call_soon_threadsafe(callback, pid, returncode, *args) 

951 

952 self._threads.pop(expected_pid) 

953 

954def can_use_pidfd(): 

955 if not hasattr(os, 'pidfd_open'): 955 ↛ 956line 955 didn't jump to line 956 because the condition on line 955 was never true

956 return False 

957 try: 

958 pid = os.getpid() 

959 os.close(os.pidfd_open(pid, 0)) 

960 except OSError: 

961 # blocked by security policy like SECCOMP 

962 return False 

963 return True 

964 

965 

966class _UnixDefaultEventLoopPolicy(events._BaseDefaultEventLoopPolicy): 

967 """UNIX event loop policy""" 

968 _loop_factory = _UnixSelectorEventLoop 

969 

970 

971SelectorEventLoop = _UnixSelectorEventLoop 

972_DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy 

973EventLoop = SelectorEventLoop