Coverage for Lib/asyncio/selector_events.py: 87%

921 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-06-24 03:28 +0000

1"""Event loop using a selector and related classes. 

2 

3A selector is a "notify-when-ready" multiplexer. For a subclass which 

4also includes support for signal handling, see the unix_events sub-module. 

5""" 

6 

7__all__ = 'BaseSelectorEventLoop', 

8 

9import collections 

10import errno 

11import functools 

12import itertools 

13import os 

14import selectors 

15import socket 

16import warnings 

17import weakref 

18try: 

19 import ssl 

20except ImportError: # pragma: no cover 

21 ssl = None 

22 

23from . import base_events 

24from . import constants 

25from . import events 

26from . import futures 

27from . import protocols 

28from . import sslproto 

29from . import transports 

30from . import trsock 

31from .log import logger 

32 

33_HAS_SENDMSG = hasattr(socket.socket, 'sendmsg') 

34 

35if _HAS_SENDMSG: 35 ↛ 42line 35 didn't jump to line 42 because the condition on line 35 was always true

36 try: 

37 SC_IOV_MAX = os.sysconf('SC_IOV_MAX') 

38 except OSError: 

39 # Fallback to send 

40 _HAS_SENDMSG = False 

41 

42def _test_selector_event(selector, fd, event): 

43 # Test if the selector is monitoring 'event' events 

44 # for the file descriptor 'fd'. 

45 try: 

46 key = selector.get_key(fd) 

47 except KeyError: 

48 return False 

49 else: 

50 return bool(key.events & event) 

51 

52 

53class BaseSelectorEventLoop(base_events.BaseEventLoop): 

54 """Selector event loop. 

55 

56 See events.AbstractEventLoop for API specification. 

57 """ 

58 

59 def __init__(self, selector=None): 

60 super().__init__() 

61 

62 if selector is None: 

63 selector = selectors.DefaultSelector() 

64 logger.debug('Using selector: %s', selector.__class__.__name__) 

65 self._selector = selector 

66 self._make_self_pipe() 

67 self._transports = weakref.WeakValueDictionary() 

68 

69 def _make_socket_transport(self, sock, protocol, waiter=None, *, 

70 extra=None, server=None, context=None): 

71 self._ensure_fd_no_transport(sock) 

72 return _SelectorSocketTransport(self, sock, protocol, waiter, 

73 extra, server, context=context) 

74 

75 def _make_ssl_transport( 

76 self, rawsock, protocol, sslcontext, waiter=None, 

77 *, server_side=False, server_hostname=None, 

78 extra=None, server=None, 

79 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT, 

80 ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT, 

81 context=None, 

82 ): 

83 self._ensure_fd_no_transport(rawsock) 

84 ssl_protocol = sslproto.SSLProtocol( 

85 self, protocol, sslcontext, waiter, 

86 server_side, server_hostname, 

87 ssl_handshake_timeout=ssl_handshake_timeout, 

88 ssl_shutdown_timeout=ssl_shutdown_timeout, 

89 ) 

90 _SelectorSocketTransport(self, rawsock, ssl_protocol, 

91 extra=extra, server=server, context=context) 

92 return ssl_protocol._app_transport 

93 

94 def _make_datagram_transport(self, sock, protocol, 

95 address=None, waiter=None, extra=None): 

96 self._ensure_fd_no_transport(sock) 

97 return _SelectorDatagramTransport(self, sock, protocol, 

98 address, waiter, extra) 

99 

100 def close(self): 

101 if self.is_running(): 

102 raise RuntimeError("Cannot close a running event loop") 

103 if self.is_closed(): 

104 return 

105 self._close_self_pipe() 

106 super().close() 

107 if self._selector is not None: 

108 self._selector.close() 

109 self._selector = None 

110 

111 def _close_self_pipe(self): 

112 self._remove_reader(self._ssock.fileno()) 

113 self._ssock.close() 

114 self._ssock = None 

115 self._csock.close() 

116 self._csock = None 

117 self._internal_fds -= 1 

118 

119 def _make_self_pipe(self): 

120 # A self-socket, really. :-) 

121 self._ssock, self._csock = socket.socketpair() 

122 self._ssock.setblocking(False) 

123 self._csock.setblocking(False) 

124 self._internal_fds += 1 

125 self._add_reader(self._ssock.fileno(), self._read_from_self) 

126 

127 def _process_self_data(self, data): 

128 pass 

129 

130 def _read_from_self(self): 

131 while True: 

132 try: 

133 data = self._ssock.recv(4096) 

134 if not data: 134 ↛ 135line 134 didn't jump to line 135 because the condition on line 134 was never true

135 break 

136 self._process_self_data(data) 

137 except InterruptedError: 

138 continue 

139 except BlockingIOError: 

140 break 

141 

142 def _write_to_self(self): 

143 # This may be called from a different thread, possibly after 

144 # _close_self_pipe() has been called or even while it is 

145 # running. Guard for self._csock being None or closed. When 

146 # a socket is closed, send() raises OSError (with errno set to 

147 # EBADF, but let's not rely on the exact error code). 

148 csock = self._csock 

149 if csock is None: 149 ↛ 150line 149 didn't jump to line 150 because the condition on line 149 was never true

150 return 

151 

152 try: 

153 csock.send(b'\0') 

154 except OSError: 

155 if self._debug: 155 ↛ 156line 155 didn't jump to line 156 because the condition on line 155 was never true

156 logger.debug("Fail to write a null byte into the " 

157 "self-pipe socket", 

158 exc_info=True) 

159 

160 def _start_serving(self, protocol_factory, sock, 

161 sslcontext=None, server=None, backlog=100, 

162 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT, 

163 ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT, context=None): 

164 self._add_reader(sock.fileno(), self._accept_connection, 

165 protocol_factory, sock, sslcontext, server, backlog, 

166 ssl_handshake_timeout, ssl_shutdown_timeout, context) 

167 

168 def _accept_connection( 

169 self, protocol_factory, sock, 

170 sslcontext=None, server=None, backlog=100, 

171 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT, 

172 ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT, context=None): 

173 # This method is only called once for each event loop tick where the 

174 # listening socket has triggered an EVENT_READ. There may be multiple 

175 # connections waiting for an .accept() so it is called in a loop. 

176 # See https://bugs.python.org/issue27906 for more details. 

177 for _ in range(backlog + 1): 

178 try: 

179 conn, addr = sock.accept() 

180 if self._debug: 

181 logger.debug("%r got a new connection from %r: %r", 

182 server, addr, conn) 

183 conn.setblocking(False) 

184 except ConnectionAbortedError: 

185 # Discard connections that were aborted before accept(). 

186 continue 

187 except (BlockingIOError, InterruptedError): 

188 # Early exit because of a signal or 

189 # the socket accept buffer is empty. 

190 return 

191 except OSError as exc: 

192 # There's nowhere to send the error, so just log it. 

193 if exc.errno in (errno.EMFILE, errno.ENFILE, 193 ↛ 210line 193 didn't jump to line 210 because the condition on line 193 was always true

194 errno.ENOBUFS, errno.ENOMEM): 

195 # Some platforms (e.g. Linux keep reporting the FD as 

196 # ready, so we remove the read handler temporarily. 

197 # We'll try again in a while. 

198 self.call_exception_handler({ 

199 'message': 'socket.accept() out of system resource', 

200 'exception': exc, 

201 'socket': trsock.TransportSocket(sock), 

202 }) 

203 self._remove_reader(sock.fileno()) 

204 self.call_later(constants.ACCEPT_RETRY_DELAY, 

205 self._start_serving, 

206 protocol_factory, sock, sslcontext, server, 

207 backlog, ssl_handshake_timeout, 

208 ssl_shutdown_timeout, context) 

209 else: 

210 raise # The event loop will catch, log and ignore it. 

211 else: 

212 extra = {'peername': addr} 

213 conn_context = context.copy() if context is not None else None 

214 accept = self._accept_connection2( 

215 protocol_factory, conn, extra, sslcontext, server, 

216 ssl_handshake_timeout, ssl_shutdown_timeout, context=conn_context) 

217 self.create_task(accept, context=conn_context) 

218 

219 async def _accept_connection2( 

220 self, protocol_factory, conn, extra, 

221 sslcontext=None, server=None, 

222 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT, 

223 ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT, context=None): 

224 protocol = None 

225 transport = None 

226 try: 

227 protocol = protocol_factory() 

228 waiter = self.create_future() 

229 if sslcontext: 

230 transport = self._make_ssl_transport( 

231 conn, protocol, sslcontext, waiter=waiter, 

232 server_side=True, extra=extra, server=server, 

233 ssl_handshake_timeout=ssl_handshake_timeout, 

234 ssl_shutdown_timeout=ssl_shutdown_timeout, 

235 context=context) 

236 else: 

237 transport = self._make_socket_transport( 

238 conn, protocol, waiter=waiter, extra=extra, 

239 server=server, context=context) 

240 

241 try: 

242 await waiter 

243 except BaseException: 

244 transport.close() 

245 # gh-109534: When an exception is raised by the SSLProtocol object the 

246 # exception set in this future can keep the protocol object alive and 

247 # cause a reference cycle. 

248 waiter = None 

249 raise 

250 # It's now up to the protocol to handle the connection. 

251 

252 except (SystemExit, KeyboardInterrupt): 

253 raise 

254 except BaseException as exc: 

255 if self._debug: 255 ↛ 256line 255 didn't jump to line 256 because the condition on line 255 was never true

256 context = { 

257 'message': 

258 'Error on transport creation for incoming connection', 

259 'exception': exc, 

260 } 

261 if protocol is not None: 

262 context['protocol'] = protocol 

263 if transport is not None: 

264 context['transport'] = transport 

265 self.call_exception_handler(context) 

266 

267 def _ensure_fd_no_transport(self, fd): 

268 fileno = fd 

269 if not isinstance(fileno, int): 

270 try: 

271 fileno = int(fileno.fileno()) 

272 except (AttributeError, TypeError, ValueError): 

273 # This code matches selectors._fileobj_to_fd function. 

274 raise ValueError(f"Invalid file object: {fd!r}") from None 

275 transport = self._transports.get(fileno) 

276 if transport and not transport.is_closing(): 

277 raise RuntimeError( 

278 f'File descriptor {fd!r} is used by transport ' 

279 f'{transport!r}') 

280 

281 def _add_reader(self, fd, callback, *args, context=None): 

282 self._check_closed() 

283 handle = events.Handle(callback, args, self, context=context) 

284 key = self._selector.get_map().get(fd) 

285 if key is None: 

286 self._selector.register(fd, selectors.EVENT_READ, 

287 (handle, None)) 

288 else: 

289 mask, (reader, writer) = key.events, key.data 

290 self._selector.modify(fd, mask | selectors.EVENT_READ, 

291 (handle, writer)) 

292 if reader is not None: 

293 reader.cancel() 

294 return handle 

295 

296 def _remove_reader(self, fd): 

297 if self.is_closed(): 

298 return False 

299 key = self._selector.get_map().get(fd) 

300 if key is None: 

301 return False 

302 mask, (reader, writer) = key.events, key.data 

303 mask &= ~selectors.EVENT_READ 

304 if not mask: 

305 self._selector.unregister(fd) 

306 else: 

307 self._selector.modify(fd, mask, (None, writer)) 

308 

309 if reader is not None: 

310 reader.cancel() 

311 return True 

312 else: 

313 return False 

314 

315 def _add_writer(self, fd, callback, *args, context=None): 

316 self._check_closed() 

317 handle = events.Handle(callback, args, self, context=context) 

318 key = self._selector.get_map().get(fd) 

319 if key is None: 

320 self._selector.register(fd, selectors.EVENT_WRITE, 

321 (None, handle)) 

322 else: 

323 mask, (reader, writer) = key.events, key.data 

324 self._selector.modify(fd, mask | selectors.EVENT_WRITE, 

325 (reader, handle)) 

326 if writer is not None: 

327 writer.cancel() 

328 return handle 

329 

330 def _remove_writer(self, fd): 

331 """Remove a writer callback.""" 

332 if self.is_closed(): 

333 return False 

334 key = self._selector.get_map().get(fd) 

335 if key is None: 

336 return False 

337 mask, (reader, writer) = key.events, key.data 

338 # Remove both writer and connector. 

339 mask &= ~selectors.EVENT_WRITE 

340 if not mask: 

341 self._selector.unregister(fd) 

342 else: 

343 self._selector.modify(fd, mask, (reader, None)) 

344 

345 if writer is not None: 

346 writer.cancel() 

347 return True 

348 else: 

349 return False 

350 

351 def add_reader(self, fd, callback, *args): 

352 """Add a reader callback.""" 

353 self._ensure_fd_no_transport(fd) 

354 self._add_reader(fd, callback, *args) 

355 

356 def remove_reader(self, fd): 

357 """Remove a reader callback.""" 

358 self._ensure_fd_no_transport(fd) 

359 return self._remove_reader(fd) 

360 

361 def add_writer(self, fd, callback, *args): 

362 """Add a writer callback..""" 

363 self._ensure_fd_no_transport(fd) 

364 self._add_writer(fd, callback, *args) 

365 

366 def remove_writer(self, fd): 

367 """Remove a writer callback.""" 

368 self._ensure_fd_no_transport(fd) 

369 return self._remove_writer(fd) 

370 

371 async def sock_recv(self, sock, n): 

372 """Receive data from the socket. 

373 

374 The return value is a bytes object representing the data received. 

375 The maximum amount of data to be received at once is specified by 

376 nbytes. 

377 """ 

378 base_events._check_ssl_socket(sock) 

379 if self._debug and sock.gettimeout() != 0: 

380 raise ValueError("the socket must be non-blocking") 

381 try: 

382 return sock.recv(n) 

383 except (BlockingIOError, InterruptedError): 

384 pass 

385 fut = self.create_future() 

386 fd = sock.fileno() 

387 self._ensure_fd_no_transport(fd) 

388 handle = self._add_reader(fd, self._sock_recv, fut, sock, n) 

389 fut.add_done_callback( 

390 functools.partial(self._sock_read_done, fd, handle=handle)) 

391 return await fut 

392 

393 def _sock_read_done(self, fd, fut, handle=None): 

394 if handle is None or not handle.cancelled(): 

395 self.remove_reader(fd) 

396 

397 def _sock_recv(self, fut, sock, n): 

398 # _sock_recv() can add itself as an I/O callback if the operation can't 

399 # be done immediately. Don't use it directly, call sock_recv(). 

400 if fut.done(): 400 ↛ 401line 400 didn't jump to line 401 because the condition on line 400 was never true

401 return 

402 try: 

403 data = sock.recv(n) 

404 except (BlockingIOError, InterruptedError): 

405 return # try again next time 

406 except (SystemExit, KeyboardInterrupt): 

407 raise 

408 except BaseException as exc: 

409 fut.set_exception(exc) 

410 else: 

411 fut.set_result(data) 

412 

413 async def sock_recv_into(self, sock, buf): 

414 """Receive data from the socket. 

415 

416 The received data is written into *buf* (a writable buffer). 

417 The return value is the number of bytes written. 

418 """ 

419 base_events._check_ssl_socket(sock) 

420 if self._debug and sock.gettimeout() != 0: 

421 raise ValueError("the socket must be non-blocking") 

422 try: 

423 return sock.recv_into(buf) 

424 except (BlockingIOError, InterruptedError): 

425 pass 

426 fut = self.create_future() 

427 fd = sock.fileno() 

428 self._ensure_fd_no_transport(fd) 

429 handle = self._add_reader(fd, self._sock_recv_into, fut, sock, buf) 

430 fut.add_done_callback( 

431 functools.partial(self._sock_read_done, fd, handle=handle)) 

432 return await fut 

433 

434 def _sock_recv_into(self, fut, sock, buf): 

435 # _sock_recv_into() can add itself as an I/O callback if the operation 

436 # can't be done immediately. Don't use it directly, call 

437 # sock_recv_into(). 

438 if fut.done(): 438 ↛ 439line 438 didn't jump to line 439 because the condition on line 438 was never true

439 return 

440 try: 

441 nbytes = sock.recv_into(buf) 

442 except (BlockingIOError, InterruptedError): 

443 return # try again next time 

444 except (SystemExit, KeyboardInterrupt): 

445 raise 

446 except BaseException as exc: 

447 fut.set_exception(exc) 

448 else: 

449 fut.set_result(nbytes) 

450 

451 async def sock_recvfrom(self, sock, bufsize): 

452 """Receive a datagram from a datagram socket. 

453 

454 The return value is a tuple of (bytes, address) representing the 

455 datagram received and the address it came from. 

456 The maximum amount of data to be received at once is specified by 

457 nbytes. 

458 """ 

459 base_events._check_ssl_socket(sock) 

460 if self._debug and sock.gettimeout() != 0: 460 ↛ 461line 460 didn't jump to line 461 because the condition on line 460 was never true

461 raise ValueError("the socket must be non-blocking") 

462 try: 

463 return sock.recvfrom(bufsize) 

464 except (BlockingIOError, InterruptedError): 

465 pass 

466 fut = self.create_future() 

467 fd = sock.fileno() 

468 self._ensure_fd_no_transport(fd) 

469 handle = self._add_reader(fd, self._sock_recvfrom, fut, sock, bufsize) 

470 fut.add_done_callback( 

471 functools.partial(self._sock_read_done, fd, handle=handle)) 

472 return await fut 

473 

474 def _sock_recvfrom(self, fut, sock, bufsize): 

475 # _sock_recvfrom() can add itself as an I/O callback if the operation 

476 # can't be done immediately. Don't use it directly, call 

477 # sock_recvfrom(). 

478 if fut.done(): 478 ↛ 479line 478 didn't jump to line 479 because the condition on line 478 was never true

479 return 

480 try: 

481 result = sock.recvfrom(bufsize) 

482 except (BlockingIOError, InterruptedError): 

483 return # try again next time 

484 except (SystemExit, KeyboardInterrupt): 

485 raise 

486 except BaseException as exc: 

487 fut.set_exception(exc) 

488 else: 

489 fut.set_result(result) 

490 

491 async def sock_recvfrom_into(self, sock, buf, nbytes=0): 

492 """Receive data from the socket. 

493 

494 The received data is written into *buf* (a writable buffer). 

495 The return value is a tuple of (number of bytes written, address). 

496 """ 

497 base_events._check_ssl_socket(sock) 

498 if self._debug and sock.gettimeout() != 0: 498 ↛ 499line 498 didn't jump to line 499 because the condition on line 498 was never true

499 raise ValueError("the socket must be non-blocking") 

500 if not nbytes: 

501 nbytes = len(buf) 

502 

503 try: 

504 return sock.recvfrom_into(buf, nbytes) 

505 except (BlockingIOError, InterruptedError): 

506 pass 

507 fut = self.create_future() 

508 fd = sock.fileno() 

509 self._ensure_fd_no_transport(fd) 

510 handle = self._add_reader(fd, self._sock_recvfrom_into, fut, sock, buf, 

511 nbytes) 

512 fut.add_done_callback( 

513 functools.partial(self._sock_read_done, fd, handle=handle)) 

514 return await fut 

515 

516 def _sock_recvfrom_into(self, fut, sock, buf, bufsize): 

517 # _sock_recv_into() can add itself as an I/O callback if the operation 

518 # can't be done immediately. Don't use it directly, call 

519 # sock_recv_into(). 

520 if fut.done(): 520 ↛ 521line 520 didn't jump to line 521 because the condition on line 520 was never true

521 return 

522 try: 

523 result = sock.recvfrom_into(buf, bufsize) 

524 except (BlockingIOError, InterruptedError): 

525 return # try again next time 

526 except (SystemExit, KeyboardInterrupt): 

527 raise 

528 except BaseException as exc: 

529 fut.set_exception(exc) 

530 else: 

531 fut.set_result(result) 

532 

533 async def sock_sendall(self, sock, data): 

534 """Send data to the socket. 

535 

536 The socket must be connected to a remote socket. This method 

537 continues to send data from data until either all data has been 

538 sent or an error occurs. None is returned on success. On error, 

539 an exception is raised, and there is no way to determine how much 

540 data, if any, was successfully processed by the receiving end of 

541 the connection. 

542 """ 

543 base_events._check_ssl_socket(sock) 

544 if self._debug and sock.gettimeout() != 0: 

545 raise ValueError("the socket must be non-blocking") 

546 try: 

547 n = sock.send(data) 

548 except (BlockingIOError, InterruptedError): 

549 n = 0 

550 

551 if n == len(data): 

552 # all data sent 

553 return 

554 

555 fut = self.create_future() 

556 fd = sock.fileno() 

557 self._ensure_fd_no_transport(fd) 

558 # use a trick with a list in closure to store a mutable state 

559 handle = self._add_writer(fd, self._sock_sendall, fut, sock, 

560 memoryview(data), [n]) 

561 fut.add_done_callback( 

562 functools.partial(self._sock_write_done, fd, handle=handle)) 

563 return await fut 

564 

565 def _sock_sendall(self, fut, sock, view, pos): 

566 if fut.done(): 566 ↛ 568line 566 didn't jump to line 568 because the condition on line 566 was never true

567 # Future cancellation can be scheduled on previous loop iteration 

568 return 

569 start = pos[0] 

570 try: 

571 n = sock.send(view[start:]) 

572 except (BlockingIOError, InterruptedError): 

573 return 

574 except (SystemExit, KeyboardInterrupt): 

575 raise 

576 except BaseException as exc: 

577 fut.set_exception(exc) 

578 return 

579 

580 start += n 

581 

582 if start == len(view): 

583 fut.set_result(None) 

584 else: 

585 pos[0] = start 

586 

587 async def sock_sendto(self, sock, data, address): 

588 """Send data to the socket. 

589 

590 The socket must be connected to a remote socket. This method 

591 continues to send data from data until either all data has been 

592 sent or an error occurs. None is returned on success. On error, 

593 an exception is raised, and there is no way to determine how much 

594 data, if any, was successfully processed by the receiving end of 

595 the connection. 

596 """ 

597 base_events._check_ssl_socket(sock) 

598 if self._debug and sock.gettimeout() != 0: 598 ↛ 599line 598 didn't jump to line 599 because the condition on line 598 was never true

599 raise ValueError("the socket must be non-blocking") 

600 try: 

601 return sock.sendto(data, address) 

602 except (BlockingIOError, InterruptedError): 

603 pass 

604 

605 fut = self.create_future() 

606 fd = sock.fileno() 

607 self._ensure_fd_no_transport(fd) 

608 # use a trick with a list in closure to store a mutable state 

609 handle = self._add_writer(fd, self._sock_sendto, fut, sock, data, 

610 address) 

611 fut.add_done_callback( 

612 functools.partial(self._sock_write_done, fd, handle=handle)) 

613 return await fut 

614 

615 def _sock_sendto(self, fut, sock, data, address): 

616 if fut.done(): 616 ↛ 618line 616 didn't jump to line 618 because the condition on line 616 was never true

617 # Future cancellation can be scheduled on previous loop iteration 

618 return 

619 try: 

620 n = sock.sendto(data, 0, address) 

621 except (BlockingIOError, InterruptedError): 

622 return 

623 except (SystemExit, KeyboardInterrupt): 

624 raise 

625 except BaseException as exc: 

626 fut.set_exception(exc) 

627 else: 

628 fut.set_result(n) 

629 

630 async def sock_connect(self, sock, address): 

631 """Connect to a remote socket at address. 

632 

633 This method is a coroutine. 

634 """ 

635 base_events._check_ssl_socket(sock) 

636 if self._debug and sock.gettimeout() != 0: 

637 raise ValueError("the socket must be non-blocking") 

638 

639 if sock.family == socket.AF_INET or ( 

640 base_events._HAS_IPv6 and sock.family == socket.AF_INET6): 

641 resolved = await self._ensure_resolved( 

642 address, family=sock.family, type=sock.type, proto=sock.proto, 

643 loop=self, 

644 ) 

645 _, _, _, _, address = resolved[0] 

646 

647 fut = self.create_future() 

648 self._sock_connect(fut, sock, address) 

649 try: 

650 return await fut 

651 finally: 

652 # Needed to break cycles when an exception occurs. 

653 fut = None 

654 

655 def _sock_connect(self, fut, sock, address): 

656 fd = sock.fileno() 

657 try: 

658 sock.connect(address) 

659 except (BlockingIOError, InterruptedError): 

660 # Issue #23618: When the C function connect() fails with EINTR, the 

661 # connection runs in background. We have to wait until the socket 

662 # becomes writable to be notified when the connection succeed or 

663 # fails. 

664 self._ensure_fd_no_transport(fd) 

665 handle = self._add_writer( 

666 fd, self._sock_connect_cb, fut, sock, address) 

667 fut.add_done_callback( 

668 functools.partial(self._sock_write_done, fd, handle=handle)) 

669 except (SystemExit, KeyboardInterrupt): 

670 raise 

671 except BaseException as exc: 

672 fut.set_exception(exc) 

673 else: 

674 fut.set_result(None) 

675 finally: 

676 fut = None 

677 

678 def _sock_write_done(self, fd, fut, handle=None): 

679 if handle is None or not handle.cancelled(): 

680 self.remove_writer(fd) 

681 

682 def _sock_connect_cb(self, fut, sock, address): 

683 if fut.done(): 683 ↛ 684line 683 didn't jump to line 684 because the condition on line 683 was never true

684 return 

685 

686 try: 

687 err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) 

688 if err != 0: 

689 # Jump to any except clause below. 

690 raise OSError(err, f'Connect call failed {address}') 

691 except (BlockingIOError, InterruptedError): 

692 # socket is still registered, the callback will be retried later 

693 pass 

694 except (SystemExit, KeyboardInterrupt): 

695 raise 

696 except BaseException as exc: 

697 fut.set_exception(exc) 

698 else: 

699 fut.set_result(None) 

700 finally: 

701 fut = None 

702 

703 async def sock_accept(self, sock): 

704 """Accept a connection. 

705 

706 The socket must be bound to an address and listening for 

707 connections. The return value is a pair (conn, address) where 

708 conn is a new socket object usable to send and receive data on the 

709 connection, and address is the address bound to the socket on the 

710 other end of the connection. 

711 """ 

712 base_events._check_ssl_socket(sock) 

713 if self._debug and sock.gettimeout() != 0: 

714 raise ValueError("the socket must be non-blocking") 

715 fut = self.create_future() 

716 self._sock_accept(fut, sock) 

717 return await fut 

718 

719 def _sock_accept(self, fut, sock): 

720 fd = sock.fileno() 

721 try: 

722 conn, address = sock.accept() 

723 conn.setblocking(False) 

724 except (BlockingIOError, InterruptedError): 

725 self._ensure_fd_no_transport(fd) 

726 handle = self._add_reader(fd, self._sock_accept, fut, sock) 

727 fut.add_done_callback( 

728 functools.partial(self._sock_read_done, fd, handle=handle)) 

729 except (SystemExit, KeyboardInterrupt): 

730 raise 

731 except BaseException as exc: 

732 fut.set_exception(exc) 

733 else: 

734 fut.set_result((conn, address)) 

735 

736 async def _sendfile_native(self, transp, file, offset, count): 

737 del self._transports[transp._sock_fd] 

738 resume_reading = transp.is_reading() 

739 transp.pause_reading() 

740 await transp._make_empty_waiter() 

741 try: 

742 return await self.sock_sendfile(transp._sock, file, offset, count, 

743 fallback=False) 

744 finally: 

745 transp._reset_empty_waiter() 

746 if resume_reading: 746 ↛ 748line 746 didn't jump to line 748 because the condition on line 746 was always true

747 transp.resume_reading() 

748 self._transports[transp._sock_fd] = transp 

749 

750 def _process_events(self, event_list): 

751 for key, mask in event_list: 

752 fileobj, (reader, writer) = key.fileobj, key.data 

753 if mask & selectors.EVENT_READ and reader is not None: 

754 if reader._cancelled: 

755 self._remove_reader(fileobj) 

756 else: 

757 self._add_callback(reader) 

758 if mask & selectors.EVENT_WRITE and writer is not None: 

759 if writer._cancelled: 

760 self._remove_writer(fileobj) 

761 else: 

762 self._add_callback(writer) 

763 

764 def _stop_serving(self, sock): 

765 self._remove_reader(sock.fileno()) 

766 sock.close() 

767 

768 

769class _SelectorTransport(transports._FlowControlMixin, 

770 transports.Transport): 

771 

772 max_size = 256 * 1024 # Buffer size passed to recv(). 

773 

774 # Attribute used in the destructor: it must be set even if the constructor 

775 # is not called (see _SelectorSslTransport which may start by raising an 

776 # exception) 

777 _sock = None 

778 

779 def __init__(self, loop, sock, protocol, extra=None, server=None, context=None): 

780 super().__init__(extra, loop) 

781 self._extra['socket'] = trsock.TransportSocket(sock) 

782 try: 

783 self._extra['sockname'] = sock.getsockname() 

784 except OSError: 

785 self._extra['sockname'] = None 

786 if 'peername' not in self._extra: 

787 try: 

788 self._extra['peername'] = sock.getpeername() 

789 except socket.error: 

790 self._extra['peername'] = None 

791 self._sock = sock 

792 self._sock_fd = sock.fileno() 

793 self._context = context 

794 self._protocol_connected = False 

795 self.set_protocol(protocol) 

796 

797 self._server = server 

798 self._buffer = collections.deque() 

799 self._buffer_size = 0 

800 self._conn_lost = 0 # Set when call to connection_lost scheduled. 

801 self._closing = False # Set when close() called. 

802 self._paused = False # Set when pause_reading() called 

803 

804 if self._server is not None: 

805 self._server._attach(self) 

806 loop._transports[self._sock_fd] = self 

807 

808 def __repr__(self): 

809 info = [self.__class__.__name__] 

810 if self._sock is None: 810 ↛ 811line 810 didn't jump to line 811 because the condition on line 810 was never true

811 info.append('closed') 

812 elif self._closing: 

813 info.append('closing') 

814 info.append(f'fd={self._sock_fd}') 

815 # test if the transport was closed 

816 if self._loop is not None and not self._loop.is_closed(): 

817 polling = _test_selector_event(self._loop._selector, 

818 self._sock_fd, selectors.EVENT_READ) 

819 if polling: 

820 info.append('read=polling') 

821 else: 

822 info.append('read=idle') 

823 

824 polling = _test_selector_event(self._loop._selector, 

825 self._sock_fd, 

826 selectors.EVENT_WRITE) 

827 if polling: 827 ↛ 828line 827 didn't jump to line 828 because the condition on line 827 was never true

828 state = 'polling' 

829 else: 

830 state = 'idle' 

831 

832 bufsize = self.get_write_buffer_size() 

833 info.append(f'write=<{state}, bufsize={bufsize}>') 

834 return '<{}>'.format(' '.join(info)) 

835 

836 def abort(self): 

837 self._force_close(None) 

838 

839 def set_protocol(self, protocol): 

840 self._protocol = protocol 

841 self._protocol_connected = True 

842 

843 def get_protocol(self): 

844 return self._protocol 

845 

846 def is_closing(self): 

847 return self._closing 

848 

849 def is_reading(self): 

850 return not self.is_closing() and not self._paused 

851 

852 def pause_reading(self): 

853 if not self.is_reading(): 

854 return 

855 self._paused = True 

856 self._loop._remove_reader(self._sock_fd) 

857 if self._loop.get_debug(): 

858 logger.debug("%r pauses reading", self) 

859 

860 def resume_reading(self): 

861 if self._closing or not self._paused: 

862 return 

863 self._paused = False 

864 self._add_reader(self._sock_fd, self._read_ready) 

865 if self._loop.get_debug(): 865 ↛ 866line 865 didn't jump to line 866 because the condition on line 865 was never true

866 logger.debug("%r resumes reading", self) 

867 

868 def close(self): 

869 if self._closing: 

870 return 

871 self._closing = True 

872 self._loop._remove_reader(self._sock_fd) 

873 if not self._buffer: 

874 self._conn_lost += 1 

875 self._loop._remove_writer(self._sock_fd) 

876 self._call_soon(self._call_connection_lost, None) 

877 

878 def __del__(self, _warn=warnings.warn): 

879 if self._sock is not None: 879 ↛ 880line 879 didn't jump to line 880 because the condition on line 879 was never true

880 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) 

881 self._sock.close() 

882 if self._server is not None: 

883 self._server._detach(self) 

884 

885 def _fatal_error(self, exc, message='Fatal error on transport'): 

886 # Should be called from exception handler only. 

887 if isinstance(exc, OSError): 

888 if self._loop.get_debug(): 888 ↛ 889line 888 didn't jump to line 889 because the condition on line 888 was never true

889 logger.debug("%r: %s", self, message, exc_info=True) 

890 else: 

891 self._loop.call_exception_handler({ 

892 'message': message, 

893 'exception': exc, 

894 'transport': self, 

895 'protocol': self._protocol, 

896 }) 

897 self._force_close(exc) 

898 

899 def _force_close(self, exc): 

900 if self._conn_lost: 

901 return 

902 if self._buffer: 

903 self._buffer.clear() 

904 self._buffer_size = 0 

905 self._loop._remove_writer(self._sock_fd) 

906 if not self._closing: 

907 self._closing = True 

908 self._loop._remove_reader(self._sock_fd) 

909 self._conn_lost += 1 

910 self._call_soon(self._call_connection_lost, exc) 

911 

912 def _call_connection_lost(self, exc): 

913 try: 

914 if self._protocol_connected: 914 ↛ 917line 914 didn't jump to line 917 because the condition on line 914 was always true

915 self._protocol.connection_lost(exc) 

916 finally: 

917 self._sock.close() 

918 self._sock = None 

919 self._protocol = None 

920 self._loop = None 

921 server = self._server 

922 if server is not None: 

923 server._detach(self) 

924 self._server = None 

925 

926 def get_write_buffer_size(self): 

927 return self._buffer_size 

928 

929 def _add_reader(self, fd, callback, *args): 

930 if not self.is_reading(): 

931 return 

932 self._loop._add_reader(fd, callback, *args, context=self._context) 

933 

934 def _add_writer(self, fd, callback, *args): 

935 self._loop._add_writer(fd, callback, *args, context=self._context) 

936 

937 def _call_soon(self, callback, *args): 

938 self._loop.call_soon(callback, *args, context=self._context) 

939 

940class _SelectorSocketTransport(_SelectorTransport): 

941 

942 _start_tls_compatible = True 

943 _sendfile_compatible = constants._SendfileMode.TRY_NATIVE 

944 

945 def __init__(self, loop, sock, protocol, waiter=None, 

946 extra=None, server=None, context=None): 

947 self._read_ready_cb = None 

948 super().__init__(loop, sock, protocol, extra, server, context) 

949 self._eof = False 

950 self._empty_waiter = None 

951 if _HAS_SENDMSG: 951 ↛ 954line 951 didn't jump to line 954 because the condition on line 951 was always true

952 self._write_ready = self._write_sendmsg 

953 else: 

954 self._write_ready = self._write_send 

955 # Disable the Nagle algorithm -- small writes will be 

956 # sent without waiting for the TCP ACK. This generally 

957 # decreases the latency (in some cases significantly.) 

958 base_events._set_nodelay(self._sock) 

959 

960 self._call_soon(self._protocol.connection_made, self) 

961 # only start reading when connection_made() has been called 

962 self._call_soon(self._add_reader, self._sock_fd, self._read_ready) 

963 if waiter is not None: 

964 # only wake up the waiter when connection_made() has been called 

965 self._call_soon(futures._set_result_unless_cancelled, waiter, None) 

966 

967 def set_protocol(self, protocol): 

968 if isinstance(protocol, protocols.BufferedProtocol): 

969 self._read_ready_cb = self._read_ready__get_buffer 

970 else: 

971 self._read_ready_cb = self._read_ready__data_received 

972 

973 super().set_protocol(protocol) 

974 

975 def _read_ready(self): 

976 self._read_ready_cb() 

977 

978 def _read_ready__get_buffer(self): 

979 if self._conn_lost: 979 ↛ 980line 979 didn't jump to line 980 because the condition on line 979 was never true

980 return 

981 

982 try: 

983 buf = self._protocol.get_buffer(-1) 

984 if not len(buf): 

985 raise RuntimeError('get_buffer() returned an empty buffer') 

986 except (SystemExit, KeyboardInterrupt): 

987 raise 

988 except BaseException as exc: 

989 self._fatal_error( 

990 exc, 'Fatal error: protocol.get_buffer() call failed.') 

991 return 

992 

993 try: 

994 nbytes = self._sock.recv_into(buf) 

995 except (BlockingIOError, InterruptedError): 

996 return 

997 except (SystemExit, KeyboardInterrupt): 

998 raise 

999 except BaseException as exc: 

1000 self._fatal_error(exc, 'Fatal read error on socket transport') 

1001 return 

1002 

1003 if not nbytes: 

1004 self._read_ready__on_eof() 

1005 return 

1006 

1007 try: 

1008 self._protocol.buffer_updated(nbytes) 

1009 except (SystemExit, KeyboardInterrupt): 

1010 raise 

1011 except BaseException as exc: 

1012 self._fatal_error( 

1013 exc, 'Fatal error: protocol.buffer_updated() call failed.') 

1014 

1015 def _read_ready__data_received(self): 

1016 if self._conn_lost: 1016 ↛ 1017line 1016 didn't jump to line 1017 because the condition on line 1016 was never true

1017 return 

1018 try: 

1019 data = self._sock.recv(self.max_size) 

1020 except (BlockingIOError, InterruptedError): 

1021 return 

1022 except (SystemExit, KeyboardInterrupt): 

1023 raise 

1024 except BaseException as exc: 

1025 self._fatal_error(exc, 'Fatal read error on socket transport') 

1026 return 

1027 

1028 if not data: 

1029 self._read_ready__on_eof() 

1030 return 

1031 

1032 try: 

1033 self._protocol.data_received(data) 

1034 except (SystemExit, KeyboardInterrupt): 

1035 raise 

1036 except BaseException as exc: 

1037 self._fatal_error( 

1038 exc, 'Fatal error: protocol.data_received() call failed.') 

1039 

1040 def _read_ready__on_eof(self): 

1041 if self._loop.get_debug(): 

1042 logger.debug("%r received EOF", self) 

1043 

1044 try: 

1045 keep_open = self._protocol.eof_received() 

1046 except (SystemExit, KeyboardInterrupt): 

1047 raise 

1048 except BaseException as exc: 

1049 self._fatal_error( 

1050 exc, 'Fatal error: protocol.eof_received() call failed.') 

1051 return 

1052 

1053 if keep_open: 

1054 # We're keeping the connection open so the 

1055 # protocol can write more, but we still can't 

1056 # receive more, so remove the reader callback. 

1057 self._loop._remove_reader(self._sock_fd) 

1058 else: 

1059 self.close() 

1060 

1061 def write(self, data): 

1062 if not isinstance(data, (bytes, bytearray, memoryview)): 

1063 raise TypeError(f'data argument must be a bytes, bytearray, or memoryview ' 

1064 f'object, not {type(data).__name__!r}') 

1065 if self._eof: 1065 ↛ 1066line 1065 didn't jump to line 1066 because the condition on line 1065 was never true

1066 raise RuntimeError('Cannot call write() after write_eof()') 

1067 if self._empty_waiter is not None: 

1068 raise RuntimeError('unable to write; sendfile is in progress') 

1069 if not data: 

1070 return 

1071 

1072 if self._conn_lost: 

1073 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: 

1074 logger.warning('socket.send() raised exception.') 

1075 self._conn_lost += 1 

1076 return 

1077 

1078 if not self._buffer: 

1079 # Optimization: try to send now. 

1080 try: 

1081 n = self._sock.send(data) 

1082 except (BlockingIOError, InterruptedError): 

1083 pass 

1084 except (SystemExit, KeyboardInterrupt): 

1085 raise 

1086 except BaseException as exc: 

1087 self._fatal_error(exc, 'Fatal write error on socket transport') 

1088 return 

1089 else: 

1090 data = memoryview(data)[n:] 

1091 if not data: 

1092 return 

1093 # Not all was written; register write handler. 

1094 self._add_writer(self._sock_fd, self._write_ready) 

1095 

1096 # Add it to the buffer. 

1097 self._buffer.append(data) 

1098 self._buffer_size += len(data) 

1099 self._maybe_pause_protocol() 

1100 

1101 def _get_sendmsg_buffer(self): 

1102 return itertools.islice(self._buffer, SC_IOV_MAX) 

1103 

1104 def _write_sendmsg(self): 

1105 assert self._buffer, 'Data should not be empty' 

1106 if self._conn_lost: 1106 ↛ 1107line 1106 didn't jump to line 1107 because the condition on line 1106 was never true

1107 return 

1108 try: 

1109 nbytes = self._sock.sendmsg(self._get_sendmsg_buffer()) 

1110 self._adjust_leftover_buffer(nbytes) 

1111 except (BlockingIOError, InterruptedError): 

1112 pass 

1113 except (SystemExit, KeyboardInterrupt): 

1114 raise 

1115 except BaseException as exc: 

1116 self._loop._remove_writer(self._sock_fd) 

1117 self._buffer.clear() 

1118 self._buffer_size = 0 

1119 self._fatal_error(exc, 'Fatal write error on socket transport') 

1120 if self._empty_waiter is not None: 1120 ↛ 1121line 1120 didn't jump to line 1121 because the condition on line 1120 was never true

1121 self._empty_waiter.set_exception(exc) 

1122 else: 

1123 self._maybe_resume_protocol() # May append to buffer. 

1124 if not self._buffer: 

1125 self._loop._remove_writer(self._sock_fd) 

1126 if self._empty_waiter is not None: 

1127 self._empty_waiter.set_result(None) 

1128 if self._closing: 

1129 self._call_connection_lost(None) 

1130 elif self._eof: 1130 ↛ 1131line 1130 didn't jump to line 1131 because the condition on line 1130 was never true

1131 self._sock.shutdown(socket.SHUT_WR) 

1132 

1133 def _adjust_leftover_buffer(self, nbytes: int) -> None: 

1134 self._buffer_size -= nbytes 

1135 buffer = self._buffer 

1136 while nbytes: 

1137 b = buffer.popleft() 

1138 b_len = len(b) 

1139 if b_len <= nbytes: 

1140 nbytes -= b_len 

1141 else: 

1142 buffer.appendleft(b[nbytes:]) 

1143 break 

1144 

1145 def _write_send(self): 

1146 assert self._buffer, 'Data should not be empty' 

1147 if self._conn_lost: 1147 ↛ 1148line 1147 didn't jump to line 1148 because the condition on line 1147 was never true

1148 return 

1149 try: 

1150 buffer = self._buffer.popleft() 

1151 n = self._sock.send(buffer) 

1152 if n != len(buffer): 

1153 # Not all data was written 

1154 self._buffer.appendleft(buffer[n:]) 

1155 self._buffer_size -= n 

1156 except (BlockingIOError, InterruptedError): 

1157 self._buffer.appendleft(buffer) 

1158 return 

1159 except (SystemExit, KeyboardInterrupt): 

1160 raise 

1161 except BaseException as exc: 

1162 self._loop._remove_writer(self._sock_fd) 

1163 self._buffer.clear() 

1164 self._buffer_size = 0 

1165 self._fatal_error(exc, 'Fatal write error on socket transport') 

1166 if self._empty_waiter is not None: 1166 ↛ 1167line 1166 didn't jump to line 1167 because the condition on line 1166 was never true

1167 self._empty_waiter.set_exception(exc) 

1168 else: 

1169 self._maybe_resume_protocol() # May append to buffer. 

1170 if not self._buffer: 

1171 self._loop._remove_writer(self._sock_fd) 

1172 if self._empty_waiter is not None: 1172 ↛ 1173line 1172 didn't jump to line 1173 because the condition on line 1172 was never true

1173 self._empty_waiter.set_result(None) 

1174 if self._closing: 

1175 self._call_connection_lost(None) 

1176 elif self._eof: 

1177 self._sock.shutdown(socket.SHUT_WR) 

1178 

1179 def write_eof(self): 

1180 if self._closing or self._eof: 

1181 return 

1182 self._eof = True 

1183 if not self._buffer: 

1184 self._sock.shutdown(socket.SHUT_WR) 

1185 

1186 def writelines(self, list_of_data): 

1187 if self._eof: 1187 ↛ 1188line 1187 didn't jump to line 1188 because the condition on line 1187 was never true

1188 raise RuntimeError('Cannot call writelines() after write_eof()') 

1189 if self._empty_waiter is not None: 1189 ↛ 1190line 1189 didn't jump to line 1190 because the condition on line 1189 was never true

1190 raise RuntimeError('unable to writelines; sendfile is in progress') 

1191 if not list_of_data: 1191 ↛ 1192line 1191 didn't jump to line 1192 because the condition on line 1191 was never true

1192 return 

1193 

1194 if self._conn_lost: 

1195 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: 1195 ↛ 1196line 1195 didn't jump to line 1196 because the condition on line 1195 was never true

1196 logger.warning('socket.send() raised exception.') 

1197 self._conn_lost += 1 

1198 return 

1199 

1200 for data in list_of_data: 

1201 self._buffer.append(memoryview(data)) 

1202 self._buffer_size += len(data) 

1203 self._write_ready() 

1204 # If the entire buffer couldn't be written, register a write handler 

1205 if self._buffer: 

1206 self._add_writer(self._sock_fd, self._write_ready) 

1207 self._maybe_pause_protocol() 

1208 

1209 def can_write_eof(self): 

1210 return True 

1211 

1212 def _call_connection_lost(self, exc): 

1213 try: 

1214 super()._call_connection_lost(exc) 

1215 finally: 

1216 self._write_ready = None 

1217 if self._empty_waiter is not None: 1217 ↛ 1218line 1217 didn't jump to line 1218 because the condition on line 1217 was never true

1218 self._empty_waiter.set_exception( 

1219 ConnectionError("Connection is closed by peer")) 

1220 

1221 def _make_empty_waiter(self): 

1222 if self._empty_waiter is not None: 1222 ↛ 1223line 1222 didn't jump to line 1223 because the condition on line 1222 was never true

1223 raise RuntimeError("Empty waiter is already set") 

1224 self._empty_waiter = self._loop.create_future() 

1225 if not self._buffer: 

1226 self._empty_waiter.set_result(None) 

1227 return self._empty_waiter 

1228 

1229 def _reset_empty_waiter(self): 

1230 self._empty_waiter = None 

1231 

1232 def close(self): 

1233 self._read_ready_cb = None 

1234 super().close() 

1235 

1236 

1237class _SelectorDatagramTransport(_SelectorTransport, transports.DatagramTransport): 

1238 

1239 _header_size = 8 

1240 

1241 def __init__(self, loop, sock, protocol, address=None, 

1242 waiter=None, extra=None): 

1243 super().__init__(loop, sock, protocol, extra) 

1244 self._address = address 

1245 self._buffer_size = 0 

1246 self._call_soon(self._protocol.connection_made, self) 

1247 # only start reading when connection_made() has been called 

1248 self._call_soon(self._add_reader, self._sock_fd, self._read_ready) 

1249 if waiter is not None: 

1250 # only wake up the waiter when connection_made() has been called 

1251 self._call_soon(futures._set_result_unless_cancelled, waiter, None) 

1252 

1253 def get_write_buffer_size(self): 

1254 return self._buffer_size 

1255 

1256 def _read_ready(self): 

1257 if self._conn_lost: 1257 ↛ 1258line 1257 didn't jump to line 1258 because the condition on line 1257 was never true

1258 return 

1259 try: 

1260 data, addr = self._sock.recvfrom(self.max_size) 

1261 except (BlockingIOError, InterruptedError): 

1262 pass 

1263 except OSError as exc: 

1264 self._protocol.error_received(exc) 

1265 except (SystemExit, KeyboardInterrupt): 

1266 raise 

1267 except BaseException as exc: 

1268 self._fatal_error(exc, 'Fatal read error on datagram transport') 

1269 else: 

1270 self._protocol.datagram_received(data, addr) 

1271 

1272 def sendto(self, data, addr=None): 

1273 if not isinstance(data, (bytes, bytearray, memoryview)): 

1274 raise TypeError(f'data argument must be a bytes-like object, ' 

1275 f'not {type(data).__name__!r}') 

1276 

1277 if self._address: 

1278 if addr not in (None, self._address): 

1279 raise ValueError( 

1280 f'Invalid address: must be None or {self._address}') 

1281 addr = self._address 

1282 

1283 if self._conn_lost and self._address: 

1284 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: 

1285 logger.warning('socket.send() raised exception.') 

1286 self._conn_lost += 1 

1287 return 

1288 

1289 if not self._buffer: 

1290 # Attempt to send it right away first. 

1291 try: 

1292 if self._extra['peername']: 

1293 self._sock.send(data) 

1294 else: 

1295 self._sock.sendto(data, addr) 

1296 return 

1297 except (BlockingIOError, InterruptedError): 

1298 self._add_writer(self._sock_fd, self._sendto_ready) 

1299 except OSError as exc: 

1300 self._protocol.error_received(exc) 

1301 return 

1302 except (SystemExit, KeyboardInterrupt): 

1303 raise 

1304 except BaseException as exc: 

1305 self._fatal_error( 

1306 exc, 'Fatal write error on datagram transport') 

1307 return 

1308 

1309 # Ensure that what we buffer is immutable. 

1310 self._buffer.append((bytes(data), addr)) 

1311 self._buffer_size += len(data) + self._header_size 

1312 self._maybe_pause_protocol() 

1313 

1314 def _sendto_ready(self): 

1315 while self._buffer: 

1316 data, addr = self._buffer.popleft() 

1317 self._buffer_size -= len(data) + self._header_size 

1318 try: 

1319 if self._extra['peername']: 

1320 self._sock.send(data) 

1321 else: 

1322 self._sock.sendto(data, addr) 

1323 except (BlockingIOError, InterruptedError): 

1324 self._buffer.appendleft((data, addr)) # Try again later. 

1325 self._buffer_size += len(data) + self._header_size 

1326 break 

1327 except OSError as exc: 

1328 self._protocol.error_received(exc) 

1329 return 

1330 except (SystemExit, KeyboardInterrupt): 

1331 raise 

1332 except BaseException as exc: 

1333 self._fatal_error( 

1334 exc, 'Fatal write error on datagram transport') 

1335 return 

1336 

1337 self._maybe_resume_protocol() # May append to buffer. 

1338 if not self._buffer: 

1339 self._loop._remove_writer(self._sock_fd) 

1340 if self._closing: 

1341 self._call_connection_lost(None)