Coverage for Lib / asyncio / selector_events.py: 87%

922 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 02:39 +0000

1"""Event loop using a selector and related classes. 

2 

3A selector is a "notify-when-ready" multiplexer. For a subclass which 

4also includes support for signal handling, see the unix_events sub-module. 

5""" 

6 

7__all__ = 'BaseSelectorEventLoop', 

8 

9import collections 

10import errno 

11import functools 

12import itertools 

13import os 

14import selectors 

15import socket 

16import warnings 

17import weakref 

18try: 

19 import ssl 

20except ImportError: # pragma: no cover 

21 ssl = None 

22 

23from . import base_events 

24from . import constants 

25from . import events 

26from . import futures 

27from . import protocols 

28from . import sslproto 

29from . import transports 

30from . import trsock 

31from .log import logger 

32 

33_HAS_SENDMSG = hasattr(socket.socket, 'sendmsg') 

34 

35if _HAS_SENDMSG: 35 ↛ 42line 35 didn't jump to line 42 because the condition on line 35 was always true

36 try: 

37 SC_IOV_MAX = os.sysconf('SC_IOV_MAX') 

38 except OSError: 

39 # Fallback to send 

40 _HAS_SENDMSG = False 

41 

42def _test_selector_event(selector, fd, event): 

43 # Test if the selector is monitoring 'event' events 

44 # for the file descriptor 'fd'. 

45 try: 

46 key = selector.get_key(fd) 

47 except KeyError: 

48 return False 

49 else: 

50 return bool(key.events & event) 

51 

52 

53class BaseSelectorEventLoop(base_events.BaseEventLoop): 

54 """Selector event loop. 

55 

56 See events.EventLoop for API specification. 

57 """ 

58 

59 def __init__(self, selector=None): 

60 super().__init__() 

61 

62 if selector is None: 

63 selector = selectors.DefaultSelector() 

64 logger.debug('Using selector: %s', selector.__class__.__name__) 

65 self._selector = selector 

66 self._make_self_pipe() 

67 self._transports = weakref.WeakValueDictionary() 

68 

69 def _make_socket_transport(self, sock, protocol, waiter=None, *, 

70 extra=None, server=None, context=None): 

71 self._ensure_fd_no_transport(sock) 

72 return _SelectorSocketTransport(self, sock, protocol, waiter, 

73 extra, server, context=context) 

74 

75 def _make_ssl_transport( 

76 self, rawsock, protocol, sslcontext, waiter=None, 

77 *, server_side=False, server_hostname=None, 

78 extra=None, server=None, 

79 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT, 

80 ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT, 

81 context=None, 

82 ): 

83 self._ensure_fd_no_transport(rawsock) 

84 ssl_protocol = sslproto.SSLProtocol( 

85 self, protocol, sslcontext, waiter, 

86 server_side, server_hostname, 

87 ssl_handshake_timeout=ssl_handshake_timeout, 

88 ssl_shutdown_timeout=ssl_shutdown_timeout, 

89 ) 

90 _SelectorSocketTransport(self, rawsock, ssl_protocol, 

91 extra=extra, server=server, context=context) 

92 return ssl_protocol._app_transport 

93 

94 def _make_datagram_transport(self, sock, protocol, 

95 address=None, waiter=None, extra=None): 

96 self._ensure_fd_no_transport(sock) 

97 return _SelectorDatagramTransport(self, sock, protocol, 

98 address, waiter, extra) 

99 

100 def close(self): 

101 if self.is_running(): 

102 raise RuntimeError("Cannot close a running event loop") 

103 if self.is_closed(): 

104 return 

105 self._close_self_pipe() 

106 super().close() 

107 if self._selector is not None: 

108 self._selector.close() 

109 self._selector = None 

110 

111 def _close_self_pipe(self): 

112 self._remove_reader(self._ssock.fileno()) 

113 self._ssock.close() 

114 self._ssock = None 

115 self._csock.close() 

116 self._csock = None 

117 self._internal_fds -= 1 

118 

119 def _make_self_pipe(self): 

120 # A self-socket, really. :-) 

121 self._ssock, self._csock = socket.socketpair() 

122 self._ssock.setblocking(False) 

123 self._csock.setblocking(False) 

124 self._internal_fds += 1 

125 self._add_reader(self._ssock.fileno(), self._read_from_self) 

126 

127 def _process_self_data(self, data): 

128 pass 

129 

130 def _read_from_self(self): 

131 while True: 

132 try: 

133 data = self._ssock.recv(4096) 

134 if not data: 134 ↛ 135line 134 didn't jump to line 135 because the condition on line 134 was never true

135 break 

136 self._process_self_data(data) 

137 except InterruptedError: 

138 continue 

139 except BlockingIOError: 

140 break 

141 

142 def _write_to_self(self): 

143 # This may be called from a different thread, possibly after 

144 # _close_self_pipe() has been called or even while it is 

145 # running. Guard for self._csock being None or closed. When 

146 # a socket is closed, send() raises OSError (with errno set to 

147 # EBADF, but let's not rely on the exact error code). 

148 csock = self._csock 

149 if csock is None: 149 ↛ 150line 149 didn't jump to line 150 because the condition on line 149 was never true

150 return 

151 

152 try: 

153 csock.send(b'\0') 

154 except OSError: 

155 if self._debug: 155 ↛ 156line 155 didn't jump to line 156 because the condition on line 155 was never true

156 logger.debug("Fail to write a null byte into the " 

157 "self-pipe socket", 

158 exc_info=True) 

159 

160 def _start_serving(self, protocol_factory, sock, 

161 sslcontext=None, server=None, backlog=100, 

162 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT, 

163 ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT, context=None): 

164 self._add_reader(sock.fileno(), self._accept_connection, 

165 protocol_factory, sock, sslcontext, server, backlog, 

166 ssl_handshake_timeout, ssl_shutdown_timeout, context) 

167 

168 def _accept_connection( 

169 self, protocol_factory, sock, 

170 sslcontext=None, server=None, backlog=100, 

171 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT, 

172 ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT, context=None): 

173 # This method is only called once for each event loop tick where the 

174 # listening socket has triggered an EVENT_READ. There may be multiple 

175 # connections waiting for an .accept() so it is called in a loop. 

176 # See https://bugs.python.org/issue27906 for more details. 

177 for _ in range(backlog + 1): 

178 try: 

179 conn, addr = sock.accept() 

180 if self._debug: 

181 logger.debug("%r got a new connection from %r: %r", 

182 server, addr, conn) 

183 conn.setblocking(False) 

184 except ConnectionAbortedError: 

185 # Discard connections that were aborted before accept(). 

186 continue 

187 except (BlockingIOError, InterruptedError): 

188 # Early exit because of a signal or 

189 # the socket accept buffer is empty. 

190 return 

191 except OSError as exc: 

192 # There's nowhere to send the error, so just log it. 

193 if exc.errno in (errno.EMFILE, errno.ENFILE, 193 ↛ 210line 193 didn't jump to line 210 because the condition on line 193 was always true

194 errno.ENOBUFS, errno.ENOMEM): 

195 # Some platforms (e.g. Linux keep reporting the FD as 

196 # ready, so we remove the read handler temporarily. 

197 # We'll try again in a while. 

198 self.call_exception_handler({ 

199 'message': 'socket.accept() out of system resource', 

200 'exception': exc, 

201 'socket': trsock.TransportSocket(sock), 

202 }) 

203 self._remove_reader(sock.fileno()) 

204 self.call_later(constants.ACCEPT_RETRY_DELAY, 

205 self._start_serving, 

206 protocol_factory, sock, sslcontext, server, 

207 backlog, ssl_handshake_timeout, 

208 ssl_shutdown_timeout, context) 

209 else: 

210 raise # The event loop will catch, log and ignore it. 

211 else: 

212 extra = {'peername': addr} 

213 conn_context = context.copy() if context is not None else None 

214 accept = self._accept_connection2( 

215 protocol_factory, conn, extra, sslcontext, server, 

216 ssl_handshake_timeout, ssl_shutdown_timeout, context=conn_context) 

217 self.create_task(accept, context=conn_context) 

218 

219 async def _accept_connection2( 

220 self, protocol_factory, conn, extra, 

221 sslcontext=None, server=None, 

222 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT, 

223 ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT, context=None): 

224 protocol = None 

225 transport = None 

226 try: 

227 protocol = protocol_factory() 

228 waiter = self.create_future() 

229 if sslcontext: 

230 transport = self._make_ssl_transport( 

231 conn, protocol, sslcontext, waiter=waiter, 

232 server_side=True, extra=extra, server=server, 

233 ssl_handshake_timeout=ssl_handshake_timeout, 

234 ssl_shutdown_timeout=ssl_shutdown_timeout, 

235 context=context) 

236 else: 

237 transport = self._make_socket_transport( 

238 conn, protocol, waiter=waiter, extra=extra, 

239 server=server, context=context) 

240 

241 try: 

242 await waiter 

243 except BaseException: 

244 transport.close() 

245 # gh-109534: When an exception is raised by the SSLProtocol object the 

246 # exception set in this future can keep the protocol object alive and 

247 # cause a reference cycle. 

248 waiter = None 

249 raise 

250 # It's now up to the protocol to handle the connection. 

251 

252 except (SystemExit, KeyboardInterrupt): 

253 raise 

254 except BaseException as exc: 

255 if self._debug: 255 ↛ 256line 255 didn't jump to line 256 because the condition on line 255 was never true

256 context = { 

257 'message': 

258 'Error on transport creation for incoming connection', 

259 'exception': exc, 

260 } 

261 if protocol is not None: 

262 context['protocol'] = protocol 

263 if transport is not None: 

264 context['transport'] = transport 

265 self.call_exception_handler(context) 

266 

267 def _ensure_fd_no_transport(self, fd): 

268 fileno = fd 

269 if not isinstance(fileno, int): 

270 try: 

271 fileno = int(fileno.fileno()) 

272 except (AttributeError, TypeError, ValueError): 

273 # This code matches selectors._fileobj_to_fd function. 

274 raise ValueError(f"Invalid file object: {fd!r}") from None 

275 transport = self._transports.get(fileno) 

276 if transport and not transport.is_closing(): 

277 raise RuntimeError( 

278 f'File descriptor {fd!r} is used by transport ' 

279 f'{transport!r}') 

280 

281 def _add_reader(self, fd, callback, *args, context=None): 

282 self._check_closed() 

283 handle = events.Handle(callback, args, self, context=context) 

284 key = self._selector.get_map().get(fd) 

285 if key is None: 

286 self._selector.register(fd, selectors.EVENT_READ, 

287 (handle, None)) 

288 else: 

289 mask, (reader, writer) = key.events, key.data 

290 self._selector.modify(fd, mask | selectors.EVENT_READ, 

291 (handle, writer)) 

292 if reader is not None: 

293 reader.cancel() 

294 return handle 

295 

296 def _remove_reader(self, fd): 

297 if self.is_closed(): 

298 return False 

299 key = self._selector.get_map().get(fd) 

300 if key is None: 

301 return False 

302 mask, (reader, writer) = key.events, key.data 

303 mask &= ~selectors.EVENT_READ 

304 if not mask: 

305 self._selector.unregister(fd) 

306 else: 

307 self._selector.modify(fd, mask, (None, writer)) 

308 

309 if reader is not None: 

310 reader.cancel() 

311 return True 

312 else: 

313 return False 

314 

315 def _add_writer(self, fd, callback, *args, context=None): 

316 self._check_closed() 

317 handle = events.Handle(callback, args, self, context=context) 

318 key = self._selector.get_map().get(fd) 

319 if key is None: 

320 self._selector.register(fd, selectors.EVENT_WRITE, 

321 (None, handle)) 

322 else: 

323 mask, (reader, writer) = key.events, key.data 

324 self._selector.modify(fd, mask | selectors.EVENT_WRITE, 

325 (reader, handle)) 

326 if writer is not None: 

327 writer.cancel() 

328 return handle 

329 

330 def _remove_writer(self, fd): 

331 """Remove a writer callback.""" 

332 if self.is_closed(): 

333 return False 

334 key = self._selector.get_map().get(fd) 

335 if key is None: 

336 return False 

337 mask, (reader, writer) = key.events, key.data 

338 # Remove both writer and connector. 

339 mask &= ~selectors.EVENT_WRITE 

340 if not mask: 

341 self._selector.unregister(fd) 

342 else: 

343 self._selector.modify(fd, mask, (reader, None)) 

344 

345 if writer is not None: 

346 writer.cancel() 

347 return True 

348 else: 

349 return False 

350 

351 def add_reader(self, fd, callback, *args): 

352 """Add a reader callback.""" 

353 self._ensure_fd_no_transport(fd) 

354 self._add_reader(fd, callback, *args) 

355 

356 def remove_reader(self, fd): 

357 """Remove a reader callback.""" 

358 self._ensure_fd_no_transport(fd) 

359 return self._remove_reader(fd) 

360 

361 def add_writer(self, fd, callback, *args): 

362 """Add a writer callback..""" 

363 self._ensure_fd_no_transport(fd) 

364 self._add_writer(fd, callback, *args) 

365 

366 def remove_writer(self, fd): 

367 """Remove a writer callback.""" 

368 self._ensure_fd_no_transport(fd) 

369 return self._remove_writer(fd) 

370 

371 async def sock_recv(self, sock, n): 

372 """Receive data from the socket. 

373 

374 The return value is a bytes object representing the data received. 

375 The maximum amount of data to be received at once is specified by 

376 nbytes. 

377 """ 

378 base_events._check_ssl_socket(sock) 

379 if self._debug and sock.gettimeout() != 0: 

380 raise ValueError("the socket must be non-blocking") 

381 try: 

382 return sock.recv(n) 

383 except (BlockingIOError, InterruptedError): 

384 pass 

385 fut = self.create_future() 

386 fd = sock.fileno() 

387 self._ensure_fd_no_transport(fd) 

388 handle = self._add_reader(fd, self._sock_recv, fut, sock, n) 

389 fut.add_done_callback( 

390 functools.partial(self._sock_read_done, fd, handle=handle)) 

391 return await fut 

392 

393 def _sock_read_done(self, fd, fut, handle=None): 

394 if handle is None or not handle.cancelled(): 

395 self.remove_reader(fd) 

396 

397 def _sock_recv(self, fut, sock, n): 

398 # _sock_recv() can add itself as an I/O callback if the operation can't 

399 # be done immediately. Don't use it directly, call sock_recv(). 

400 if fut.done(): 400 ↛ 401line 400 didn't jump to line 401 because the condition on line 400 was never true

401 return 

402 try: 

403 data = sock.recv(n) 

404 except (BlockingIOError, InterruptedError): 

405 return # try again next time 

406 except (SystemExit, KeyboardInterrupt): 

407 raise 

408 except BaseException as exc: 

409 fut.set_exception(exc) 

410 else: 

411 fut.set_result(data) 

412 

413 async def sock_recv_into(self, sock, buf): 

414 """Receive data from the socket. 

415 

416 The received data is written into *buf* (a writable buffer). 

417 The return value is the number of bytes written. 

418 """ 

419 base_events._check_ssl_socket(sock) 

420 if self._debug and sock.gettimeout() != 0: 

421 raise ValueError("the socket must be non-blocking") 

422 try: 

423 return sock.recv_into(buf) 

424 except (BlockingIOError, InterruptedError): 

425 pass 

426 fut = self.create_future() 

427 fd = sock.fileno() 

428 self._ensure_fd_no_transport(fd) 

429 handle = self._add_reader(fd, self._sock_recv_into, fut, sock, buf) 

430 fut.add_done_callback( 

431 functools.partial(self._sock_read_done, fd, handle=handle)) 

432 return await fut 

433 

434 def _sock_recv_into(self, fut, sock, buf): 

435 # _sock_recv_into() can add itself as an I/O callback if the operation 

436 # can't be done immediately. Don't use it directly, call 

437 # sock_recv_into(). 

438 if fut.done(): 438 ↛ 439line 438 didn't jump to line 439 because the condition on line 438 was never true

439 return 

440 try: 

441 nbytes = sock.recv_into(buf) 

442 except (BlockingIOError, InterruptedError): 

443 return # try again next time 

444 except (SystemExit, KeyboardInterrupt): 

445 raise 

446 except BaseException as exc: 

447 fut.set_exception(exc) 

448 else: 

449 fut.set_result(nbytes) 

450 

451 async def sock_recvfrom(self, sock, bufsize): 

452 """Receive a datagram from a datagram socket. 

453 

454 The return value is a tuple of (bytes, address) representing the 

455 datagram received and the address it came from. 

456 The maximum amount of data to be received at once is specified by 

457 nbytes. 

458 """ 

459 base_events._check_ssl_socket(sock) 

460 if self._debug and sock.gettimeout() != 0: 460 ↛ 461line 460 didn't jump to line 461 because the condition on line 460 was never true

461 raise ValueError("the socket must be non-blocking") 

462 try: 

463 return sock.recvfrom(bufsize) 

464 except (BlockingIOError, InterruptedError): 

465 pass 

466 fut = self.create_future() 

467 fd = sock.fileno() 

468 self._ensure_fd_no_transport(fd) 

469 handle = self._add_reader(fd, self._sock_recvfrom, fut, sock, bufsize) 

470 fut.add_done_callback( 

471 functools.partial(self._sock_read_done, fd, handle=handle)) 

472 return await fut 

473 

474 def _sock_recvfrom(self, fut, sock, bufsize): 

475 # _sock_recvfrom() can add itself as an I/O callback if the operation 

476 # can't be done immediately. Don't use it directly, call 

477 # sock_recvfrom(). 

478 if fut.done(): 478 ↛ 479line 478 didn't jump to line 479 because the condition on line 478 was never true

479 return 

480 try: 

481 result = sock.recvfrom(bufsize) 

482 except (BlockingIOError, InterruptedError): 

483 return # try again next time 

484 except (SystemExit, KeyboardInterrupt): 

485 raise 

486 except BaseException as exc: 

487 fut.set_exception(exc) 

488 else: 

489 fut.set_result(result) 

490 

491 async def sock_recvfrom_into(self, sock, buf, nbytes=0): 

492 """Receive data from the socket. 

493 

494 The received data is written into *buf* (a writable buffer). 

495 The return value is a tuple of (number of bytes written, address). 

496 """ 

497 base_events._check_ssl_socket(sock) 

498 if self._debug and sock.gettimeout() != 0: 498 ↛ 499line 498 didn't jump to line 499 because the condition on line 498 was never true

499 raise ValueError("the socket must be non-blocking") 

500 if not nbytes: 

501 nbytes = len(buf) 

502 

503 try: 

504 return sock.recvfrom_into(buf, nbytes) 

505 except (BlockingIOError, InterruptedError): 

506 pass 

507 fut = self.create_future() 

508 fd = sock.fileno() 

509 self._ensure_fd_no_transport(fd) 

510 handle = self._add_reader(fd, self._sock_recvfrom_into, fut, sock, buf, 

511 nbytes) 

512 fut.add_done_callback( 

513 functools.partial(self._sock_read_done, fd, handle=handle)) 

514 return await fut 

515 

516 def _sock_recvfrom_into(self, fut, sock, buf, bufsize): 

517 # _sock_recv_into() can add itself as an I/O callback if the operation 

518 # can't be done immediately. Don't use it directly, call 

519 # sock_recv_into(). 

520 if fut.done(): 520 ↛ 521line 520 didn't jump to line 521 because the condition on line 520 was never true

521 return 

522 try: 

523 result = sock.recvfrom_into(buf, bufsize) 

524 except (BlockingIOError, InterruptedError): 

525 return # try again next time 

526 except (SystemExit, KeyboardInterrupt): 

527 raise 

528 except BaseException as exc: 

529 fut.set_exception(exc) 

530 else: 

531 fut.set_result(result) 

532 

533 async def sock_sendall(self, sock, data): 

534 """Send data to the socket. 

535 

536 The socket must be connected to a remote socket. This method continues 

537 to send data from data until either all data has been sent or an 

538 error occurs. None is returned on success. On error, an exception is 

539 raised, and there is no way to determine how much data, if any, was 

540 successfully processed by the receiving end of the connection. 

541 """ 

542 base_events._check_ssl_socket(sock) 

543 if self._debug and sock.gettimeout() != 0: 

544 raise ValueError("the socket must be non-blocking") 

545 try: 

546 n = sock.send(data) 

547 except (BlockingIOError, InterruptedError): 

548 n = 0 

549 

550 if n == len(data): 

551 # all data sent 

552 return 

553 

554 fut = self.create_future() 

555 fd = sock.fileno() 

556 self._ensure_fd_no_transport(fd) 

557 # use a trick with a list in closure to store a mutable state 

558 handle = self._add_writer(fd, self._sock_sendall, fut, sock, 

559 memoryview(data), [n]) 

560 fut.add_done_callback( 

561 functools.partial(self._sock_write_done, fd, handle=handle)) 

562 return await fut 

563 

564 def _sock_sendall(self, fut, sock, view, pos): 

565 if fut.done(): 565 ↛ 567line 565 didn't jump to line 567 because the condition on line 565 was never true

566 # Future cancellation can be scheduled on previous loop iteration 

567 return 

568 start = pos[0] 

569 try: 

570 n = sock.send(view[start:]) 

571 except (BlockingIOError, InterruptedError): 

572 return 

573 except (SystemExit, KeyboardInterrupt): 

574 raise 

575 except BaseException as exc: 

576 fut.set_exception(exc) 

577 return 

578 

579 start += n 

580 

581 if start == len(view): 

582 fut.set_result(None) 

583 else: 

584 pos[0] = start 

585 

586 async def sock_sendto(self, sock, data, address): 

587 """Send data to the socket. 

588 

589 The socket must be connected to a remote socket. This method continues 

590 to send data from data until either all data has been sent or an 

591 error occurs. None is returned on success. On error, an exception is 

592 raised, and there is no way to determine how much data, if any, was 

593 successfully processed by the receiving end of the connection. 

594 """ 

595 base_events._check_ssl_socket(sock) 

596 if self._debug and sock.gettimeout() != 0: 596 ↛ 597line 596 didn't jump to line 597 because the condition on line 596 was never true

597 raise ValueError("the socket must be non-blocking") 

598 try: 

599 return sock.sendto(data, address) 

600 except (BlockingIOError, InterruptedError): 

601 pass 

602 

603 fut = self.create_future() 

604 fd = sock.fileno() 

605 self._ensure_fd_no_transport(fd) 

606 # use a trick with a list in closure to store a mutable state 

607 handle = self._add_writer(fd, self._sock_sendto, fut, sock, data, 

608 address) 

609 fut.add_done_callback( 

610 functools.partial(self._sock_write_done, fd, handle=handle)) 

611 return await fut 

612 

613 def _sock_sendto(self, fut, sock, data, address): 

614 if fut.done(): 614 ↛ 616line 614 didn't jump to line 616 because the condition on line 614 was never true

615 # Future cancellation can be scheduled on previous loop iteration 

616 return 

617 try: 

618 n = sock.sendto(data, 0, address) 

619 except (BlockingIOError, InterruptedError): 

620 return 

621 except (SystemExit, KeyboardInterrupt): 

622 raise 

623 except BaseException as exc: 

624 fut.set_exception(exc) 

625 else: 

626 fut.set_result(n) 

627 

628 async def sock_connect(self, sock, address): 

629 """Connect to a remote socket at address. 

630 

631 This method is a coroutine. 

632 """ 

633 base_events._check_ssl_socket(sock) 

634 if self._debug and sock.gettimeout() != 0: 

635 raise ValueError("the socket must be non-blocking") 

636 

637 if sock.family == socket.AF_INET or ( 

638 base_events._HAS_IPv6 and sock.family == socket.AF_INET6): 

639 resolved = await self._ensure_resolved( 

640 address, family=sock.family, type=sock.type, proto=sock.proto, 

641 loop=self, 

642 ) 

643 _, _, _, _, address = resolved[0] 

644 

645 fut = self.create_future() 

646 self._sock_connect(fut, sock, address) 

647 try: 

648 return await fut 

649 finally: 

650 # Needed to break cycles when an exception occurs. 

651 fut = None 

652 

653 def _sock_connect(self, fut, sock, address): 

654 fd = sock.fileno() 

655 try: 

656 sock.connect(address) 

657 except (BlockingIOError, InterruptedError): 

658 # Issue #23618: When the C function connect() fails with EINTR, the 

659 # connection runs in background. We have to wait until the socket 

660 # becomes writable to be notified when the connection succeed or 

661 # fails. 

662 self._ensure_fd_no_transport(fd) 

663 handle = self._add_writer( 

664 fd, self._sock_connect_cb, fut, sock, address) 

665 fut.add_done_callback( 

666 functools.partial(self._sock_write_done, fd, handle=handle)) 

667 except (SystemExit, KeyboardInterrupt): 

668 raise 

669 except BaseException as exc: 

670 fut.set_exception(exc) 

671 else: 

672 fut.set_result(None) 

673 finally: 

674 fut = None 

675 

676 def _sock_write_done(self, fd, fut, handle=None): 

677 if handle is None or not handle.cancelled(): 

678 self.remove_writer(fd) 

679 

680 def _sock_connect_cb(self, fut, sock, address): 

681 if fut.done(): 681 ↛ 682line 681 didn't jump to line 682 because the condition on line 681 was never true

682 return 

683 

684 try: 

685 err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) 

686 if err != 0: 

687 # Jump to any except clause below. 

688 raise OSError(err, f'Connect call failed {address}') 

689 except (BlockingIOError, InterruptedError): 

690 # socket is still registered, the callback will be retried later 

691 pass 

692 except (SystemExit, KeyboardInterrupt): 

693 raise 

694 except BaseException as exc: 

695 fut.set_exception(exc) 

696 else: 

697 fut.set_result(None) 

698 finally: 

699 fut = None 

700 

701 async def sock_accept(self, sock): 

702 """Accept a connection. 

703 

704 The socket must be bound to an address and listening for connections. 

705 The return value is a pair (conn, address) where conn is a new socket 

706 object usable to send and receive data on the connection, and address 

707 is the address bound to the socket on the other end of the connection. 

708 """ 

709 base_events._check_ssl_socket(sock) 

710 if self._debug and sock.gettimeout() != 0: 

711 raise ValueError("the socket must be non-blocking") 

712 fut = self.create_future() 

713 self._sock_accept(fut, sock) 

714 return await fut 

715 

716 def _sock_accept(self, fut, sock): 

717 fd = sock.fileno() 

718 try: 

719 conn, address = sock.accept() 

720 conn.setblocking(False) 

721 except (BlockingIOError, InterruptedError): 

722 self._ensure_fd_no_transport(fd) 

723 handle = self._add_reader(fd, self._sock_accept, fut, sock) 

724 fut.add_done_callback( 

725 functools.partial(self._sock_read_done, fd, handle=handle)) 

726 except (SystemExit, KeyboardInterrupt): 

727 raise 

728 except BaseException as exc: 

729 fut.set_exception(exc) 

730 else: 

731 fut.set_result((conn, address)) 

732 

733 async def _sendfile_native(self, transp, file, offset, count): 

734 del self._transports[transp._sock_fd] 

735 resume_reading = transp.is_reading() 

736 transp.pause_reading() 

737 await transp._make_empty_waiter() 

738 try: 

739 return await self.sock_sendfile(transp._sock, file, offset, count, 

740 fallback=False) 

741 finally: 

742 transp._reset_empty_waiter() 

743 if resume_reading: 743 ↛ 745line 743 didn't jump to line 745 because the condition on line 743 was always true

744 transp.resume_reading() 

745 self._transports[transp._sock_fd] = transp 

746 

747 def _process_events(self, event_list): 

748 for key, mask in event_list: 

749 fileobj, (reader, writer) = key.fileobj, key.data 

750 if mask & selectors.EVENT_READ and reader is not None: 

751 if reader._cancelled: 

752 self._remove_reader(fileobj) 

753 else: 

754 self._add_callback(reader) 

755 if mask & selectors.EVENT_WRITE and writer is not None: 

756 if writer._cancelled: 

757 self._remove_writer(fileobj) 

758 else: 

759 self._add_callback(writer) 

760 

761 def _stop_serving(self, sock): 

762 self._remove_reader(sock.fileno()) 

763 sock.close() 

764 

765 

766class _SelectorTransport(transports._FlowControlMixin, 

767 transports.Transport): 

768 

769 max_size = 256 * 1024 # Buffer size passed to recv(). 

770 

771 # Attribute used in the destructor: it must be set even if the constructor 

772 # is not called (see _SelectorSslTransport which may start by raising an 

773 # exception) 

774 _sock = None 

775 

776 def __init__(self, loop, sock, protocol, extra=None, server=None, context=None): 

777 super().__init__(extra, loop) 

778 self._extra['socket'] = trsock.TransportSocket(sock) 

779 try: 

780 self._extra['sockname'] = sock.getsockname() 

781 except OSError: 

782 self._extra['sockname'] = None 

783 if 'peername' not in self._extra: 

784 try: 

785 self._extra['peername'] = sock.getpeername() 

786 except socket.error: 

787 self._extra['peername'] = None 

788 self._sock = sock 

789 self._sock_fd = sock.fileno() 

790 self._context = context 

791 self._protocol_connected = False 

792 self.set_protocol(protocol) 

793 

794 self._server = server 

795 self._buffer = collections.deque() 

796 self._buffer_size = 0 

797 self._conn_lost = 0 # Set when call to connection_lost scheduled. 

798 self._closing = False # Set when close() called. 

799 self._paused = False # Set when pause_reading() called 

800 

801 if self._server is not None: 

802 self._server._attach(self) 

803 loop._transports[self._sock_fd] = self 

804 

805 def __repr__(self): 

806 info = [self.__class__.__name__] 

807 if self._sock is None: 807 ↛ 808line 807 didn't jump to line 808 because the condition on line 807 was never true

808 info.append('closed') 

809 elif self._closing: 

810 info.append('closing') 

811 info.append(f'fd={self._sock_fd}') 

812 # test if the transport was closed 

813 if self._loop is not None and not self._loop.is_closed(): 

814 polling = _test_selector_event(self._loop._selector, 

815 self._sock_fd, selectors.EVENT_READ) 

816 if polling: 

817 info.append('read=polling') 

818 else: 

819 info.append('read=idle') 

820 

821 polling = _test_selector_event(self._loop._selector, 

822 self._sock_fd, 

823 selectors.EVENT_WRITE) 

824 if polling: 824 ↛ 825line 824 didn't jump to line 825 because the condition on line 824 was never true

825 state = 'polling' 

826 else: 

827 state = 'idle' 

828 

829 bufsize = self.get_write_buffer_size() 

830 info.append(f'write=<{state}, bufsize={bufsize}>') 

831 return '<{}>'.format(' '.join(info)) 

832 

833 def abort(self): 

834 self._force_close(None) 

835 

836 def set_protocol(self, protocol): 

837 self._protocol = protocol 

838 self._protocol_connected = True 

839 

840 def get_protocol(self): 

841 return self._protocol 

842 

843 def is_closing(self): 

844 return self._closing 

845 

846 def is_reading(self): 

847 return not self.is_closing() and not self._paused 

848 

849 def pause_reading(self): 

850 if not self.is_reading(): 

851 return 

852 self._paused = True 

853 self._loop._remove_reader(self._sock_fd) 

854 if self._loop.get_debug(): 

855 logger.debug("%r pauses reading", self) 

856 

857 def resume_reading(self): 

858 if self._closing or not self._paused: 

859 return 

860 self._paused = False 

861 self._add_reader(self._sock_fd, self._read_ready) 

862 if self._loop.get_debug(): 862 ↛ 863line 862 didn't jump to line 863 because the condition on line 862 was never true

863 logger.debug("%r resumes reading", self) 

864 

865 def close(self): 

866 if self._closing: 

867 return 

868 self._closing = True 

869 self._loop._remove_reader(self._sock_fd) 

870 if not self._buffer: 

871 self._conn_lost += 1 

872 self._loop._remove_writer(self._sock_fd) 

873 self._call_soon(self._call_connection_lost, None) 

874 

875 def __del__(self, _warn=warnings.warn): 

876 if self._sock is not None: 876 ↛ 877line 876 didn't jump to line 877 because the condition on line 876 was never true

877 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) 

878 self._sock.close() 

879 if self._server is not None: 

880 self._server._detach(self) 

881 

882 def _fatal_error(self, exc, message='Fatal error on transport'): 

883 # Should be called from exception handler only. 

884 if isinstance(exc, OSError): 

885 if self._loop.get_debug(): 885 ↛ 886line 885 didn't jump to line 886 because the condition on line 885 was never true

886 logger.debug("%r: %s", self, message, exc_info=True) 

887 else: 

888 self._loop.call_exception_handler({ 

889 'message': message, 

890 'exception': exc, 

891 'transport': self, 

892 'protocol': self._protocol, 

893 }) 

894 self._force_close(exc) 

895 

896 def _force_close(self, exc): 

897 if self._conn_lost: 

898 return 

899 if self._buffer: 

900 self._buffer.clear() 

901 self._buffer_size = 0 

902 self._loop._remove_writer(self._sock_fd) 

903 if not self._closing: 

904 self._closing = True 

905 self._loop._remove_reader(self._sock_fd) 

906 self._conn_lost += 1 

907 self._call_soon(self._call_connection_lost, exc) 

908 

909 def _call_connection_lost(self, exc): 

910 try: 

911 if self._protocol_connected: 911 ↛ 914line 911 didn't jump to line 914 because the condition on line 911 was always true

912 self._protocol.connection_lost(exc) 

913 finally: 

914 self._sock.close() 

915 self._sock = None 

916 self._protocol = None 

917 self._loop = None 

918 server = self._server 

919 if server is not None: 

920 server._detach(self) 

921 self._server = None 

922 

923 def get_write_buffer_size(self): 

924 return self._buffer_size 

925 

926 def _add_reader(self, fd, callback, *args): 

927 if not self.is_reading(): 

928 return 

929 self._loop._add_reader(fd, callback, *args, context=self._context) 

930 

931 def _add_writer(self, fd, callback, *args): 

932 self._loop._add_writer(fd, callback, *args, context=self._context) 

933 

934 def _call_soon(self, callback, *args): 

935 self._loop.call_soon(callback, *args, context=self._context) 

936 

937class _SelectorSocketTransport(_SelectorTransport): 

938 

939 _start_tls_compatible = True 

940 _sendfile_compatible = constants._SendfileMode.TRY_NATIVE 

941 

942 def __init__(self, loop, sock, protocol, waiter=None, 

943 extra=None, server=None, context=None): 

944 self._read_ready_cb = None 

945 super().__init__(loop, sock, protocol, extra, server, context) 

946 self._eof = False 

947 self._empty_waiter = None 

948 if _HAS_SENDMSG: 948 ↛ 951line 948 didn't jump to line 951 because the condition on line 948 was always true

949 self._write_ready = self._write_sendmsg 

950 else: 

951 self._write_ready = self._write_send 

952 # Disable the Nagle algorithm -- small writes will be 

953 # sent without waiting for the TCP ACK. This generally 

954 # decreases the latency (in some cases significantly.) 

955 base_events._set_nodelay(self._sock) 

956 

957 self._call_soon(self._protocol.connection_made, self) 

958 # only start reading when connection_made() has been called 

959 self._call_soon(self._add_reader, self._sock_fd, self._read_ready) 

960 if waiter is not None: 

961 # only wake up the waiter when connection_made() has been called 

962 self._call_soon(futures._set_result_unless_cancelled, waiter, None) 

963 

964 def set_protocol(self, protocol): 

965 if isinstance(protocol, protocols.BufferedProtocol): 

966 self._read_ready_cb = self._read_ready__get_buffer 

967 else: 

968 self._read_ready_cb = self._read_ready__data_received 

969 

970 super().set_protocol(protocol) 

971 

972 def _read_ready(self): 

973 self._read_ready_cb() 

974 

975 def _read_ready__get_buffer(self): 

976 if self._conn_lost: 976 ↛ 977line 976 didn't jump to line 977 because the condition on line 976 was never true

977 return 

978 

979 try: 

980 buf = self._protocol.get_buffer(-1) 

981 if not len(buf): 

982 raise RuntimeError('get_buffer() returned an empty buffer') 

983 except (SystemExit, KeyboardInterrupt): 

984 raise 

985 except BaseException as exc: 

986 self._fatal_error( 

987 exc, 'Fatal error: protocol.get_buffer() call failed.') 

988 return 

989 

990 try: 

991 nbytes = self._sock.recv_into(buf) 

992 except (BlockingIOError, InterruptedError): 

993 return 

994 except (SystemExit, KeyboardInterrupt): 

995 raise 

996 except BaseException as exc: 

997 self._fatal_error(exc, 'Fatal read error on socket transport') 

998 return 

999 

1000 if not nbytes: 

1001 self._read_ready__on_eof() 

1002 return 

1003 

1004 try: 

1005 self._protocol.buffer_updated(nbytes) 

1006 except (SystemExit, KeyboardInterrupt): 

1007 raise 

1008 except BaseException as exc: 

1009 self._fatal_error( 

1010 exc, 'Fatal error: protocol.buffer_updated() call failed.') 

1011 

1012 def _read_ready__data_received(self): 

1013 if self._conn_lost: 1013 ↛ 1014line 1013 didn't jump to line 1014 because the condition on line 1013 was never true

1014 return 

1015 try: 

1016 data = self._sock.recv(self.max_size) 

1017 except (BlockingIOError, InterruptedError): 

1018 return 

1019 except (SystemExit, KeyboardInterrupt): 

1020 raise 

1021 except BaseException as exc: 

1022 self._fatal_error(exc, 'Fatal read error on socket transport') 

1023 return 

1024 

1025 if not data: 

1026 self._read_ready__on_eof() 

1027 return 

1028 

1029 try: 

1030 self._protocol.data_received(data) 

1031 except (SystemExit, KeyboardInterrupt): 

1032 raise 

1033 except BaseException as exc: 

1034 self._fatal_error( 

1035 exc, 'Fatal error: protocol.data_received() call failed.') 

1036 

1037 def _read_ready__on_eof(self): 

1038 if self._loop.get_debug(): 

1039 logger.debug("%r received EOF", self) 

1040 

1041 try: 

1042 keep_open = self._protocol.eof_received() 

1043 except (SystemExit, KeyboardInterrupt): 

1044 raise 

1045 except BaseException as exc: 

1046 self._fatal_error( 

1047 exc, 'Fatal error: protocol.eof_received() call failed.') 

1048 return 

1049 

1050 if keep_open: 

1051 # We're keeping the connection open so the 

1052 # protocol can write more, but we still can't 

1053 # receive more, so remove the reader callback. 

1054 self._loop._remove_reader(self._sock_fd) 

1055 else: 

1056 self.close() 

1057 

1058 def write(self, data): 

1059 if not isinstance(data, (bytes, bytearray, memoryview)): 

1060 raise TypeError(f'data argument must be a bytes, bytearray, or memoryview ' 

1061 f'object, not {type(data).__name__!r}') 

1062 if self._eof: 1062 ↛ 1063line 1062 didn't jump to line 1063 because the condition on line 1062 was never true

1063 raise RuntimeError('Cannot call write() after write_eof()') 

1064 if self._empty_waiter is not None: 

1065 raise RuntimeError('unable to write; sendfile is in progress') 

1066 if not data: 

1067 return 

1068 

1069 if self._conn_lost: 

1070 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: 

1071 logger.warning('socket.send() raised exception.') 

1072 self._conn_lost += 1 

1073 return 

1074 

1075 if not self._buffer: 

1076 # Optimization: try to send now. 

1077 try: 

1078 n = self._sock.send(data) 

1079 except (BlockingIOError, InterruptedError): 

1080 pass 

1081 except (SystemExit, KeyboardInterrupt): 

1082 raise 

1083 except BaseException as exc: 

1084 self._fatal_error(exc, 'Fatal write error on socket transport') 

1085 return 

1086 else: 

1087 data = memoryview(data)[n:] 

1088 if not data: 

1089 return 

1090 # Not all was written; register write handler. 

1091 self._add_writer(self._sock_fd, self._write_ready) 

1092 

1093 # Add it to the buffer. 

1094 self._buffer.append(data) 

1095 self._buffer_size += len(data) 

1096 self._maybe_pause_protocol() 

1097 

1098 def _get_sendmsg_buffer(self): 

1099 return itertools.islice(self._buffer, SC_IOV_MAX) 

1100 

1101 def _write_sendmsg(self): 

1102 assert self._buffer, 'Data should not be empty' 

1103 if self._conn_lost: 1103 ↛ 1104line 1103 didn't jump to line 1104 because the condition on line 1103 was never true

1104 return 

1105 try: 

1106 nbytes = self._sock.sendmsg(self._get_sendmsg_buffer()) 

1107 self._adjust_leftover_buffer(nbytes) 

1108 except (BlockingIOError, InterruptedError): 

1109 pass 

1110 except (SystemExit, KeyboardInterrupt): 

1111 raise 

1112 except BaseException as exc: 

1113 self._loop._remove_writer(self._sock_fd) 

1114 self._buffer.clear() 

1115 self._buffer_size = 0 

1116 self._fatal_error(exc, 'Fatal write error on socket transport') 

1117 if self._empty_waiter is not None: 1117 ↛ 1118line 1117 didn't jump to line 1118 because the condition on line 1117 was never true

1118 self._empty_waiter.set_exception(exc) 

1119 else: 

1120 self._maybe_resume_protocol() # May append to buffer. 

1121 if not self._buffer: 

1122 self._loop._remove_writer(self._sock_fd) 

1123 if self._empty_waiter is not None: 

1124 self._empty_waiter.set_result(None) 

1125 if self._closing: 

1126 self._call_connection_lost(None) 

1127 elif self._eof: 1127 ↛ 1128line 1127 didn't jump to line 1128 because the condition on line 1127 was never true

1128 self._sock.shutdown(socket.SHUT_WR) 

1129 

1130 def _adjust_leftover_buffer(self, nbytes: int) -> None: 

1131 self._buffer_size -= nbytes 

1132 buffer = self._buffer 

1133 while nbytes: 

1134 b = buffer.popleft() 

1135 b_len = len(b) 

1136 if b_len <= nbytes: 

1137 nbytes -= b_len 

1138 else: 

1139 buffer.appendleft(b[nbytes:]) 

1140 break 

1141 

1142 def _write_send(self): 

1143 assert self._buffer, 'Data should not be empty' 

1144 if self._conn_lost: 1144 ↛ 1145line 1144 didn't jump to line 1145 because the condition on line 1144 was never true

1145 return 

1146 try: 

1147 buffer = self._buffer.popleft() 

1148 n = self._sock.send(buffer) 

1149 if n != len(buffer): 

1150 # Not all data was written 

1151 self._buffer.appendleft(buffer[n:]) 

1152 self._buffer_size -= n 

1153 except (BlockingIOError, InterruptedError): 

1154 self._buffer.appendleft(buffer) 

1155 return 

1156 except (SystemExit, KeyboardInterrupt): 

1157 raise 

1158 except BaseException as exc: 

1159 self._loop._remove_writer(self._sock_fd) 

1160 self._buffer.clear() 

1161 self._buffer_size = 0 

1162 self._fatal_error(exc, 'Fatal write error on socket transport') 

1163 if self._empty_waiter is not None: 1163 ↛ 1164line 1163 didn't jump to line 1164 because the condition on line 1163 was never true

1164 self._empty_waiter.set_exception(exc) 

1165 else: 

1166 self._maybe_resume_protocol() # May append to buffer. 

1167 if not self._buffer: 

1168 self._loop._remove_writer(self._sock_fd) 

1169 if self._empty_waiter is not None: 1169 ↛ 1170line 1169 didn't jump to line 1170 because the condition on line 1169 was never true

1170 self._empty_waiter.set_result(None) 

1171 if self._closing: 

1172 self._call_connection_lost(None) 

1173 elif self._eof: 

1174 self._sock.shutdown(socket.SHUT_WR) 

1175 

1176 def write_eof(self): 

1177 if self._closing or self._eof: 

1178 return 

1179 self._eof = True 

1180 if not self._buffer: 

1181 self._sock.shutdown(socket.SHUT_WR) 

1182 

1183 def writelines(self, list_of_data): 

1184 if self._eof: 1184 ↛ 1185line 1184 didn't jump to line 1185 because the condition on line 1184 was never true

1185 raise RuntimeError('Cannot call writelines() after write_eof()') 

1186 if self._empty_waiter is not None: 1186 ↛ 1187line 1186 didn't jump to line 1187 because the condition on line 1186 was never true

1187 raise RuntimeError('unable to writelines; sendfile is in progress') 

1188 if not list_of_data: 1188 ↛ 1189line 1188 didn't jump to line 1189 because the condition on line 1188 was never true

1189 return 

1190 

1191 if self._conn_lost: 

1192 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: 1192 ↛ 1193line 1192 didn't jump to line 1193 because the condition on line 1192 was never true

1193 logger.warning('socket.send() raised exception.') 

1194 self._conn_lost += 1 

1195 return 

1196 

1197 for data in list_of_data: 

1198 self._buffer.append(memoryview(data)) 

1199 self._buffer_size += len(data) 

1200 self._write_ready() 

1201 # If the entire buffer couldn't be written, register a write handler 

1202 if self._buffer: 

1203 self._add_writer(self._sock_fd, self._write_ready) 

1204 self._maybe_pause_protocol() 

1205 

1206 def can_write_eof(self): 

1207 return True 

1208 

1209 def _call_connection_lost(self, exc): 

1210 try: 

1211 super()._call_connection_lost(exc) 

1212 finally: 

1213 self._write_ready = None 

1214 if self._empty_waiter is not None: 1214 ↛ 1215line 1214 didn't jump to line 1215 because the condition on line 1214 was never true

1215 self._empty_waiter.set_exception( 

1216 ConnectionError("Connection is closed by peer")) 

1217 

1218 def _make_empty_waiter(self): 

1219 if self._empty_waiter is not None: 1219 ↛ 1220line 1219 didn't jump to line 1220 because the condition on line 1219 was never true

1220 raise RuntimeError("Empty waiter is already set") 

1221 self._empty_waiter = self._loop.create_future() 

1222 if not self._buffer: 

1223 self._empty_waiter.set_result(None) 

1224 return self._empty_waiter 

1225 

1226 def _reset_empty_waiter(self): 

1227 self._empty_waiter = None 

1228 

1229 def close(self): 

1230 self._read_ready_cb = None 

1231 super().close() 

1232 

1233 

1234class _SelectorDatagramTransport(_SelectorTransport, transports.DatagramTransport): 

1235 

1236 _buffer_factory = collections.deque 

1237 _header_size = 8 

1238 

1239 def __init__(self, loop, sock, protocol, address=None, 

1240 waiter=None, extra=None): 

1241 super().__init__(loop, sock, protocol, extra) 

1242 self._address = address 

1243 self._buffer_size = 0 

1244 self._call_soon(self._protocol.connection_made, self) 

1245 # only start reading when connection_made() has been called 

1246 self._call_soon(self._add_reader, self._sock_fd, self._read_ready) 

1247 if waiter is not None: 

1248 # only wake up the waiter when connection_made() has been called 

1249 self._call_soon(futures._set_result_unless_cancelled, waiter, None) 

1250 

1251 def get_write_buffer_size(self): 

1252 return self._buffer_size 

1253 

1254 def _read_ready(self): 

1255 if self._conn_lost: 1255 ↛ 1256line 1255 didn't jump to line 1256 because the condition on line 1255 was never true

1256 return 

1257 try: 

1258 data, addr = self._sock.recvfrom(self.max_size) 

1259 except (BlockingIOError, InterruptedError): 

1260 pass 

1261 except OSError as exc: 

1262 self._protocol.error_received(exc) 

1263 except (SystemExit, KeyboardInterrupt): 

1264 raise 

1265 except BaseException as exc: 

1266 self._fatal_error(exc, 'Fatal read error on datagram transport') 

1267 else: 

1268 self._protocol.datagram_received(data, addr) 

1269 

1270 def sendto(self, data, addr=None): 

1271 if not isinstance(data, (bytes, bytearray, memoryview)): 

1272 raise TypeError(f'data argument must be a bytes-like object, ' 

1273 f'not {type(data).__name__!r}') 

1274 

1275 if self._address: 

1276 if addr not in (None, self._address): 

1277 raise ValueError( 

1278 f'Invalid address: must be None or {self._address}') 

1279 addr = self._address 

1280 

1281 if self._conn_lost and self._address: 

1282 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: 

1283 logger.warning('socket.send() raised exception.') 

1284 self._conn_lost += 1 

1285 return 

1286 

1287 if not self._buffer: 

1288 # Attempt to send it right away first. 

1289 try: 

1290 if self._extra['peername']: 

1291 self._sock.send(data) 

1292 else: 

1293 self._sock.sendto(data, addr) 

1294 return 

1295 except (BlockingIOError, InterruptedError): 

1296 self._add_writer(self._sock_fd, self._sendto_ready) 

1297 except OSError as exc: 

1298 self._protocol.error_received(exc) 

1299 return 

1300 except (SystemExit, KeyboardInterrupt): 

1301 raise 

1302 except BaseException as exc: 

1303 self._fatal_error( 

1304 exc, 'Fatal write error on datagram transport') 

1305 return 

1306 

1307 # Ensure that what we buffer is immutable. 

1308 self._buffer.append((bytes(data), addr)) 

1309 self._buffer_size += len(data) + self._header_size 

1310 self._maybe_pause_protocol() 

1311 

1312 def _sendto_ready(self): 

1313 while self._buffer: 

1314 data, addr = self._buffer.popleft() 

1315 self._buffer_size -= len(data) + self._header_size 

1316 try: 

1317 if self._extra['peername']: 

1318 self._sock.send(data) 

1319 else: 

1320 self._sock.sendto(data, addr) 

1321 except (BlockingIOError, InterruptedError): 

1322 self._buffer.appendleft((data, addr)) # Try again later. 

1323 self._buffer_size += len(data) + self._header_size 

1324 break 

1325 except OSError as exc: 

1326 self._protocol.error_received(exc) 

1327 return 

1328 except (SystemExit, KeyboardInterrupt): 

1329 raise 

1330 except BaseException as exc: 

1331 self._fatal_error( 

1332 exc, 'Fatal write error on datagram transport') 

1333 return 

1334 

1335 self._maybe_resume_protocol() # May append to buffer. 

1336 if not self._buffer: 

1337 self._loop._remove_writer(self._sock_fd) 

1338 if self._closing: 

1339 self._call_connection_lost(None)