Coverage for Lib/asyncio/selector_events.py: 87%

900 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-15 02:02 +0000

1"""Event loop using a selector and related classes. 

2 

3A selector is a "notify-when-ready" multiplexer. For a subclass which 

4also includes support for signal handling, see the unix_events sub-module. 

5""" 

6 

7__all__ = 'BaseSelectorEventLoop', 

8 

9import collections 

10import errno 

11import functools 

12import itertools 

13import os 

14import selectors 

15import socket 

16import warnings 

17import weakref 

18try: 

19 import ssl 

20except ImportError: # pragma: no cover 

21 ssl = None 

22 

23from . import base_events 

24from . import constants 

25from . import events 

26from . import futures 

27from . import protocols 

28from . import sslproto 

29from . import transports 

30from . import trsock 

31from .log import logger 

32 

33_HAS_SENDMSG = hasattr(socket.socket, 'sendmsg') 

34 

35if _HAS_SENDMSG: 35 ↛ 42line 35 didn't jump to line 42 because the condition on line 35 was always true

36 try: 

37 SC_IOV_MAX = os.sysconf('SC_IOV_MAX') 

38 except OSError: 

39 # Fallback to send 

40 _HAS_SENDMSG = False 

41 

42def _test_selector_event(selector, fd, event): 

43 # Test if the selector is monitoring 'event' events 

44 # for the file descriptor 'fd'. 

45 try: 

46 key = selector.get_key(fd) 

47 except KeyError: 

48 return False 

49 else: 

50 return bool(key.events & event) 

51 

52 

53class BaseSelectorEventLoop(base_events.BaseEventLoop): 

54 """Selector event loop. 

55 

56 See events.EventLoop for API specification. 

57 """ 

58 

59 def __init__(self, selector=None): 

60 super().__init__() 

61 

62 if selector is None: 

63 selector = selectors.DefaultSelector() 

64 logger.debug('Using selector: %s', selector.__class__.__name__) 

65 self._selector = selector 

66 self._make_self_pipe() 

67 self._transports = weakref.WeakValueDictionary() 

68 

69 def _make_socket_transport(self, sock, protocol, waiter=None, *, 

70 extra=None, server=None): 

71 self._ensure_fd_no_transport(sock) 

72 return _SelectorSocketTransport(self, sock, protocol, waiter, 

73 extra, server) 

74 

75 def _make_ssl_transport( 

76 self, rawsock, protocol, sslcontext, waiter=None, 

77 *, server_side=False, server_hostname=None, 

78 extra=None, server=None, 

79 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT, 

80 ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT, 

81 ): 

82 self._ensure_fd_no_transport(rawsock) 

83 ssl_protocol = sslproto.SSLProtocol( 

84 self, protocol, sslcontext, waiter, 

85 server_side, server_hostname, 

86 ssl_handshake_timeout=ssl_handshake_timeout, 

87 ssl_shutdown_timeout=ssl_shutdown_timeout 

88 ) 

89 _SelectorSocketTransport(self, rawsock, ssl_protocol, 

90 extra=extra, server=server) 

91 return ssl_protocol._app_transport 

92 

93 def _make_datagram_transport(self, sock, protocol, 

94 address=None, waiter=None, extra=None): 

95 self._ensure_fd_no_transport(sock) 

96 return _SelectorDatagramTransport(self, sock, protocol, 

97 address, waiter, extra) 

98 

99 def close(self): 

100 if self.is_running(): 

101 raise RuntimeError("Cannot close a running event loop") 

102 if self.is_closed(): 

103 return 

104 self._close_self_pipe() 

105 super().close() 

106 if self._selector is not None: 

107 self._selector.close() 

108 self._selector = None 

109 

110 def _close_self_pipe(self): 

111 self._remove_reader(self._ssock.fileno()) 

112 self._ssock.close() 

113 self._ssock = None 

114 self._csock.close() 

115 self._csock = None 

116 self._internal_fds -= 1 

117 

118 def _make_self_pipe(self): 

119 # A self-socket, really. :-) 

120 self._ssock, self._csock = socket.socketpair() 

121 self._ssock.setblocking(False) 

122 self._csock.setblocking(False) 

123 self._internal_fds += 1 

124 self._add_reader(self._ssock.fileno(), self._read_from_self) 

125 

126 def _process_self_data(self, data): 

127 pass 

128 

129 def _read_from_self(self): 

130 while True: 

131 try: 

132 data = self._ssock.recv(4096) 

133 if not data: 133 ↛ 134line 133 didn't jump to line 134 because the condition on line 133 was never true

134 break 

135 self._process_self_data(data) 

136 except InterruptedError: 

137 continue 

138 except BlockingIOError: 

139 break 

140 

141 def _write_to_self(self): 

142 # This may be called from a different thread, possibly after 

143 # _close_self_pipe() has been called or even while it is 

144 # running. Guard for self._csock being None or closed. When 

145 # a socket is closed, send() raises OSError (with errno set to 

146 # EBADF, but let's not rely on the exact error code). 

147 csock = self._csock 

148 if csock is None: 148 ↛ 149line 148 didn't jump to line 149 because the condition on line 148 was never true

149 return 

150 

151 try: 

152 csock.send(b'\0') 

153 except OSError: 

154 if self._debug: 154 ↛ 155line 154 didn't jump to line 155 because the condition on line 154 was never true

155 logger.debug("Fail to write a null byte into the " 

156 "self-pipe socket", 

157 exc_info=True) 

158 

159 def _start_serving(self, protocol_factory, sock, 

160 sslcontext=None, server=None, backlog=100, 

161 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT, 

162 ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT): 

163 self._add_reader(sock.fileno(), self._accept_connection, 

164 protocol_factory, sock, sslcontext, server, backlog, 

165 ssl_handshake_timeout, ssl_shutdown_timeout) 

166 

167 def _accept_connection( 

168 self, protocol_factory, sock, 

169 sslcontext=None, server=None, backlog=100, 

170 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT, 

171 ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT): 

172 # This method is only called once for each event loop tick where the 

173 # listening socket has triggered an EVENT_READ. There may be multiple 

174 # connections waiting for an .accept() so it is called in a loop. 

175 # See https://bugs.python.org/issue27906 for more details. 

176 for _ in range(backlog): 

177 try: 

178 conn, addr = sock.accept() 

179 if self._debug: 

180 logger.debug("%r got a new connection from %r: %r", 

181 server, addr, conn) 

182 conn.setblocking(False) 

183 except ConnectionAbortedError: 

184 # Discard connections that were aborted before accept(). 

185 continue 

186 except (BlockingIOError, InterruptedError): 

187 # Early exit because of a signal or 

188 # the socket accept buffer is empty. 

189 return 

190 except OSError as exc: 

191 # There's nowhere to send the error, so just log it. 

192 if exc.errno in (errno.EMFILE, errno.ENFILE, 192 ↛ 209line 192 didn't jump to line 209 because the condition on line 192 was always true

193 errno.ENOBUFS, errno.ENOMEM): 

194 # Some platforms (e.g. Linux keep reporting the FD as 

195 # ready, so we remove the read handler temporarily. 

196 # We'll try again in a while. 

197 self.call_exception_handler({ 

198 'message': 'socket.accept() out of system resource', 

199 'exception': exc, 

200 'socket': trsock.TransportSocket(sock), 

201 }) 

202 self._remove_reader(sock.fileno()) 

203 self.call_later(constants.ACCEPT_RETRY_DELAY, 

204 self._start_serving, 

205 protocol_factory, sock, sslcontext, server, 

206 backlog, ssl_handshake_timeout, 

207 ssl_shutdown_timeout) 

208 else: 

209 raise # The event loop will catch, log and ignore it. 

210 else: 

211 extra = {'peername': addr} 

212 accept = self._accept_connection2( 

213 protocol_factory, conn, extra, sslcontext, server, 

214 ssl_handshake_timeout, ssl_shutdown_timeout) 

215 self.create_task(accept) 

216 

217 async def _accept_connection2( 

218 self, protocol_factory, conn, extra, 

219 sslcontext=None, server=None, 

220 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT, 

221 ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT): 

222 protocol = None 

223 transport = None 

224 try: 

225 protocol = protocol_factory() 

226 waiter = self.create_future() 

227 if sslcontext: 

228 transport = self._make_ssl_transport( 

229 conn, protocol, sslcontext, waiter=waiter, 

230 server_side=True, extra=extra, server=server, 

231 ssl_handshake_timeout=ssl_handshake_timeout, 

232 ssl_shutdown_timeout=ssl_shutdown_timeout) 

233 else: 

234 transport = self._make_socket_transport( 

235 conn, protocol, waiter=waiter, extra=extra, 

236 server=server) 

237 

238 try: 

239 await waiter 

240 except BaseException: 

241 transport.close() 

242 # gh-109534: When an exception is raised by the SSLProtocol object the 

243 # exception set in this future can keep the protocol object alive and 

244 # cause a reference cycle. 

245 waiter = None 

246 raise 

247 # It's now up to the protocol to handle the connection. 

248 

249 except (SystemExit, KeyboardInterrupt): 

250 raise 

251 except BaseException as exc: 

252 if self._debug: 252 ↛ 253line 252 didn't jump to line 253 because the condition on line 252 was never true

253 context = { 

254 'message': 

255 'Error on transport creation for incoming connection', 

256 'exception': exc, 

257 } 

258 if protocol is not None: 

259 context['protocol'] = protocol 

260 if transport is not None: 

261 context['transport'] = transport 

262 self.call_exception_handler(context) 

263 

264 def _ensure_fd_no_transport(self, fd): 

265 fileno = fd 

266 if not isinstance(fileno, int): 

267 try: 

268 fileno = int(fileno.fileno()) 

269 except (AttributeError, TypeError, ValueError): 

270 # This code matches selectors._fileobj_to_fd function. 

271 raise ValueError(f"Invalid file object: {fd!r}") from None 

272 transport = self._transports.get(fileno) 

273 if transport and not transport.is_closing(): 

274 raise RuntimeError( 

275 f'File descriptor {fd!r} is used by transport ' 

276 f'{transport!r}') 

277 

278 def _add_reader(self, fd, callback, *args): 

279 self._check_closed() 

280 handle = events.Handle(callback, args, self, None) 

281 key = self._selector.get_map().get(fd) 

282 if key is None: 

283 self._selector.register(fd, selectors.EVENT_READ, 

284 (handle, None)) 

285 else: 

286 mask, (reader, writer) = key.events, key.data 

287 self._selector.modify(fd, mask | selectors.EVENT_READ, 

288 (handle, writer)) 

289 if reader is not None: 

290 reader.cancel() 

291 return handle 

292 

293 def _remove_reader(self, fd): 

294 if self.is_closed(): 

295 return False 

296 key = self._selector.get_map().get(fd) 

297 if key is None: 

298 return False 

299 mask, (reader, writer) = key.events, key.data 

300 mask &= ~selectors.EVENT_READ 

301 if not mask: 

302 self._selector.unregister(fd) 

303 else: 

304 self._selector.modify(fd, mask, (None, writer)) 

305 

306 if reader is not None: 

307 reader.cancel() 

308 return True 

309 else: 

310 return False 

311 

312 def _add_writer(self, fd, callback, *args): 

313 self._check_closed() 

314 handle = events.Handle(callback, args, self, None) 

315 key = self._selector.get_map().get(fd) 

316 if key is None: 

317 self._selector.register(fd, selectors.EVENT_WRITE, 

318 (None, handle)) 

319 else: 

320 mask, (reader, writer) = key.events, key.data 

321 self._selector.modify(fd, mask | selectors.EVENT_WRITE, 

322 (reader, handle)) 

323 if writer is not None: 

324 writer.cancel() 

325 return handle 

326 

327 def _remove_writer(self, fd): 

328 """Remove a writer callback.""" 

329 if self.is_closed(): 

330 return False 

331 key = self._selector.get_map().get(fd) 

332 if key is None: 

333 return False 

334 mask, (reader, writer) = key.events, key.data 

335 # Remove both writer and connector. 

336 mask &= ~selectors.EVENT_WRITE 

337 if not mask: 

338 self._selector.unregister(fd) 

339 else: 

340 self._selector.modify(fd, mask, (reader, None)) 

341 

342 if writer is not None: 

343 writer.cancel() 

344 return True 

345 else: 

346 return False 

347 

348 def add_reader(self, fd, callback, *args): 

349 """Add a reader callback.""" 

350 self._ensure_fd_no_transport(fd) 

351 self._add_reader(fd, callback, *args) 

352 

353 def remove_reader(self, fd): 

354 """Remove a reader callback.""" 

355 self._ensure_fd_no_transport(fd) 

356 return self._remove_reader(fd) 

357 

358 def add_writer(self, fd, callback, *args): 

359 """Add a writer callback..""" 

360 self._ensure_fd_no_transport(fd) 

361 self._add_writer(fd, callback, *args) 

362 

363 def remove_writer(self, fd): 

364 """Remove a writer callback.""" 

365 self._ensure_fd_no_transport(fd) 

366 return self._remove_writer(fd) 

367 

368 async def sock_recv(self, sock, n): 

369 """Receive data from the socket. 

370 

371 The return value is a bytes object representing the data received. 

372 The maximum amount of data to be received at once is specified by 

373 nbytes. 

374 """ 

375 base_events._check_ssl_socket(sock) 

376 if self._debug and sock.gettimeout() != 0: 

377 raise ValueError("the socket must be non-blocking") 

378 try: 

379 return sock.recv(n) 

380 except (BlockingIOError, InterruptedError): 

381 pass 

382 fut = self.create_future() 

383 fd = sock.fileno() 

384 self._ensure_fd_no_transport(fd) 

385 handle = self._add_reader(fd, self._sock_recv, fut, sock, n) 

386 fut.add_done_callback( 

387 functools.partial(self._sock_read_done, fd, handle=handle)) 

388 return await fut 

389 

390 def _sock_read_done(self, fd, fut, handle=None): 

391 if handle is None or not handle.cancelled(): 

392 self.remove_reader(fd) 

393 

394 def _sock_recv(self, fut, sock, n): 

395 # _sock_recv() can add itself as an I/O callback if the operation can't 

396 # be done immediately. Don't use it directly, call sock_recv(). 

397 if fut.done(): 397 ↛ 398line 397 didn't jump to line 398 because the condition on line 397 was never true

398 return 

399 try: 

400 data = sock.recv(n) 

401 except (BlockingIOError, InterruptedError): 

402 return # try again next time 

403 except (SystemExit, KeyboardInterrupt): 

404 raise 

405 except BaseException as exc: 

406 fut.set_exception(exc) 

407 else: 

408 fut.set_result(data) 

409 

410 async def sock_recv_into(self, sock, buf): 

411 """Receive data from the socket. 

412 

413 The received data is written into *buf* (a writable buffer). 

414 The return value is the number of bytes written. 

415 """ 

416 base_events._check_ssl_socket(sock) 

417 if self._debug and sock.gettimeout() != 0: 

418 raise ValueError("the socket must be non-blocking") 

419 try: 

420 return sock.recv_into(buf) 

421 except (BlockingIOError, InterruptedError): 

422 pass 

423 fut = self.create_future() 

424 fd = sock.fileno() 

425 self._ensure_fd_no_transport(fd) 

426 handle = self._add_reader(fd, self._sock_recv_into, fut, sock, buf) 

427 fut.add_done_callback( 

428 functools.partial(self._sock_read_done, fd, handle=handle)) 

429 return await fut 

430 

431 def _sock_recv_into(self, fut, sock, buf): 

432 # _sock_recv_into() can add itself as an I/O callback if the operation 

433 # can't be done immediately. Don't use it directly, call 

434 # sock_recv_into(). 

435 if fut.done(): 435 ↛ 436line 435 didn't jump to line 436 because the condition on line 435 was never true

436 return 

437 try: 

438 nbytes = sock.recv_into(buf) 

439 except (BlockingIOError, InterruptedError): 

440 return # try again next time 

441 except (SystemExit, KeyboardInterrupt): 

442 raise 

443 except BaseException as exc: 

444 fut.set_exception(exc) 

445 else: 

446 fut.set_result(nbytes) 

447 

448 async def sock_recvfrom(self, sock, bufsize): 

449 """Receive a datagram from a datagram socket. 

450 

451 The return value is a tuple of (bytes, address) representing the 

452 datagram received and the address it came from. 

453 The maximum amount of data to be received at once is specified by 

454 nbytes. 

455 """ 

456 base_events._check_ssl_socket(sock) 

457 if self._debug and sock.gettimeout() != 0: 457 ↛ 458line 457 didn't jump to line 458 because the condition on line 457 was never true

458 raise ValueError("the socket must be non-blocking") 

459 try: 

460 return sock.recvfrom(bufsize) 

461 except (BlockingIOError, InterruptedError): 

462 pass 

463 fut = self.create_future() 

464 fd = sock.fileno() 

465 self._ensure_fd_no_transport(fd) 

466 handle = self._add_reader(fd, self._sock_recvfrom, fut, sock, bufsize) 

467 fut.add_done_callback( 

468 functools.partial(self._sock_read_done, fd, handle=handle)) 

469 return await fut 

470 

471 def _sock_recvfrom(self, fut, sock, bufsize): 

472 # _sock_recvfrom() can add itself as an I/O callback if the operation 

473 # can't be done immediately. Don't use it directly, call 

474 # sock_recvfrom(). 

475 if fut.done(): 475 ↛ 476line 475 didn't jump to line 476 because the condition on line 475 was never true

476 return 

477 try: 

478 result = sock.recvfrom(bufsize) 

479 except (BlockingIOError, InterruptedError): 

480 return # try again next time 

481 except (SystemExit, KeyboardInterrupt): 

482 raise 

483 except BaseException as exc: 

484 fut.set_exception(exc) 

485 else: 

486 fut.set_result(result) 

487 

488 async def sock_recvfrom_into(self, sock, buf, nbytes=0): 

489 """Receive data from the socket. 

490 

491 The received data is written into *buf* (a writable buffer). 

492 The return value is a tuple of (number of bytes written, address). 

493 """ 

494 base_events._check_ssl_socket(sock) 

495 if self._debug and sock.gettimeout() != 0: 495 ↛ 496line 495 didn't jump to line 496 because the condition on line 495 was never true

496 raise ValueError("the socket must be non-blocking") 

497 if not nbytes: 

498 nbytes = len(buf) 

499 

500 try: 

501 return sock.recvfrom_into(buf, nbytes) 

502 except (BlockingIOError, InterruptedError): 

503 pass 

504 fut = self.create_future() 

505 fd = sock.fileno() 

506 self._ensure_fd_no_transport(fd) 

507 handle = self._add_reader(fd, self._sock_recvfrom_into, fut, sock, buf, 

508 nbytes) 

509 fut.add_done_callback( 

510 functools.partial(self._sock_read_done, fd, handle=handle)) 

511 return await fut 

512 

513 def _sock_recvfrom_into(self, fut, sock, buf, bufsize): 

514 # _sock_recv_into() can add itself as an I/O callback if the operation 

515 # can't be done immediately. Don't use it directly, call 

516 # sock_recv_into(). 

517 if fut.done(): 517 ↛ 518line 517 didn't jump to line 518 because the condition on line 517 was never true

518 return 

519 try: 

520 result = sock.recvfrom_into(buf, bufsize) 

521 except (BlockingIOError, InterruptedError): 

522 return # try again next time 

523 except (SystemExit, KeyboardInterrupt): 

524 raise 

525 except BaseException as exc: 

526 fut.set_exception(exc) 

527 else: 

528 fut.set_result(result) 

529 

530 async def sock_sendall(self, sock, data): 

531 """Send data to the socket. 

532 

533 The socket must be connected to a remote socket. This method continues 

534 to send data from data until either all data has been sent or an 

535 error occurs. None is returned on success. On error, an exception is 

536 raised, and there is no way to determine how much data, if any, was 

537 successfully processed by the receiving end of the connection. 

538 """ 

539 base_events._check_ssl_socket(sock) 

540 if self._debug and sock.gettimeout() != 0: 

541 raise ValueError("the socket must be non-blocking") 

542 try: 

543 n = sock.send(data) 

544 except (BlockingIOError, InterruptedError): 

545 n = 0 

546 

547 if n == len(data): 

548 # all data sent 

549 return 

550 

551 fut = self.create_future() 

552 fd = sock.fileno() 

553 self._ensure_fd_no_transport(fd) 

554 # use a trick with a list in closure to store a mutable state 

555 handle = self._add_writer(fd, self._sock_sendall, fut, sock, 

556 memoryview(data), [n]) 

557 fut.add_done_callback( 

558 functools.partial(self._sock_write_done, fd, handle=handle)) 

559 return await fut 

560 

561 def _sock_sendall(self, fut, sock, view, pos): 

562 if fut.done(): 562 ↛ 564line 562 didn't jump to line 564 because the condition on line 562 was never true

563 # Future cancellation can be scheduled on previous loop iteration 

564 return 

565 start = pos[0] 

566 try: 

567 n = sock.send(view[start:]) 

568 except (BlockingIOError, InterruptedError): 

569 return 

570 except (SystemExit, KeyboardInterrupt): 

571 raise 

572 except BaseException as exc: 

573 fut.set_exception(exc) 

574 return 

575 

576 start += n 

577 

578 if start == len(view): 

579 fut.set_result(None) 

580 else: 

581 pos[0] = start 

582 

583 async def sock_sendto(self, sock, data, address): 

584 """Send data to the socket. 

585 

586 The socket must be connected to a remote socket. This method continues 

587 to send data from data until either all data has been sent or an 

588 error occurs. None is returned on success. On error, an exception is 

589 raised, and there is no way to determine how much data, if any, was 

590 successfully processed by the receiving end of the connection. 

591 """ 

592 base_events._check_ssl_socket(sock) 

593 if self._debug and sock.gettimeout() != 0: 593 ↛ 594line 593 didn't jump to line 594 because the condition on line 593 was never true

594 raise ValueError("the socket must be non-blocking") 

595 try: 

596 return sock.sendto(data, address) 

597 except (BlockingIOError, InterruptedError): 

598 pass 

599 

600 fut = self.create_future() 

601 fd = sock.fileno() 

602 self._ensure_fd_no_transport(fd) 

603 # use a trick with a list in closure to store a mutable state 

604 handle = self._add_writer(fd, self._sock_sendto, fut, sock, data, 

605 address) 

606 fut.add_done_callback( 

607 functools.partial(self._sock_write_done, fd, handle=handle)) 

608 return await fut 

609 

610 def _sock_sendto(self, fut, sock, data, address): 

611 if fut.done(): 611 ↛ 613line 611 didn't jump to line 613 because the condition on line 611 was never true

612 # Future cancellation can be scheduled on previous loop iteration 

613 return 

614 try: 

615 n = sock.sendto(data, 0, address) 

616 except (BlockingIOError, InterruptedError): 

617 return 

618 except (SystemExit, KeyboardInterrupt): 

619 raise 

620 except BaseException as exc: 

621 fut.set_exception(exc) 

622 else: 

623 fut.set_result(n) 

624 

625 async def sock_connect(self, sock, address): 

626 """Connect to a remote socket at address. 

627 

628 This method is a coroutine. 

629 """ 

630 base_events._check_ssl_socket(sock) 

631 if self._debug and sock.gettimeout() != 0: 

632 raise ValueError("the socket must be non-blocking") 

633 

634 if sock.family == socket.AF_INET or ( 

635 base_events._HAS_IPv6 and sock.family == socket.AF_INET6): 

636 resolved = await self._ensure_resolved( 

637 address, family=sock.family, type=sock.type, proto=sock.proto, 

638 loop=self, 

639 ) 

640 _, _, _, _, address = resolved[0] 

641 

642 fut = self.create_future() 

643 self._sock_connect(fut, sock, address) 

644 try: 

645 return await fut 

646 finally: 

647 # Needed to break cycles when an exception occurs. 

648 fut = None 

649 

650 def _sock_connect(self, fut, sock, address): 

651 fd = sock.fileno() 

652 try: 

653 sock.connect(address) 

654 except (BlockingIOError, InterruptedError): 

655 # Issue #23618: When the C function connect() fails with EINTR, the 

656 # connection runs in background. We have to wait until the socket 

657 # becomes writable to be notified when the connection succeed or 

658 # fails. 

659 self._ensure_fd_no_transport(fd) 

660 handle = self._add_writer( 

661 fd, self._sock_connect_cb, fut, sock, address) 

662 fut.add_done_callback( 

663 functools.partial(self._sock_write_done, fd, handle=handle)) 

664 except (SystemExit, KeyboardInterrupt): 

665 raise 

666 except BaseException as exc: 

667 fut.set_exception(exc) 

668 else: 

669 fut.set_result(None) 

670 finally: 

671 fut = None 

672 

673 def _sock_write_done(self, fd, fut, handle=None): 

674 if handle is None or not handle.cancelled(): 

675 self.remove_writer(fd) 

676 

677 def _sock_connect_cb(self, fut, sock, address): 

678 if fut.done(): 678 ↛ 679line 678 didn't jump to line 679 because the condition on line 678 was never true

679 return 

680 

681 try: 

682 err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) 

683 if err != 0: 

684 # Jump to any except clause below. 

685 raise OSError(err, f'Connect call failed {address}') 

686 except (BlockingIOError, InterruptedError): 

687 # socket is still registered, the callback will be retried later 

688 pass 

689 except (SystemExit, KeyboardInterrupt): 

690 raise 

691 except BaseException as exc: 

692 fut.set_exception(exc) 

693 else: 

694 fut.set_result(None) 

695 finally: 

696 fut = None 

697 

698 async def sock_accept(self, sock): 

699 """Accept a connection. 

700 

701 The socket must be bound to an address and listening for connections. 

702 The return value is a pair (conn, address) where conn is a new socket 

703 object usable to send and receive data on the connection, and address 

704 is the address bound to the socket on the other end of the connection. 

705 """ 

706 base_events._check_ssl_socket(sock) 

707 if self._debug and sock.gettimeout() != 0: 

708 raise ValueError("the socket must be non-blocking") 

709 fut = self.create_future() 

710 self._sock_accept(fut, sock) 

711 return await fut 

712 

713 def _sock_accept(self, fut, sock): 

714 fd = sock.fileno() 

715 try: 

716 conn, address = sock.accept() 

717 conn.setblocking(False) 

718 except (BlockingIOError, InterruptedError): 

719 self._ensure_fd_no_transport(fd) 

720 handle = self._add_reader(fd, self._sock_accept, fut, sock) 

721 fut.add_done_callback( 

722 functools.partial(self._sock_read_done, fd, handle=handle)) 

723 except (SystemExit, KeyboardInterrupt): 

724 raise 

725 except BaseException as exc: 

726 fut.set_exception(exc) 

727 else: 

728 fut.set_result((conn, address)) 

729 

730 async def _sendfile_native(self, transp, file, offset, count): 

731 del self._transports[transp._sock_fd] 

732 resume_reading = transp.is_reading() 

733 transp.pause_reading() 

734 await transp._make_empty_waiter() 

735 try: 

736 return await self.sock_sendfile(transp._sock, file, offset, count, 

737 fallback=False) 

738 finally: 

739 transp._reset_empty_waiter() 

740 if resume_reading: 740 ↛ 742line 740 didn't jump to line 742 because the condition on line 740 was always true

741 transp.resume_reading() 

742 self._transports[transp._sock_fd] = transp 

743 

744 def _process_events(self, event_list): 

745 for key, mask in event_list: 

746 fileobj, (reader, writer) = key.fileobj, key.data 

747 if mask & selectors.EVENT_READ and reader is not None: 

748 if reader._cancelled: 

749 self._remove_reader(fileobj) 

750 else: 

751 self._add_callback(reader) 

752 if mask & selectors.EVENT_WRITE and writer is not None: 

753 if writer._cancelled: 

754 self._remove_writer(fileobj) 

755 else: 

756 self._add_callback(writer) 

757 

758 def _stop_serving(self, sock): 

759 self._remove_reader(sock.fileno()) 

760 sock.close() 

761 

762 

763class _SelectorTransport(transports._FlowControlMixin, 

764 transports.Transport): 

765 

766 max_size = 256 * 1024 # Buffer size passed to recv(). 

767 

768 # Attribute used in the destructor: it must be set even if the constructor 

769 # is not called (see _SelectorSslTransport which may start by raising an 

770 # exception) 

771 _sock = None 

772 

773 def __init__(self, loop, sock, protocol, extra=None, server=None): 

774 super().__init__(extra, loop) 

775 self._extra['socket'] = trsock.TransportSocket(sock) 

776 try: 

777 self._extra['sockname'] = sock.getsockname() 

778 except OSError: 

779 self._extra['sockname'] = None 

780 if 'peername' not in self._extra: 

781 try: 

782 self._extra['peername'] = sock.getpeername() 

783 except socket.error: 

784 self._extra['peername'] = None 

785 self._sock = sock 

786 self._sock_fd = sock.fileno() 

787 

788 self._protocol_connected = False 

789 self.set_protocol(protocol) 

790 

791 self._server = server 

792 self._buffer = collections.deque() 

793 self._conn_lost = 0 # Set when call to connection_lost scheduled. 

794 self._closing = False # Set when close() called. 

795 self._paused = False # Set when pause_reading() called 

796 

797 if self._server is not None: 

798 self._server._attach(self) 

799 loop._transports[self._sock_fd] = self 

800 

801 def __repr__(self): 

802 info = [self.__class__.__name__] 

803 if self._sock is None: 803 ↛ 804line 803 didn't jump to line 804 because the condition on line 803 was never true

804 info.append('closed') 

805 elif self._closing: 

806 info.append('closing') 

807 info.append(f'fd={self._sock_fd}') 

808 # test if the transport was closed 

809 if self._loop is not None and not self._loop.is_closed(): 

810 polling = _test_selector_event(self._loop._selector, 

811 self._sock_fd, selectors.EVENT_READ) 

812 if polling: 

813 info.append('read=polling') 

814 else: 

815 info.append('read=idle') 

816 

817 polling = _test_selector_event(self._loop._selector, 

818 self._sock_fd, 

819 selectors.EVENT_WRITE) 

820 if polling: 820 ↛ 821line 820 didn't jump to line 821 because the condition on line 820 was never true

821 state = 'polling' 

822 else: 

823 state = 'idle' 

824 

825 bufsize = self.get_write_buffer_size() 

826 info.append(f'write=<{state}, bufsize={bufsize}>') 

827 return '<{}>'.format(' '.join(info)) 

828 

829 def abort(self): 

830 self._force_close(None) 

831 

832 def set_protocol(self, protocol): 

833 self._protocol = protocol 

834 self._protocol_connected = True 

835 

836 def get_protocol(self): 

837 return self._protocol 

838 

839 def is_closing(self): 

840 return self._closing 

841 

842 def is_reading(self): 

843 return not self.is_closing() and not self._paused 

844 

845 def pause_reading(self): 

846 if not self.is_reading(): 

847 return 

848 self._paused = True 

849 self._loop._remove_reader(self._sock_fd) 

850 if self._loop.get_debug(): 

851 logger.debug("%r pauses reading", self) 

852 

853 def resume_reading(self): 

854 if self._closing or not self._paused: 

855 return 

856 self._paused = False 

857 self._add_reader(self._sock_fd, self._read_ready) 

858 if self._loop.get_debug(): 858 ↛ 859line 858 didn't jump to line 859 because the condition on line 858 was never true

859 logger.debug("%r resumes reading", self) 

860 

861 def close(self): 

862 if self._closing: 

863 return 

864 self._closing = True 

865 self._loop._remove_reader(self._sock_fd) 

866 if not self._buffer: 

867 self._conn_lost += 1 

868 self._loop._remove_writer(self._sock_fd) 

869 self._loop.call_soon(self._call_connection_lost, None) 

870 

871 def __del__(self, _warn=warnings.warn): 

872 if self._sock is not None: 872 ↛ 873line 872 didn't jump to line 873 because the condition on line 872 was never true

873 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) 

874 self._sock.close() 

875 if self._server is not None: 

876 self._server._detach(self) 

877 

878 def _fatal_error(self, exc, message='Fatal error on transport'): 

879 # Should be called from exception handler only. 

880 if isinstance(exc, OSError): 

881 if self._loop.get_debug(): 881 ↛ 882line 881 didn't jump to line 882 because the condition on line 881 was never true

882 logger.debug("%r: %s", self, message, exc_info=True) 

883 else: 

884 self._loop.call_exception_handler({ 

885 'message': message, 

886 'exception': exc, 

887 'transport': self, 

888 'protocol': self._protocol, 

889 }) 

890 self._force_close(exc) 

891 

892 def _force_close(self, exc): 

893 if self._conn_lost: 

894 return 

895 if self._buffer: 

896 self._buffer.clear() 

897 self._loop._remove_writer(self._sock_fd) 

898 if not self._closing: 898 ↛ 901line 898 didn't jump to line 901 because the condition on line 898 was always true

899 self._closing = True 

900 self._loop._remove_reader(self._sock_fd) 

901 self._conn_lost += 1 

902 self._loop.call_soon(self._call_connection_lost, exc) 

903 

904 def _call_connection_lost(self, exc): 

905 try: 

906 if self._protocol_connected: 906 ↛ 909line 906 didn't jump to line 909 because the condition on line 906 was always true

907 self._protocol.connection_lost(exc) 

908 finally: 

909 self._sock.close() 

910 self._sock = None 

911 self._protocol = None 

912 self._loop = None 

913 server = self._server 

914 if server is not None: 

915 server._detach(self) 

916 self._server = None 

917 

918 def get_write_buffer_size(self): 

919 return sum(map(len, self._buffer)) 

920 

921 def _add_reader(self, fd, callback, *args): 

922 if not self.is_reading(): 

923 return 

924 self._loop._add_reader(fd, callback, *args) 

925 

926 

927class _SelectorSocketTransport(_SelectorTransport): 

928 

929 _start_tls_compatible = True 

930 _sendfile_compatible = constants._SendfileMode.TRY_NATIVE 

931 

932 def __init__(self, loop, sock, protocol, waiter=None, 

933 extra=None, server=None): 

934 

935 self._read_ready_cb = None 

936 super().__init__(loop, sock, protocol, extra, server) 

937 self._eof = False 

938 self._empty_waiter = None 

939 if _HAS_SENDMSG: 939 ↛ 942line 939 didn't jump to line 942 because the condition on line 939 was always true

940 self._write_ready = self._write_sendmsg 

941 else: 

942 self._write_ready = self._write_send 

943 # Disable the Nagle algorithm -- small writes will be 

944 # sent without waiting for the TCP ACK. This generally 

945 # decreases the latency (in some cases significantly.) 

946 base_events._set_nodelay(self._sock) 

947 

948 self._loop.call_soon(self._protocol.connection_made, self) 

949 # only start reading when connection_made() has been called 

950 self._loop.call_soon(self._add_reader, 

951 self._sock_fd, self._read_ready) 

952 if waiter is not None: 

953 # only wake up the waiter when connection_made() has been called 

954 self._loop.call_soon(futures._set_result_unless_cancelled, 

955 waiter, None) 

956 

957 def set_protocol(self, protocol): 

958 if isinstance(protocol, protocols.BufferedProtocol): 

959 self._read_ready_cb = self._read_ready__get_buffer 

960 else: 

961 self._read_ready_cb = self._read_ready__data_received 

962 

963 super().set_protocol(protocol) 

964 

965 def _read_ready(self): 

966 self._read_ready_cb() 

967 

968 def _read_ready__get_buffer(self): 

969 if self._conn_lost: 969 ↛ 970line 969 didn't jump to line 970 because the condition on line 969 was never true

970 return 

971 

972 try: 

973 buf = self._protocol.get_buffer(-1) 

974 if not len(buf): 

975 raise RuntimeError('get_buffer() returned an empty buffer') 

976 except (SystemExit, KeyboardInterrupt): 

977 raise 

978 except BaseException as exc: 

979 self._fatal_error( 

980 exc, 'Fatal error: protocol.get_buffer() call failed.') 

981 return 

982 

983 try: 

984 nbytes = self._sock.recv_into(buf) 

985 except (BlockingIOError, InterruptedError): 

986 return 

987 except (SystemExit, KeyboardInterrupt): 

988 raise 

989 except BaseException as exc: 

990 self._fatal_error(exc, 'Fatal read error on socket transport') 

991 return 

992 

993 if not nbytes: 

994 self._read_ready__on_eof() 

995 return 

996 

997 try: 

998 self._protocol.buffer_updated(nbytes) 

999 except (SystemExit, KeyboardInterrupt): 

1000 raise 

1001 except BaseException as exc: 

1002 self._fatal_error( 

1003 exc, 'Fatal error: protocol.buffer_updated() call failed.') 

1004 

1005 def _read_ready__data_received(self): 

1006 if self._conn_lost: 1006 ↛ 1007line 1006 didn't jump to line 1007 because the condition on line 1006 was never true

1007 return 

1008 try: 

1009 data = self._sock.recv(self.max_size) 

1010 except (BlockingIOError, InterruptedError): 

1011 return 

1012 except (SystemExit, KeyboardInterrupt): 

1013 raise 

1014 except BaseException as exc: 

1015 self._fatal_error(exc, 'Fatal read error on socket transport') 

1016 return 

1017 

1018 if not data: 

1019 self._read_ready__on_eof() 

1020 return 

1021 

1022 try: 

1023 self._protocol.data_received(data) 

1024 except (SystemExit, KeyboardInterrupt): 

1025 raise 

1026 except BaseException as exc: 

1027 self._fatal_error( 

1028 exc, 'Fatal error: protocol.data_received() call failed.') 

1029 

1030 def _read_ready__on_eof(self): 

1031 if self._loop.get_debug(): 

1032 logger.debug("%r received EOF", self) 

1033 

1034 try: 

1035 keep_open = self._protocol.eof_received() 

1036 except (SystemExit, KeyboardInterrupt): 

1037 raise 

1038 except BaseException as exc: 

1039 self._fatal_error( 

1040 exc, 'Fatal error: protocol.eof_received() call failed.') 

1041 return 

1042 

1043 if keep_open: 

1044 # We're keeping the connection open so the 

1045 # protocol can write more, but we still can't 

1046 # receive more, so remove the reader callback. 

1047 self._loop._remove_reader(self._sock_fd) 

1048 else: 

1049 self.close() 

1050 

1051 def write(self, data): 

1052 if not isinstance(data, (bytes, bytearray, memoryview)): 

1053 raise TypeError(f'data argument must be a bytes-like object, ' 

1054 f'not {type(data).__name__!r}') 

1055 if self._eof: 1055 ↛ 1056line 1055 didn't jump to line 1056 because the condition on line 1055 was never true

1056 raise RuntimeError('Cannot call write() after write_eof()') 

1057 if self._empty_waiter is not None: 

1058 raise RuntimeError('unable to write; sendfile is in progress') 

1059 if not data: 

1060 return 

1061 

1062 if self._conn_lost: 

1063 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: 

1064 logger.warning('socket.send() raised exception.') 

1065 self._conn_lost += 1 

1066 return 

1067 

1068 if not self._buffer: 

1069 # Optimization: try to send now. 

1070 try: 

1071 n = self._sock.send(data) 

1072 except (BlockingIOError, InterruptedError): 

1073 pass 

1074 except (SystemExit, KeyboardInterrupt): 

1075 raise 

1076 except BaseException as exc: 

1077 self._fatal_error(exc, 'Fatal write error on socket transport') 

1078 return 

1079 else: 

1080 data = memoryview(data)[n:] 

1081 if not data: 

1082 return 

1083 # Not all was written; register write handler. 

1084 self._loop._add_writer(self._sock_fd, self._write_ready) 

1085 

1086 # Add it to the buffer. 

1087 self._buffer.append(data) 

1088 self._maybe_pause_protocol() 

1089 

1090 def _get_sendmsg_buffer(self): 

1091 return itertools.islice(self._buffer, SC_IOV_MAX) 

1092 

1093 def _write_sendmsg(self): 

1094 assert self._buffer, 'Data should not be empty' 

1095 if self._conn_lost: 1095 ↛ 1096line 1095 didn't jump to line 1096 because the condition on line 1095 was never true

1096 return 

1097 try: 

1098 nbytes = self._sock.sendmsg(self._get_sendmsg_buffer()) 

1099 self._adjust_leftover_buffer(nbytes) 

1100 except (BlockingIOError, InterruptedError): 

1101 pass 

1102 except (SystemExit, KeyboardInterrupt): 

1103 raise 

1104 except BaseException as exc: 

1105 self._loop._remove_writer(self._sock_fd) 

1106 self._buffer.clear() 

1107 self._fatal_error(exc, 'Fatal write error on socket transport') 

1108 if self._empty_waiter is not None: 1108 ↛ 1109line 1108 didn't jump to line 1109 because the condition on line 1108 was never true

1109 self._empty_waiter.set_exception(exc) 

1110 else: 

1111 self._maybe_resume_protocol() # May append to buffer. 

1112 if not self._buffer: 

1113 self._loop._remove_writer(self._sock_fd) 

1114 if self._empty_waiter is not None: 

1115 self._empty_waiter.set_result(None) 

1116 if self._closing: 

1117 self._call_connection_lost(None) 

1118 elif self._eof: 1118 ↛ 1119line 1118 didn't jump to line 1119 because the condition on line 1118 was never true

1119 self._sock.shutdown(socket.SHUT_WR) 

1120 

1121 def _adjust_leftover_buffer(self, nbytes: int) -> None: 

1122 buffer = self._buffer 

1123 while nbytes: 

1124 b = buffer.popleft() 

1125 b_len = len(b) 

1126 if b_len <= nbytes: 

1127 nbytes -= b_len 

1128 else: 

1129 buffer.appendleft(b[nbytes:]) 

1130 break 

1131 

1132 def _write_send(self): 

1133 assert self._buffer, 'Data should not be empty' 

1134 if self._conn_lost: 1134 ↛ 1135line 1134 didn't jump to line 1135 because the condition on line 1134 was never true

1135 return 

1136 try: 

1137 buffer = self._buffer.popleft() 

1138 n = self._sock.send(buffer) 

1139 if n != len(buffer): 

1140 # Not all data was written 

1141 self._buffer.appendleft(buffer[n:]) 

1142 except (BlockingIOError, InterruptedError): 

1143 pass 

1144 except (SystemExit, KeyboardInterrupt): 

1145 raise 

1146 except BaseException as exc: 

1147 self._loop._remove_writer(self._sock_fd) 

1148 self._buffer.clear() 

1149 self._fatal_error(exc, 'Fatal write error on socket transport') 

1150 if self._empty_waiter is not None: 1150 ↛ 1151line 1150 didn't jump to line 1151 because the condition on line 1150 was never true

1151 self._empty_waiter.set_exception(exc) 

1152 else: 

1153 self._maybe_resume_protocol() # May append to buffer. 

1154 if not self._buffer: 

1155 self._loop._remove_writer(self._sock_fd) 

1156 if self._empty_waiter is not None: 1156 ↛ 1157line 1156 didn't jump to line 1157 because the condition on line 1156 was never true

1157 self._empty_waiter.set_result(None) 

1158 if self._closing: 

1159 self._call_connection_lost(None) 

1160 elif self._eof: 

1161 self._sock.shutdown(socket.SHUT_WR) 

1162 

1163 def write_eof(self): 

1164 if self._closing or self._eof: 

1165 return 

1166 self._eof = True 

1167 if not self._buffer: 

1168 self._sock.shutdown(socket.SHUT_WR) 

1169 

1170 def writelines(self, list_of_data): 

1171 if self._eof: 1171 ↛ 1172line 1171 didn't jump to line 1172 because the condition on line 1171 was never true

1172 raise RuntimeError('Cannot call writelines() after write_eof()') 

1173 if self._empty_waiter is not None: 1173 ↛ 1174line 1173 didn't jump to line 1174 because the condition on line 1173 was never true

1174 raise RuntimeError('unable to writelines; sendfile is in progress') 

1175 if not list_of_data: 1175 ↛ 1176line 1175 didn't jump to line 1176 because the condition on line 1175 was never true

1176 return 

1177 self._buffer.extend([memoryview(data) for data in list_of_data]) 

1178 self._write_ready() 

1179 # If the entire buffer couldn't be written, register a write handler 

1180 if self._buffer: 

1181 self._loop._add_writer(self._sock_fd, self._write_ready) 

1182 self._maybe_pause_protocol() 

1183 

1184 def can_write_eof(self): 

1185 return True 

1186 

1187 def _call_connection_lost(self, exc): 

1188 try: 

1189 super()._call_connection_lost(exc) 

1190 finally: 

1191 self._write_ready = None 

1192 if self._empty_waiter is not None: 1192 ↛ 1193line 1192 didn't jump to line 1193 because the condition on line 1192 was never true

1193 self._empty_waiter.set_exception( 

1194 ConnectionError("Connection is closed by peer")) 

1195 

1196 def _make_empty_waiter(self): 

1197 if self._empty_waiter is not None: 1197 ↛ 1198line 1197 didn't jump to line 1198 because the condition on line 1197 was never true

1198 raise RuntimeError("Empty waiter is already set") 

1199 self._empty_waiter = self._loop.create_future() 

1200 if not self._buffer: 

1201 self._empty_waiter.set_result(None) 

1202 return self._empty_waiter 

1203 

1204 def _reset_empty_waiter(self): 

1205 self._empty_waiter = None 

1206 

1207 def close(self): 

1208 self._read_ready_cb = None 

1209 super().close() 

1210 

1211 

1212class _SelectorDatagramTransport(_SelectorTransport, transports.DatagramTransport): 

1213 

1214 _buffer_factory = collections.deque 

1215 

1216 def __init__(self, loop, sock, protocol, address=None, 

1217 waiter=None, extra=None): 

1218 super().__init__(loop, sock, protocol, extra) 

1219 self._address = address 

1220 self._buffer_size = 0 

1221 self._loop.call_soon(self._protocol.connection_made, self) 

1222 # only start reading when connection_made() has been called 

1223 self._loop.call_soon(self._add_reader, 

1224 self._sock_fd, self._read_ready) 

1225 if waiter is not None: 

1226 # only wake up the waiter when connection_made() has been called 

1227 self._loop.call_soon(futures._set_result_unless_cancelled, 

1228 waiter, None) 

1229 

1230 def get_write_buffer_size(self): 

1231 return self._buffer_size 

1232 

1233 def _read_ready(self): 

1234 if self._conn_lost: 1234 ↛ 1235line 1234 didn't jump to line 1235 because the condition on line 1234 was never true

1235 return 

1236 try: 

1237 data, addr = self._sock.recvfrom(self.max_size) 

1238 except (BlockingIOError, InterruptedError): 

1239 pass 

1240 except OSError as exc: 

1241 self._protocol.error_received(exc) 

1242 except (SystemExit, KeyboardInterrupt): 

1243 raise 

1244 except BaseException as exc: 

1245 self._fatal_error(exc, 'Fatal read error on datagram transport') 

1246 else: 

1247 self._protocol.datagram_received(data, addr) 

1248 

1249 def sendto(self, data, addr=None): 

1250 if not isinstance(data, (bytes, bytearray, memoryview)): 

1251 raise TypeError(f'data argument must be a bytes-like object, ' 

1252 f'not {type(data).__name__!r}') 

1253 

1254 if self._address: 

1255 if addr not in (None, self._address): 

1256 raise ValueError( 

1257 f'Invalid address: must be None or {self._address}') 

1258 addr = self._address 

1259 

1260 if self._conn_lost and self._address: 

1261 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: 

1262 logger.warning('socket.send() raised exception.') 

1263 self._conn_lost += 1 

1264 return 

1265 

1266 if not self._buffer: 

1267 # Attempt to send it right away first. 

1268 try: 

1269 if self._extra['peername']: 

1270 self._sock.send(data) 

1271 else: 

1272 self._sock.sendto(data, addr) 

1273 return 

1274 except (BlockingIOError, InterruptedError): 

1275 self._loop._add_writer(self._sock_fd, self._sendto_ready) 

1276 except OSError as exc: 

1277 self._protocol.error_received(exc) 

1278 return 

1279 except (SystemExit, KeyboardInterrupt): 

1280 raise 

1281 except BaseException as exc: 

1282 self._fatal_error( 

1283 exc, 'Fatal write error on datagram transport') 

1284 return 

1285 

1286 # Ensure that what we buffer is immutable. 

1287 self._buffer.append((bytes(data), addr)) 

1288 self._buffer_size += len(data) + 8 # include header bytes 

1289 self._maybe_pause_protocol() 

1290 

1291 def _sendto_ready(self): 

1292 while self._buffer: 

1293 data, addr = self._buffer.popleft() 

1294 self._buffer_size -= len(data) 

1295 try: 

1296 if self._extra['peername']: 

1297 self._sock.send(data) 

1298 else: 

1299 self._sock.sendto(data, addr) 

1300 except (BlockingIOError, InterruptedError): 

1301 self._buffer.appendleft((data, addr)) # Try again later. 

1302 self._buffer_size += len(data) 

1303 break 

1304 except OSError as exc: 

1305 self._protocol.error_received(exc) 

1306 return 

1307 except (SystemExit, KeyboardInterrupt): 

1308 raise 

1309 except BaseException as exc: 

1310 self._fatal_error( 

1311 exc, 'Fatal write error on datagram transport') 

1312 return 

1313 

1314 self._maybe_resume_protocol() # May append to buffer. 

1315 if not self._buffer: 

1316 self._loop._remove_writer(self._sock_fd) 

1317 if self._closing: 

1318 self._call_connection_lost(None)