Coverage for Lib / asyncio / base_events.py: 88%

1167 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 02:39 +0000

1"""Base implementation of event loop. 

2 

3The event loop can be broken up into a multiplexer (the part 

4responsible for notifying us of I/O events) and the event loop proper, 

5which wraps a multiplexer with functionality for scheduling callbacks, 

6immediately or at a given time in the future. 

7 

8Whenever a public API takes a callback, subsequent positional 

9arguments will be passed to the callback if/when it is called. This 

10avoids the proliferation of trivial lambdas implementing closures. 

11Keyword arguments for the callback are not supported; this is a 

12conscious design decision, leaving the door open for keyword arguments 

13to modify the meaning of the API call itself. 

14""" 

15 

16import collections 

17import contextvars 

18import collections.abc 

19import concurrent.futures 

20import errno 

21import heapq 

22import itertools 

23import math 

24import os 

25import socket 

26import stat 

27import subprocess 

28import sys 

29import threading 

30import time 

31import traceback 

32import warnings 

33import weakref 

34 

35try: 

36 import ssl 

37except ImportError: # pragma: no cover 

38 ssl = None 

39 

40from . import constants 

41from . import coroutines 

42from . import events 

43from . import exceptions 

44from . import futures 

45from . import protocols 

46from . import sslproto 

47from . import staggered 

48from . import tasks 

49from . import timeouts 

50from . import transports 

51from . import trsock 

52from .log import logger 

53 

54 

55__all__ = 'BaseEventLoop','Server', 

56 

57 

# Cleanup of cancelled timer handles is not performed until at least this
# many timer handles are present in _scheduled.
_MIN_SCHEDULED_TIMER_HANDLES = 100

# Cleanup of cancelled timer handles is not performed until at least this
# fraction of the _scheduled timer handles has been cancelled.
_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5

# Whether the socket module defines the AF_INET6 address family.
_HAS_IPv6 = hasattr(socket, 'AF_INET6')

# Largest timeout (24 hours, in seconds) passed to select, to avoid OS
# limitations.
MAXIMUM_SELECT_TIMEOUT = 24 * 60 * 60

71 

72 

def _format_handle(handle):
    """Return a printable description of *handle*.

    If the handle's callback is bound to a Task (its ``__self__`` is a
    ``tasks.Task``), the repr of that task is returned; otherwise the
    result of ``str(handle)`` is returned.
    """
    owner = getattr(handle._callback, '__self__', None)
    if not isinstance(owner, tasks.Task):
        return str(handle)
    # The callback belongs to a task: describe the task itself.
    return repr(owner)

80 

81 

82def _format_pipe(fd): 

83 if fd == subprocess.PIPE: 

84 return '<pipe>' 

85 elif fd == subprocess.STDOUT: 

86 return '<stdout>' 

87 else: 

88 return repr(fd) 

89 

90 

91def _set_reuseport(sock): 

92 if not hasattr(socket, 'SO_REUSEPORT'): 

93 raise ValueError('reuse_port not supported by socket module') 

94 else: 

95 try: 

96 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 

97 except OSError: 

98 raise ValueError('reuse_port not supported by socket module, ' 

99 'SO_REUSEPORT defined but not implemented.') 

100 

101 

102def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0): 

103 # Try to skip getaddrinfo if "host" is already an IP. Users might have 

104 # handled name resolution in their own code and pass in resolved IPs. 

105 if not hasattr(socket, 'inet_pton'): 

106 return 

107 

108 if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \ 

109 host is None: 

110 return None 

111 

112 if type == socket.SOCK_STREAM: 

113 proto = socket.IPPROTO_TCP 

114 elif type == socket.SOCK_DGRAM: 

115 proto = socket.IPPROTO_UDP 

116 else: 

117 return None 

118 

119 if port is None: 

120 port = 0 

121 elif isinstance(port, bytes) and port == b'': 

122 port = 0 

123 elif isinstance(port, str) and port == '': 

124 port = 0 

125 else: 

126 # If port's a service name like "http", don't skip getaddrinfo. 

127 try: 

128 port = int(port) 

129 except (TypeError, ValueError): 

130 return None 

131 

132 if family == socket.AF_UNSPEC: 

133 afs = [socket.AF_INET] 

134 if _HAS_IPv6: 134 ↛ 139line 134 didn't jump to line 139 because the condition on line 134 was always true

135 afs.append(socket.AF_INET6) 

136 else: 

137 afs = [family] 

138 

139 if isinstance(host, bytes): 

140 host = host.decode('idna') 

141 if '%' in host: 

142 # Linux's inet_pton doesn't accept an IPv6 zone index after host, 

143 # like '::1%lo0'. 

144 return None 

145 

146 for af in afs: 

147 try: 

148 socket.inet_pton(af, host) 

149 # The host has already been resolved. 

150 if _HAS_IPv6 and af == socket.AF_INET6: 

151 return af, type, proto, '', (host, port, flowinfo, scopeid) 

152 else: 

153 return af, type, proto, '', (host, port) 

154 except OSError: 

155 pass 

156 

157 # "host" is not an IP address. 

158 return None 

159 

160 

161def _interleave_addrinfos(addrinfos, first_address_family_count=1): 

162 """Interleave list of addrinfo tuples by family.""" 

163 # Group addresses by family 

164 addrinfos_by_family = collections.OrderedDict() 

165 for addr in addrinfos: 

166 family = addr[0] 

167 if family not in addrinfos_by_family: 

168 addrinfos_by_family[family] = [] 

169 addrinfos_by_family[family].append(addr) 

170 addrinfos_lists = list(addrinfos_by_family.values()) 

171 

172 reordered = [] 

173 if first_address_family_count > 1: 

174 reordered.extend(addrinfos_lists[0][:first_address_family_count - 1]) 

175 del addrinfos_lists[0][:first_address_family_count - 1] 

176 reordered.extend( 

177 a for a in itertools.chain.from_iterable( 

178 itertools.zip_longest(*addrinfos_lists) 

179 ) if a is not None) 

180 return reordered 

181 

182 

def _run_until_complete_cb(fut):
    """Done-callback that stops the loop *fut* belongs to.

    The loop is left alone when the future finished (without being
    cancelled) with SystemExit or KeyboardInterrupt.
    """
    if not fut.cancelled() and isinstance(
            fut.exception(), (SystemExit, KeyboardInterrupt)):
        # Issue #22429: run_forever() already finished, no need to
        # stop it.
        return
    futures._get_loop(fut).stop()

191 

192 

193if hasattr(socket, 'TCP_NODELAY'): 193 ↛ 200line 193 didn't jump to line 200 because the condition on line 193 was always true

194 def _set_nodelay(sock): 

195 if (sock.family in {socket.AF_INET, socket.AF_INET6} and 

196 sock.type == socket.SOCK_STREAM and 

197 sock.proto == socket.IPPROTO_TCP): 

198 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 

199else: 

200 def _set_nodelay(sock): 

201 pass 

202 

203 

204def _check_ssl_socket(sock): 

205 if ssl is not None and isinstance(sock, ssl.SSLSocket): 205 ↛ 206line 205 didn't jump to line 206 because the condition on line 205 was never true

206 raise TypeError("Socket cannot be of type SSLSocket") 

207 

208 

class _SendfileFallbackProtocol(protocols.Protocol):
    """Protocol that temporarily takes over a flow-controlled transport.

    On construction it records the transport's current protocol and its
    reading/writing state, pauses reading and installs itself as the
    transport's protocol.  ``drain()`` waits while writing is paused, and
    ``restore()`` puts the original protocol and state back.
    """

    def __init__(self, transp):
        if not isinstance(transp, transports._FlowControlMixin):
            raise TypeError("transport should be _FlowControlMixin instance")
        self._transport = transp
        # Remember what has to be put back by restore().
        self._proto = transp.get_protocol()
        self._should_resume_reading = transp.is_reading()
        self._should_resume_writing = transp._protocol_paused
        transp.pause_reading()
        transp.set_protocol(self)
        # A pending future here means "writing is currently paused".
        self._write_ready_fut = (
            self._transport._loop.create_future()
            if self._should_resume_writing else None)

    async def drain(self):
        """Wait until writing is no longer paused.

        Raises ConnectionError if the transport is closing.
        """
        if self._transport.is_closing():
            raise ConnectionError("Connection closed by peer")
        pending = self._write_ready_fut
        if pending is not None:
            await pending

    def connection_made(self, transport):
        raise RuntimeError("Invalid state: "
                           "connection should have been established already.")

    def connection_lost(self, exc):
        """Fail a pending drain() and forward the event to the original protocol."""
        if self._write_ready_fut is not None:
            # Never happens if peer disconnects after sending the whole content
            # Thus disconnection is always an exception from user perspective
            failure = exc
            if failure is None:
                failure = ConnectionError("Connection is closed by peer")
            self._write_ready_fut.set_exception(failure)
        self._proto.connection_lost(exc)

    def pause_writing(self):
        """Create the write-ready future unless one already exists."""
        if self._write_ready_fut is None:
            self._write_ready_fut = self._transport._loop.create_future()

    def resume_writing(self):
        """Resolve and discard the write-ready future, if there is one."""
        pending = self._write_ready_fut
        if pending is None:
            return
        pending.set_result(False)
        self._write_ready_fut = None

    def data_received(self, data):
        raise RuntimeError("Invalid state: reading should be paused")

    def eof_received(self):
        raise RuntimeError("Invalid state: reading should be paused")

    async def restore(self):
        """Reinstall the original protocol and its reading/writing state."""
        self._transport.set_protocol(self._proto)
        if self._should_resume_reading:
            self._transport.resume_reading()
        pending = self._write_ready_fut
        if pending is not None:
            # Cancel the future.
            # Basically it has no effect because protocol is switched back,
            # no code should wait for it anymore.
            pending.cancel()
        if self._should_resume_writing:
            self._proto.resume_writing()

275 

276 

class Server(events.AbstractServer):
    """Server object tracking its listening sockets and client transports."""

    def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
                 ssl_handshake_timeout, ssl_shutdown_timeout=None):
        # Collaborators and configuration supplied by the caller.
        self._loop = loop
        self._sockets = sockets
        self._protocol_factory = protocol_factory
        self._ssl_context = ssl_context
        self._backlog = backlog
        self._ssl_handshake_timeout = ssl_handshake_timeout
        self._ssl_shutdown_timeout = ssl_shutdown_timeout
        # Weak references so we don't break Transport's ability to
        # detect abandoned transports
        self._clients = weakref.WeakSet()
        # Futures of wait_closed() callers; _wakeup() replaces the list
        # with None.
        self._waiters = []
        self._serving = False
        self._serving_forever_fut = None
        # Context captured at construction; handed to loop._start_serving().
        self._context = contextvars.copy_context()

    def __repr__(self):
        cls_name = self.__class__.__name__
        return f'<{cls_name} sockets={self.sockets!r}>'

    def _attach(self, transport):
        """Register a client transport; the server must not be closed."""
        assert self._sockets is not None
        self._clients.add(transport)

    def _detach(self, transport):
        """Forget a client transport and wake waiters if it was the last one
        of a closed server."""
        self._clients.discard(transport)
        if not self._clients and self._sockets is None:
            self._wakeup()

    def _wakeup(self):
        """Resolve every pending wait_closed() future and set _waiters to None."""
        pending, self._waiters = self._waiters, None
        for fut in pending:
            if fut.done():
                continue
            fut.set_result(None)

    def _start_serving(self):
        """Start listening on every socket; does nothing if already serving."""
        if self._serving:
            return
        self._serving = True
        for listener in self._sockets:
            listener.listen(self._backlog)
            self._loop._start_serving(
                self._protocol_factory, listener, self._ssl_context,
                self, self._backlog, self._ssl_handshake_timeout,
                self._ssl_shutdown_timeout, context=self._context)

    def get_loop(self):
        return self._loop

    def is_serving(self):
        return self._serving

    @property
    def sockets(self):
        """Tuple of TransportSocket wrappers; empty once the server is closed."""
        raw = self._sockets
        if raw is None:
            return ()
        return tuple(map(trsock.TransportSocket, raw))

    def close(self):
        """Stop serving on all sockets; a second call is a no-op."""
        listeners = self._sockets
        if listeners is None:
            return
        self._sockets = None

        for listener in listeners:
            self._loop._stop_serving(listener)

        self._serving = False

        forever = self._serving_forever_fut
        if forever is not None and not forever.done():
            forever.cancel()
            self._serving_forever_fut = None

        # With no clients left, waiters can be released right away;
        # otherwise _detach() does it when the last client goes.
        if not self._clients:
            self._wakeup()

    def close_clients(self):
        """Call close() on every tracked client transport."""
        for transport in self._clients.copy():
            transport.close()

    def abort_clients(self):
        """Call abort() on every tracked client transport."""
        for transport in self._clients.copy():
            transport.abort()

    async def start_serving(self):
        self._start_serving()
        # Skip one loop iteration so that all 'loop.add_reader'
        # go through.
        await tasks.sleep(0)

    async def serve_forever(self):
        """Start serving and block until the serving future is cancelled.

        Raises RuntimeError if serve_forever() is already being awaited or
        if the server is closed.  On cancellation the server is closed, its
        clients are closed and wait_closed() is awaited before re-raising.
        """
        if self._serving_forever_fut is not None:
            raise RuntimeError(
                f'server {self!r} is already being awaited on serve_forever()')
        if self._sockets is None:
            raise RuntimeError(f'server {self!r} is closed')

        self._start_serving()
        forever = self._loop.create_future()
        self._serving_forever_fut = forever

        try:
            await forever
        except exceptions.CancelledError:
            try:
                self.close()
                self.close_clients()
                await self.wait_closed()
            finally:
                raise
        finally:
            self._serving_forever_fut = None

    async def wait_closed(self):
        """Wait until server is closed and all connections are dropped.

        - If the server is not closed, wait.
        - If it is closed, but there are still active connections, wait.

        Waiters are released once both conditions (server closed, all
        connections dropped) hold, whichever becomes true last.

        Historical note: In 3.11 and before, this was broken, returning
        immediately if the server was already closed, even if there
        were still active connections.  An attempted fix in 3.12.0 was
        still broken, returning immediately if the server was still
        open and there were no active connections.  Hopefully in 3.12.1
        we have it right.
        """
        # self._wakeup() is the only thing that unblocks waiters.  It is
        # called from self.close() and self._detach(), but only once both
        # conditions hold, and it marks that by setting self._waiters to
        # None.
        if self._waiters is None:
            return
        fut = self._loop.create_future()
        self._waiters.append(fut)
        await fut

419 

420 

421class BaseEventLoop(events.AbstractEventLoop): 

422 

def __init__(self):
    """Initialise the loop's bookkeeping state."""
    # Lifecycle flags.
    self._closed = False
    self._stopping = False
    # Callback queues: ready-to-run handles and scheduled timer handles.
    self._ready = collections.deque()
    self._scheduled = []
    self._timer_cancelled_count = 0
    self._default_executor = None
    self._internal_fds = 0
    # Identifier of the thread running the event loop, or None if the
    # event loop is not running
    self._thread_id = None
    self._clock_resolution = time.get_clock_info('monotonic').resolution
    self._exception_handler = None
    self.set_debug(coroutines._is_debug_mode())
    # The preserved state of async generator hooks.
    self._old_agen_hooks = None
    # In debug mode, if the execution of a callback or a step of a task
    # exceed this duration in seconds, the slow callback/task is logged.
    self.slow_callback_duration = 0.1
    self._current_handle = None
    self._task_factory = None
    self._coroutine_origin_tracking_enabled = False
    self._coroutine_origin_tracking_saved_depth = None

    # A weak set of all asynchronous generators that are
    # being iterated by the loop.
    self._asyncgens = weakref.WeakSet()
    # Set to True when `loop.shutdown_asyncgens` is called.
    self._asyncgens_shutdown_called = False
    # Set to True when `loop.shutdown_default_executor` is called.
    self._executor_shutdown_called = False

454 

455 def __repr__(self): 

456 return ( 

457 f'<{self.__class__.__name__} running={self.is_running()} ' 

458 f'closed={self.is_closed()} debug={self.get_debug()}>' 

459 ) 

460 

def create_future(self):
    """Return a new ``futures.Future`` bound to this loop."""
    fut = futures.Future(loop=self)
    return fut

464 

def create_task(self, coro, **kwargs):
    """Schedule or begin executing a coroutine object.

    If a task factory is set, it is called as
    ``factory(loop, coro, **kwargs)`` and its result is returned.
    Otherwise a ``tasks.Task`` is created for *coro*.

    Raises RuntimeError (via ``_check_closed``) if the loop is closed.
    """
    self._check_closed()
    factory = self._task_factory
    if factory is not None:
        return factory(self, coro, **kwargs)

    task = tasks.Task(coro, loop=self, **kwargs)
    source_tb = task._source_traceback
    if source_tb:
        # Drop the entry for this method from the recorded traceback.
        del source_tb[-1]
    try:
        return task
    finally:
        # gh-128552: prevent a refcycle of
        # task.exception().__traceback__->BaseEventLoop.create_task->task
        del task

483 

def set_task_factory(self, factory):
    """Set a task factory that will be used by loop.create_task().

    Passing None selects the default task factory.

    A callable factory should have a signature matching
    '(loop, coro, **kwargs)': 'loop' is a reference to the active event
    loop, 'coro' is a coroutine object, and **kwargs are arbitrary keyword
    arguments that should be passed on to Task.  The callable must return
    a Task.

    Raises TypeError if *factory* is neither None nor callable.
    """
    if not (factory is None or callable(factory)):
        raise TypeError('task factory must be a callable or None')
    self._task_factory = factory

498 

def get_task_factory(self):
    """Return the configured task factory; None means the default is used."""
    factory = self._task_factory
    return factory

502 

503 def _make_socket_transport(self, sock, protocol, waiter=None, *, 

504 extra=None, server=None): 

505 """Create socket transport.""" 

506 raise NotImplementedError 

507 

508 def _make_ssl_transport( 

509 self, rawsock, protocol, sslcontext, waiter=None, 

510 *, server_side=False, server_hostname=None, 

511 extra=None, server=None, 

512 ssl_handshake_timeout=None, 

513 ssl_shutdown_timeout=None, 

514 call_connection_made=True, 

515 context=None): 

516 """Create SSL transport.""" 

517 raise NotImplementedError 

518 

519 def _make_datagram_transport(self, sock, protocol, 

520 address=None, waiter=None, extra=None): 

521 """Create datagram transport.""" 

522 raise NotImplementedError 

523 

524 def _make_read_pipe_transport(self, pipe, protocol, waiter=None, 

525 extra=None): 

526 """Create read pipe transport.""" 

527 raise NotImplementedError 

528 

529 def _make_write_pipe_transport(self, pipe, protocol, waiter=None, 

530 extra=None): 

531 """Create write pipe transport.""" 

532 raise NotImplementedError 

533 

534 async def _make_subprocess_transport(self, protocol, args, shell, 

535 stdin, stdout, stderr, bufsize, 

536 extra=None, **kwargs): 

537 """Create subprocess transport.""" 

538 raise NotImplementedError 

539 

540 def _write_to_self(self): 

541 """Write a byte to self-pipe, to wake up the event loop. 

542 

543 This may be called from a different thread. 

544 

545 The subclass is responsible for implementing the self-pipe. 

546 """ 

547 raise NotImplementedError 

548 

549 def _process_events(self, event_list): 

550 """Process selector events.""" 

551 raise NotImplementedError 

552 

553 def _check_closed(self): 

554 if self._closed: 

555 raise RuntimeError('Event loop is closed') 

556 

557 def _check_default_executor(self): 

558 if self._executor_shutdown_called: 558 ↛ 559line 558 didn't jump to line 559 because the condition on line 558 was never true

559 raise RuntimeError('Executor shutdown has been called') 

560 

561 def _asyncgen_finalizer_hook(self, agen): 

562 self._asyncgens.discard(agen) 

563 if not self.is_closed(): 563 ↛ exitline 563 didn't return from function '_asyncgen_finalizer_hook' because the condition on line 563 was always true

564 self.call_soon_threadsafe(self.create_task, agen.aclose()) 

565 

566 def _asyncgen_firstiter_hook(self, agen): 

567 if self._asyncgens_shutdown_called: 567 ↛ 568line 567 didn't jump to line 568 because the condition on line 567 was never true

568 warnings.warn( 

569 f"asynchronous generator {agen!r} was scheduled after " 

570 f"loop.shutdown_asyncgens() call", 

571 ResourceWarning, source=self) 

572 

573 self._asyncgens.add(agen) 

574 

async def shutdown_asyncgens(self):
    """Shutdown all active asynchronous generators.

    Every tracked generator has aclose() called on it; the calls are
    gathered together.  Exceptions produced while closing are reported
    through call_exception_handler() rather than raised.
    """
    self._asyncgens_shutdown_called = True

    if len(self._asyncgens) == 0:
        # No asynchronous generators are being tracked.
        return

    pending = list(self._asyncgens)
    self._asyncgens.clear()

    outcomes = await tasks.gather(
        *(agen.aclose() for agen in pending),
        return_exceptions=True)

    for outcome, agen in zip(outcomes, pending):
        if not isinstance(outcome, Exception):
            continue
        self.call_exception_handler({
            'message': f'an error occurred during closing of '
                       f'asynchronous generator {agen!r}',
            'exception': outcome,
            'asyncgen': agen
        })

599 

async def shutdown_default_executor(self, timeout=None):
    """Schedule the shutdown of the default executor.

    *timeout* is how long the executor is given to finish joining.  With
    the default of None there is no limit.

    The blocking shutdown runs in a separate thread.  If it does not
    finish within *timeout*, a RuntimeWarning is issued and the executor
    is shut down without waiting.
    """
    self._executor_shutdown_called = True
    if self._default_executor is None:
        return
    done = self.create_future()
    worker = threading.Thread(target=self._do_shutdown, args=(done,))
    worker.start()
    try:
        async with timeouts.timeout(timeout):
            await done
    except TimeoutError:
        warnings.warn("The executor did not finishing joining "
                      f"its threads within {timeout} seconds.",
                      RuntimeWarning, stacklevel=2)
        self._default_executor.shutdown(wait=False)
    else:
        worker.join()

623 

624 def _do_shutdown(self, future): 

625 try: 

626 self._default_executor.shutdown(wait=True) 

627 if not self.is_closed(): 627 ↛ exitline 627 didn't return from function '_do_shutdown' because the condition on line 627 was always true

628 self.call_soon_threadsafe(futures._set_result_unless_cancelled, 

629 future, None) 

630 except Exception as ex: 

631 if not self.is_closed() and not future.cancelled(): 

632 self.call_soon_threadsafe(future.set_exception, ex) 

633 

634 def _check_running(self): 

635 if self.is_running(): 

636 raise RuntimeError('This event loop is already running') 

637 if events._get_running_loop() is not None: 

638 raise RuntimeError( 

639 'Cannot run the event loop while another loop is running') 

640 

def _run_forever_setup(self):
    """Prepare the run loop to process events.

    Kept as its own method so that custom event loop subclasses (e.g.,
    event loops that integrate a GUI event loop with Python's event loop)
    can reuse all of the loop setup logic.
    """
    self._check_closed()
    self._check_running()
    self._set_coroutine_origin_tracking(self._debug)

    # Remember the current async generator hooks so that
    # _run_forever_cleanup() can put them back.
    previous_hooks = sys.get_asyncgen_hooks()
    self._old_agen_hooks = previous_hooks
    self._thread_id = threading.get_ident()
    sys.set_asyncgen_hooks(
        firstiter=self._asyncgen_firstiter_hook,
        finalizer=self._asyncgen_finalizer_hook
    )

    events._set_running_loop(self)

660 

def _run_forever_cleanup(self):
    """Clean up after an event loop finishes the looping over events.

    Kept as its own method so that custom event loop subclasses (e.g.,
    event loops that integrate a GUI event loop with Python's event loop)
    can reuse all of the loop cleanup logic.
    """
    self._stopping = False
    self._thread_id = None
    events._set_running_loop(None)
    self._set_coroutine_origin_tracking(False)
    # Restore any pre-existing async generator hooks.
    saved_hooks = self._old_agen_hooks
    if saved_hooks is not None:
        sys.set_asyncgen_hooks(*saved_hooks)
        self._old_agen_hooks = None

676 

def run_forever(self):
    """Run until stop() is called.

    Iterations are executed until the stopping flag is found set after
    one of them; cleanup runs even if an iteration raises.
    """
    self._run_forever_setup()
    try:
        keep_running = True
        while keep_running:
            self._run_once()
            keep_running = not self._stopping
    finally:
        self._run_forever_cleanup()

687 

def run_until_complete(self, future):
    """Run until the Future is done.

    An argument that is not already a future is wrapped via
    tasks.ensure_future().

    WARNING: It would be disastrous to call run_until_complete()
    with the same coroutine twice -- it would wrap it in two
    different Tasks and that can't be good.

    Return the Future's result, or raise its exception.  RuntimeError is
    raised if the loop stopped before the future completed.
    """
    self._check_closed()
    self._check_running()

    wrapped_here = not futures.isfuture(future)
    future = tasks.ensure_future(future, loop=self)
    if wrapped_here:
        # An exception is raised if the future didn't complete, so there
        # is no need to log the "destroy pending task" message
        future._log_destroy_pending = False

    future.add_done_callback(_run_until_complete_cb)
    try:
        self.run_forever()
    except BaseException:
        if wrapped_here and future.done() and not future.cancelled():
            # The coroutine raised a BaseException. Consume the exception
            # to not log a warning, the caller doesn't have access to the
            # local task.
            future.exception()
        raise
    finally:
        future.remove_done_callback(_run_until_complete_cb)

    if future.done():
        return future.result()
    raise RuntimeError('Event loop stopped before Future completed.')

725 

def stop(self):
    """Stop running the event loop.

    Only the stopping flag is set here: callbacks that are already
    scheduled still run, and run_forever() leaves its loop after
    finishing a complete iteration.
    """
    self._stopping = True

733 

def close(self):
    """Close the event loop.

    The ready and scheduled queues are cleared and the default executor,
    if any, is shut down without waiting for it to finish.  Closing an
    already closed loop does nothing.

    The event loop must not be running; RuntimeError is raised if it is.
    """
    if self.is_running():
        raise RuntimeError("Cannot close a running event loop")
    if self._closed:
        return
    if self._debug:
        logger.debug("Close %r", self)
    self._closed = True
    self._ready.clear()
    self._scheduled.clear()
    self._executor_shutdown_called = True
    default_executor = self._default_executor
    if default_executor is None:
        return
    self._default_executor = None
    default_executor.shutdown(wait=False)

756 

def is_closed(self):
    """Report whether the event loop was closed (the ``_closed`` flag)."""
    closed = self._closed
    return closed

760 

761 def __del__(self, _warn=warnings.warn): 

762 if not self.is_closed(): 762 ↛ 763line 762 didn't jump to line 763 because the condition on line 762 was never true

763 _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self) 

764 if not self.is_running(): 

765 self.close() 

766 

def is_running(self):
    """Report whether the event loop is running (a thread id is recorded)."""
    running = self._thread_id is not None
    return running

770 

def time(self):
    """Return the time according to the event loop's clock.

    The value is a float in seconds since some epoch; the epoch,
    precision, accuracy and drift are unspecified and may differ per
    event loop.  This implementation reads ``time.monotonic()``.
    """
    now = time.monotonic()
    return now

779 

def call_later(self, delay, callback, *args, context=None):
    """Arrange for a callback to be called at a given time.

    Return a Handle: an opaque object with a cancel() method that
    can be used to cancel the call.

    *delay* is an int or float number of seconds, always measured from
    the current loop time; the call is delegated to call_at().

    Each callback will be called exactly once.  If two callbacks
    are scheduled for exactly the same time, it is undefined which
    will be called first.

    Positional arguments following the callback are passed to it when it
    is called.

    Raises TypeError if *delay* is None.
    """
    if delay is None:
        raise TypeError('delay must not be None')
    when = self.time() + delay
    handle = self.call_at(when, callback, *args, context=context)
    source_tb = handle._source_traceback
    if source_tb:
        # Drop the most recent entry of the recorded traceback.
        del source_tb[-1]
    return handle

803 

def call_at(self, when, callback, *args, context=None):
    """Like call_later(), but uses an absolute time.

    Absolute time corresponds to the event loop's time() method.

    The new TimerHandle is pushed onto the ``_scheduled`` heap and
    returned.  Raises TypeError if *when* is None.
    """
    if when is None:
        raise TypeError("when cannot be None")
    self._check_closed()
    if self._debug:
        self._check_thread()
        self._check_callback(callback, 'call_at')
    handle = events.TimerHandle(when, callback, args, self, context)
    source_tb = handle._source_traceback
    if source_tb:
        # Drop the most recent entry of the recorded traceback.
        del source_tb[-1]
    heapq.heappush(self._scheduled, handle)
    handle._scheduled = True
    return handle

821 

def call_soon(self, callback, *args, context=None):
    """Arrange for a callback to be called as soon as possible.

    Callbacks are queued FIFO: they are called in the order in which
    they were registered, and each one is called exactly once.

    Positional arguments following the callback are passed to it when it
    is called.

    In debug mode the calling thread and the callback are validated
    first.
    """
    self._check_closed()
    if self._debug:
        self._check_thread()
        self._check_callback(callback, 'call_soon')
    handle = self._call_soon(callback, args, context)
    source_tb = handle._source_traceback
    if source_tb:
        # Drop the most recent entry of the recorded traceback.
        del source_tb[-1]
    return handle

840 

def _check_callback(self, callback, method):
    """Validate *callback* for the scheduling API named *method*.

    Raises TypeError if *callback* is a coroutine object or coroutine
    function, or if it is not callable at all.
    """
    is_coro = (coroutines.iscoroutine(callback) or
               coroutines._iscoroutinefunction(callback))
    if is_coro:
        raise TypeError(
            f"coroutines cannot be used with {method}()")
    if callable(callback):
        return
    raise TypeError(
        f'a callable object was expected by {method}(), '
        f'got {callback!r}')

850 

def _call_soon(self, callback, args, context):
    """Create a Handle for *callback*, append it to ``_ready``, return it."""
    new_handle = events.Handle(callback, args, self, context)
    source_tb = new_handle._source_traceback
    if source_tb:
        # Drop the most recent entry of the recorded traceback.
        del source_tb[-1]
    self._ready.append(new_handle)
    return new_handle

857 

858 def _check_thread(self): 

859 """Check that the current thread is the thread running the event loop. 

860 

861 Non-thread-safe methods of this class make this assumption and will 

862 likely behave incorrectly when the assumption is violated. 

863 

864 Should only be called when (self._debug == True). The caller is 

865 responsible for checking this condition for performance reasons. 

866 """ 

867 if self._thread_id is None: 

868 return 

869 thread_id = threading.get_ident() 

870 if thread_id != self._thread_id: 

871 raise RuntimeError( 

872 "Non-thread-safe operation invoked on an event loop other " 

873 "than the current one") 

874 

def call_soon_threadsafe(self, callback, *args, context=None):
    """Like call_soon(), but thread-safe.

    A ``_ThreadSafeHandle`` is appended to ``_ready`` and the loop is
    then woken up through ``_write_to_self()``.
    """
    self._check_closed()
    if self._debug:
        self._check_callback(callback, 'call_soon_threadsafe')
    handle = events._ThreadSafeHandle(callback, args, self, context)
    self._ready.append(handle)
    # Up to two trailing entries are trimmed from the recorded traceback,
    # re-checking that it is non-empty before each removal.
    for _ in range(2):
        if handle._source_traceback:
            del handle._source_traceback[-1]
    self._write_to_self()
    return handle

888 

def run_in_executor(self, executor, func, *args):
    """Submit ``func(*args)`` to *executor* and return the result of
    futures.wrap_future() for the submitted call.

    When *executor* is None the loop's default executor is used; a
    ThreadPoolExecutor is created and stored as the default on first
    use.
    """
    self._check_closed()
    if self._debug:
        self._check_callback(func, 'run_in_executor')
    pool = executor
    if pool is None:
        pool = self._default_executor
        # Only check when the default executor is being used
        self._check_default_executor()
        if pool is None:
            pool = concurrent.futures.ThreadPoolExecutor(
                thread_name_prefix='asyncio'
            )
            self._default_executor = pool
    pending = pool.submit(func, *args)
    return futures.wrap_future(pending, loop=self)

904 

def set_default_executor(self, executor):
    """Store *executor* as the loop's default executor.

    Raises TypeError unless *executor* is a
    concurrent.futures.ThreadPoolExecutor instance.
    """
    if isinstance(executor, concurrent.futures.ThreadPoolExecutor):
        self._default_executor = executor
        return
    raise TypeError('executor must be ThreadPoolExecutor instance')

909 

def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
    """Call socket.getaddrinfo() while logging the request and its duration.

    The result is logged at INFO level when the lookup took at least
    self.slow_callback_duration seconds, at DEBUG level otherwise.
    Returns whatever socket.getaddrinfo() returned.
    """
    parts = [f"{host}:{port!r}"]
    # Only truthy (non-default) options are mentioned in the log line.
    if family:
        parts.append(f'family={family!r}')
    if type:
        parts.append(f'type={type!r}')
    if proto:
        parts.append(f'proto={proto!r}')
    if flags:
        parts.append(f'flags={flags!r}')
    query = ', '.join(parts)
    logger.debug('Get address info %s', query)

    started = self.time()
    addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
    elapsed = self.time() - started

    report = f'Getting address info {query} took {elapsed * 1e3:.3f}ms: {addrinfo!r}'
    emit = logger.info if elapsed >= self.slow_callback_duration else logger.debug
    emit(report)
    return addrinfo

933 

async def getaddrinfo(self, host, port, *,
                      family=0, type=0, proto=0, flags=0):
    """Resolve *host*/*port* in the default executor.

    Uses the logging wrapper self._getaddrinfo_debug when the loop is
    in debug mode and socket.getaddrinfo otherwise.
    """
    resolver = self._getaddrinfo_debug if self._debug else socket.getaddrinfo
    pending = self.run_in_executor(
        None, resolver, host, port, family, type, proto, flags)
    return await pending

943 

async def getnameinfo(self, sockaddr, flags=0):
    """Run socket.getnameinfo(sockaddr, flags) in the default executor."""
    pending = self.run_in_executor(
        None, socket.getnameinfo, sockaddr, flags)
    return await pending

947 

async def sock_sendfile(self, sock, file, offset=0, count=None,
                        *, fallback=True):
    """Send *file* over *sock*, trying the native implementation first.

    If the native path raises SendfileNotAvailableError, the manual
    read/send fallback is used when *fallback* is true; otherwise the
    error propagates.  In debug mode a socket whose timeout is not 0
    is rejected with ValueError.
    """
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")
    _check_ssl_socket(sock)
    self._check_sendfile_params(sock, file, offset, count)
    try:
        sent = await self._sock_sendfile_native(sock, file,
                                                offset, count)
    except exceptions.SendfileNotAvailableError:
        if fallback:
            return await self._sock_sendfile_fallback(sock, file,
                                                      offset, count)
        raise
    else:
        return sent

962 

async def _sock_sendfile_native(self, sock, file, offset, count):
    """Base implementation: always raises SendfileNotAvailableError."""
    # NB: sendfile syscall is not supported for SSL sockets and
    # non-mmap files even if sendfile is supported by OS
    reason = (
        f"syscall sendfile is not available for socket {sock!r} "
        f"and file {file!r} combination")
    raise exceptions.SendfileNotAvailableError(reason)

969 

async def _sock_sendfile_fallback(self, sock, file, offset, count):
    """Send *file* over *sock* by reading chunks and calling sock_sendall().

    Reads happen in the default executor via file.readinto().  Stops at
    EOF or once *count* bytes were sent (when *count* is truthy) and
    returns the number of bytes sent.  On exit, if anything was sent and
    the file has a ``seek`` attribute, the file position is set to
    ``offset + total_sent``.
    """
    if offset:
        file.seek(offset)
    chunk_size = constants.SENDFILE_FALLBACK_READBUFFER_SIZE
    if count:
        chunk_size = min(count, chunk_size)
    scratch = bytearray(chunk_size)
    total_sent = 0
    try:
        while True:
            if count:
                # Never read more than what is still owed to the caller.
                chunk_size = min(count - total_sent, chunk_size)
                if chunk_size <= 0:
                    break
            window = memoryview(scratch)[:chunk_size]
            got = await self.run_in_executor(None, file.readinto, window)
            if not got:
                break  # EOF
            await self.sock_sendall(sock, window[:got])
            total_sent += got
        return total_sent
    finally:
        if total_sent > 0 and hasattr(file, 'seek'):
            file.seek(offset + total_sent)

995 

def _check_sendfile_params(self, sock, file, offset, count):
    """Validate the arguments shared by the sendfile helpers.

    Raises ValueError for a file whose ``mode`` lacks 'b', a socket
    that is not SOCK_STREAM, a non-positive *count* or a negative
    *offset*; raises TypeError when *count* (if not None) or *offset*
    is not an int.  A file without a ``mode`` attribute is accepted.
    """
    file_mode = getattr(file, 'mode', 'b')
    if 'b' not in file_mode:
        raise ValueError("file should be opened in binary mode")
    if sock.type != socket.SOCK_STREAM:
        raise ValueError("only SOCK_STREAM type sockets are supported")
    if count is not None:
        count_msg = "count must be a positive integer (got {!r})"
        if not isinstance(count, int):
            raise TypeError(count_msg.format(count))
        if count <= 0:
            raise ValueError(count_msg.format(count))
    offset_msg = "offset must be a non-negative integer (got {!r})"
    if not isinstance(offset, int):
        raise TypeError(offset_msg.format(offset))
    if offset < 0:
        raise ValueError(offset_msg.format(offset))

1016 

async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None):
    """Create, bind and connect one socket.

    *exceptions* is a caller-owned list; this call appends its own list
    of collected OSErrors to it.  (Inside this function the name shadows
    the module-level ``exceptions`` import.)  *addr_info* is a 5-tuple
    unpacked as (family, type, proto, _, address).  *local_addr_infos*,
    when given, is an iterable of similar 5-tuples to try binding to;
    only entries of the same family are attempted.

    Returns the connected socket.  On any failure the socket is closed
    and the exception is re-raised.
    """
    my_exceptions = []
    exceptions.append(my_exceptions)
    family, type_, proto, _, address = addr_info
    sock = None
    try:
        try:
            sock = socket.socket(family=family, type=type_, proto=proto)
            sock.setblocking(False)
            if local_addr_infos is not None:
                for lfamily, _, _, _, laddr in local_addr_infos:
                    # skip local addresses of different family
                    if lfamily != family:
                        continue
                    try:
                        sock.bind(laddr)
                        break
                    except OSError as exc:
                        # Record a more descriptive error and try the
                        # next candidate local address.
                        msg = (
                            f'error while attempting to bind on '
                            f'address {laddr!r}: {str(exc).lower()}'
                        )
                        exc = OSError(exc.errno, msg)
                        my_exceptions.append(exc)
                else:  # all bind attempts failed
                    if my_exceptions:
                        raise my_exceptions.pop()
                    else:
                        raise OSError(f"no matching local address with {family=} found")
            await self.sock_connect(sock, address)
            return sock
        except OSError as exc:
            # Remember the failure for the caller, then let the outer
            # handler close the socket.
            my_exceptions.append(exc)
            raise
    except:
        if sock is not None:
            try:
                sock.close()
            except OSError:
                # An error when closing a newly created socket is
                # not important, but it can overwrite more important
                # non-OSError error. So ignore it.
                pass
        raise
    finally:
        # Drop local references to the exception lists (presumably to
        # avoid reference cycles through tracebacks -- TODO confirm).
        exceptions = my_exceptions = None

1064 

async def create_connection(
        self, protocol_factory, host=None, port=None,
        *, ssl=None, family=0,
        proto=0, flags=0, sock=None,
        local_addr=None, server_hostname=None,
        ssl_handshake_timeout=None,
        ssl_shutdown_timeout=None,
        happy_eyeballs_delay=None, interleave=None,
        all_errors=False):
    """Connect to a TCP server.

    Create a streaming transport connection to a given internet host and
    port: socket family AF_INET or socket.AF_INET6 depending on host (or
    family if specified), socket type SOCK_STREAM.  protocol_factory must be
    a callable returning a protocol instance.

    This method is a coroutine which will try to establish the connection
    in the background.  When successful, the coroutine returns a
    (transport, protocol) pair.

    Either host/port or an existing *sock* must be given, not both
    (ValueError otherwise).  server_hostname, ssl_handshake_timeout and
    ssl_shutdown_timeout are only accepted together with *ssl*.  When
    happy_eyeballs_delay is not None the candidate addresses are raced
    through staggered.staggered_race(); otherwise they are tried one
    after another.  If every attempt fails, the collected errors are
    raised as an ExceptionGroup when all_errors is true, else as a
    single OSError (or TimeoutError if nothing was collected).
    """
    if server_hostname is not None and not ssl:
        raise ValueError('server_hostname is only meaningful with ssl')

    if server_hostname is None and ssl:
        # Use host as default for server_hostname.  It is an error
        # if host is empty or not set, e.g. when an
        # already-connected socket was passed or when only a port
        # is given.  To avoid this error, you can pass
        # server_hostname='' -- this will bypass the hostname
        # check.  (This also means that if host is a numeric
        # IP/IPv6 address, we will attempt to verify that exact
        # address; this will probably fail, but it is possible to
        # create a certificate for a specific IP address, so we
        # don't judge it here.)
        if not host:
            raise ValueError('You must set server_hostname '
                             'when using ssl without a host')
        server_hostname = host

    if ssl_handshake_timeout is not None and not ssl:
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    if ssl_shutdown_timeout is not None and not ssl:
        raise ValueError(
            'ssl_shutdown_timeout is only meaningful with ssl')

    if sock is not None:
        _check_ssl_socket(sock)

    if happy_eyeballs_delay is not None and interleave is None:
        # If using happy eyeballs, default to interleave addresses by family
        interleave = 1

    if host is not None or port is not None:
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        infos = await self._ensure_resolved(
            (host, port), family=family,
            type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
        if not infos:
            raise OSError('getaddrinfo() returned empty list')

        if local_addr is not None:
            laddr_infos = await self._ensure_resolved(
                local_addr, family=family,
                type=socket.SOCK_STREAM, proto=proto,
                flags=flags, loop=self)
            if not laddr_infos:
                raise OSError('getaddrinfo() returned empty list')
        else:
            laddr_infos = None

        if interleave:
            infos = _interleave_addrinfos(infos, interleave)

        # Local list of per-attempt exception lists; from here on the
        # name shadows the module-level ``exceptions`` import.
        exceptions = []
        if happy_eyeballs_delay is None:
            # not using happy eyeballs
            for addrinfo in infos:
                try:
                    sock = await self._connect_sock(
                        exceptions, addrinfo, laddr_infos)
                    break
                except OSError:
                    continue
        else:  # using happy eyeballs
            sock = (await staggered.staggered_race(
                (
                    # can't use functools.partial as it keeps a reference
                    # to exceptions
                    lambda addrinfo=addrinfo: self._connect_sock(
                        exceptions, addrinfo, laddr_infos
                    )
                    for addrinfo in infos
                ),
                happy_eyeballs_delay,
                loop=self,
            ))[0]  # can't use sock, _, _ as it keeps a reference to exceptions

        if sock is None:
            # Flatten the per-attempt lists into one list of errors.
            exceptions = [exc for sub in exceptions for exc in sub]
            try:
                if all_errors:
                    # NOTE(review): ExceptionGroup rejects an empty
                    # sequence; confirm whether ``exceptions`` can be
                    # empty here when all_errors is true.
                    raise ExceptionGroup("create_connection failed", exceptions)
                if len(exceptions) == 1:
                    raise exceptions[0]
                elif exceptions:
                    # If they all have the same str(), raise one.
                    model = str(exceptions[0])
                    if all(str(exc) == model for exc in exceptions):
                        raise exceptions[0]
                    # Raise a combined exception so the user can see all
                    # the various error messages.
                    raise OSError('Multiple exceptions: {}'.format(
                        ', '.join(str(exc) for exc in exceptions)))
                else:
                    # No exceptions were collected, raise a timeout error
                    raise TimeoutError('create_connection failed')
            finally:
                # Release the local reference to the collected errors.
                exceptions = None

    else:
        if sock is None:
            raise ValueError(
                'host and port was not specified and no sock specified')
        if sock.type != socket.SOCK_STREAM:
            # We allow AF_INET, AF_INET6, AF_UNIX as long as they
            # are SOCK_STREAM.
            # We support passing AF_UNIX sockets even though we have
            # a dedicated API for that: create_unix_connection.
            # Disallowing AF_UNIX in this method, breaks backwards
            # compatibility.
            raise ValueError(
                f'A Stream Socket was expected, got {sock!r}')

    transport, protocol = await self._create_connection_transport(
        sock, protocol_factory, ssl, server_hostname,
        ssl_handshake_timeout=ssl_handshake_timeout,
        ssl_shutdown_timeout=ssl_shutdown_timeout)
    if self._debug:
        # Get the socket from the transport because SSL transport closes
        # the old socket and creates a new SSL socket
        sock = transport.get_extra_info('socket')
        logger.debug("%r connected to %s:%r: (%r, %r)",
                     sock, host, port, transport, protocol)
    return transport, protocol

1214 

async def _create_connection_transport(
        self, sock, protocol_factory, ssl,
        server_hostname, server_side=False,
        ssl_handshake_timeout=None,
        ssl_shutdown_timeout=None, context=None):
    """Build a transport for *sock* and wait until it is ready.

    The socket is made non-blocking, a protocol is obtained from
    *protocol_factory*, and either an SSL transport (when *ssl* is
    truthy) or a plain socket transport is created.  A bool *ssl* is
    passed on as ``None`` for the SSL context.  If waiting on the
    transport's waiter future fails, the transport is closed and the
    error re-raised.  Returns ``(transport, protocol)``.
    """
    sock.setblocking(False)
    if context is None:
        context = contextvars.copy_context()

    protocol = protocol_factory()
    waiter = self.create_future()
    if not ssl:
        transport = self._make_socket_transport(sock, protocol, waiter, context=context)
    else:
        if isinstance(ssl, bool):
            sslcontext = None
        else:
            sslcontext = ssl
        transport = self._make_ssl_transport(
            sock, protocol, sslcontext, waiter,
            server_side=server_side, server_hostname=server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout,
            ssl_shutdown_timeout=ssl_shutdown_timeout,
            context=context)

    try:
        await waiter
    except BaseException:
        transport.close()
        raise

    return transport, protocol

1244 

async def sendfile(self, transport, file, offset=0, count=None,
                   *, fallback=True):
    """Send a file to transport.

    Return the total number of bytes which were sent.

    The method uses high-performance os.sendfile if available.

    file must be a regular file object opened in binary mode.

    offset tells from where to start reading the file. If specified,
    count is the total number of bytes to transmit as opposed to
    sending the file until EOF is reached. File position is updated on
    return or also in case of error in which case file.tell()
    can be used to figure out the number of bytes
    which were sent.

    fallback set to True makes asyncio to manually read and send
    the file when the platform does not support the sendfile syscall
    (e.g. Windows or SSL socket on Unix).

    Raise SendfileNotAvailableError if the system does not support
    sendfile syscall and fallback is False.
    """
    if transport.is_closing():
        raise RuntimeError("Transport is closing")
    modes = constants._SendfileMode
    mode = getattr(transport, '_sendfile_compatible', modes.UNSUPPORTED)
    if mode is modes.UNSUPPORTED:
        raise RuntimeError(
            f"sendfile is not supported for transport {transport!r}")
    if mode is modes.TRY_NATIVE:
        try:
            return await self._sendfile_native(transport, file,
                                               offset, count)
        except exceptions.SendfileNotAvailableError:
            # Fall through to the manual path unless it was disabled.
            if not fallback:
                raise

    if fallback:
        return await self._sendfile_fallback(transport, file,
                                             offset, count)
    raise RuntimeError(
        f"fallback is disabled and native sendfile is not "
        f"supported for transport {transport!r}")

1291 

async def _sendfile_native(self, transp, file, offset, count):
    """Base implementation: always raises SendfileNotAvailableError."""
    reason = "sendfile syscall is not supported"
    raise exceptions.SendfileNotAvailableError(reason)

1295 

async def _sendfile_fallback(self, transp, file, offset, count):
    """Send *file* through *transp* by reading chunks and writing them.

    Chunks of at most 16384 bytes are read in the default executor and
    written to the transport, draining a _SendfileFallbackProtocol
    after each write.  Returns the number of bytes sent, at EOF or once
    *count* bytes were sent (when *count* is truthy).  On exit the file
    position is updated if anything was sent and the protocol wrapper's
    restore() is awaited.
    """
    if offset:
        file.seek(offset)
    chunk_size = 16384
    if count:
        chunk_size = min(count, chunk_size)
    scratch = bytearray(chunk_size)
    total_sent = 0
    proto = _SendfileFallbackProtocol(transp)
    try:
        while True:
            if count:
                # Never read more than what is still owed to the caller.
                chunk_size = min(count - total_sent, chunk_size)
                if chunk_size <= 0:
                    return total_sent
            window = memoryview(scratch)[:chunk_size]
            got = await self.run_in_executor(None, file.readinto, window)
            if not got:
                return total_sent  # EOF
            transp.write(window[:got])
            await proto.drain()
            total_sent += got
    finally:
        if total_sent > 0 and hasattr(file, 'seek'):
            file.seek(offset + total_sent)
        await proto.restore()

1320 

async def start_tls(self, transport, protocol, sslcontext, *,
                    server_side=False,
                    server_hostname=None,
                    ssl_handshake_timeout=None,
                    ssl_shutdown_timeout=None):
    """Upgrade transport to TLS.

    Return a new transport that *protocol* should start using
    immediately.

    Raises RuntimeError when the ssl module is unavailable, and
    TypeError when *sslcontext* is not an ssl.SSLContext or when
    *transport* does not advertise ``_start_tls_compatible``.  If
    waiting for the SSL protocol's waiter future fails, the transport
    is closed, the scheduled callbacks are cancelled and the error is
    re-raised.
    """
    if ssl is None:
        raise RuntimeError('Python ssl module is not available')

    if not isinstance(sslcontext, ssl.SSLContext):
        raise TypeError(
            f'sslcontext is expected to be an instance of ssl.SSLContext, '
            f'got {sslcontext!r}')

    if not getattr(transport, '_start_tls_compatible', False):
        raise TypeError(
            f'transport {transport!r} is not supported by start_tls()')

    waiter = self.create_future()
    ssl_protocol = sslproto.SSLProtocol(
        self, protocol, sslcontext, waiter,
        server_side, server_hostname,
        ssl_handshake_timeout=ssl_handshake_timeout,
        ssl_shutdown_timeout=ssl_shutdown_timeout,
        call_connection_made=False)

    # Pause early so that "ssl_protocol.data_received()" doesn't
    # have a chance to get called before "ssl_protocol.connection_made()".
    transport.pause_reading()

    # gh-142352: move buffered StreamReader data to SSLProtocol
    if server_side:
        # Imported here rather than at module level (presumably to
        # avoid an import cycle -- TODO confirm).
        from .streams import StreamReaderProtocol
        if isinstance(protocol, StreamReaderProtocol):
            stream_reader = getattr(protocol, '_stream_reader', None)
            if stream_reader is not None:
                buffer = stream_reader._buffer
                if buffer:
                    # Feed the already-buffered bytes to the SSL
                    # protocol's incoming side, then empty the buffer.
                    ssl_protocol._incoming.write(buffer)
                    buffer.clear()

    transport.set_protocol(ssl_protocol)
    # Schedule connection_made() first and resume_reading() second.
    conmade_cb = self.call_soon(ssl_protocol.connection_made, transport)
    resume_cb = self.call_soon(transport.resume_reading)

    try:
        await waiter
    except BaseException:
        transport.close()
        conmade_cb.cancel()
        resume_cb.cancel()
        raise

    return ssl_protocol._app_transport

1379 

async def create_datagram_endpoint(self, protocol_factory,
                                   local_addr=None, remote_addr=None, *,
                                   family=0, proto=0, flags=0,
                                   reuse_port=None,
                                   allow_broadcast=None, sock=None):
    """Create datagram connection.

    Either an existing non-stream *sock* is used as is (no other socket
    modifier argument may then be given), or a SOCK_DGRAM socket is
    created for the first (family, proto) candidate that can be set up,
    bound to *local_addr* and/or connected to *remote_addr*.  Returns a
    (transport, protocol) pair once the transport's waiter completes.

    Raises ValueError or TypeError for inconsistent arguments and
    re-raises the first collected OSError when no candidate works.
    """
    if sock is not None:
        if sock.type == socket.SOCK_STREAM:
            raise ValueError(
                f'A datagram socket was expected, got {sock!r}')
        if (local_addr or remote_addr or
                family or proto or flags or
                reuse_port or allow_broadcast):
            # show the problematic kwargs in exception msg
            opts = dict(local_addr=local_addr, remote_addr=remote_addr,
                        family=family, proto=proto, flags=flags,
                        reuse_port=reuse_port,
                        allow_broadcast=allow_broadcast)
            problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
            raise ValueError(
                f'socket modifier keyword arguments can not be used '
                f'when sock is specified. ({problems})')
        sock.setblocking(False)
        r_addr = None
    else:
        # Build addr_pairs_info: a sequence of
        # ((family, proto), (local_address, remote_address)) candidates.
        if not (local_addr or remote_addr):
            if family == 0:
                raise ValueError('unexpected address family')
            addr_pairs_info = (((family, proto), (None, None)),)
        elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
            for addr in (local_addr, remote_addr):
                if addr is not None and not isinstance(addr, str):
                    raise TypeError('string is expected')

            # Remove a stale socket file at local_addr, unless the
            # address starts with 0 or a NUL character.
            if local_addr and local_addr[0] not in (0, '\x00'):
                try:
                    if stat.S_ISSOCK(os.stat(local_addr).st_mode):
                        os.remove(local_addr)
                except FileNotFoundError:
                    pass
                except OSError as err:
                    # Directory may have permissions only to create socket.
                    logger.error('Unable to check or remove stale UNIX '
                                 'socket %r: %r',
                                 local_addr, err)

            addr_pairs_info = (((family, proto),
                                (local_addr, remote_addr)), )
        else:
            # join address by (family, protocol)
            addr_infos = {}  # Using order preserving dict
            for idx, addr in ((0, local_addr), (1, remote_addr)):
                if addr is not None:
                    if not (isinstance(addr, tuple) and len(addr) == 2):
                        raise TypeError('2-tuple is expected')

                    infos = await self._ensure_resolved(
                        addr, family=family, type=socket.SOCK_DGRAM,
                        proto=proto, flags=flags, loop=self)
                    if not infos:
                        raise OSError('getaddrinfo() returned empty list')

                    for fam, _, pro, _, address in infos:
                        key = (fam, pro)
                        if key not in addr_infos:
                            addr_infos[key] = [None, None]
                        # Slot 0 holds the local address, slot 1 the
                        # remote one.
                        addr_infos[key][idx] = address

            # each addr has to have info for each (family, proto) pair
            addr_pairs_info = [
                (key, addr_pair) for key, addr_pair in addr_infos.items()
                if not ((local_addr and addr_pair[0] is None) or
                        (remote_addr and addr_pair[1] is None))]

            if not addr_pairs_info:
                raise ValueError('can not get address information')

        # Local list of OSErrors; from here on the name shadows the
        # module-level ``exceptions`` import.
        exceptions = []

        for ((family, proto),
             (local_address, remote_address)) in addr_pairs_info:
            sock = None
            r_addr = None
            try:
                sock = socket.socket(
                    family=family, type=socket.SOCK_DGRAM, proto=proto)
                if reuse_port:
                    _set_reuseport(sock)
                if allow_broadcast:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
                sock.setblocking(False)

                if local_addr:
                    sock.bind(local_address)
                if remote_addr:
                    # With allow_broadcast the socket is left
                    # unconnected; the remote address is still handed
                    # to the transport.
                    if not allow_broadcast:
                        await self.sock_connect(sock, remote_address)
                    r_addr = remote_address
            except OSError as exc:
                # Remember the error and try the next candidate.
                if sock is not None:
                    sock.close()
                exceptions.append(exc)
            except:
                if sock is not None:
                    sock.close()
                raise
            else:
                break
        else:
            # Every candidate failed: report the first error.
            raise exceptions[0]

    protocol = protocol_factory()
    waiter = self.create_future()
    transport = self._make_datagram_transport(
        sock, protocol, r_addr, waiter)
    if self._debug:
        if local_addr:
            logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
                        "created: (%r, %r)",
                        local_addr, remote_addr, transport, protocol)
        else:
            logger.debug("Datagram endpoint remote_addr=%r created: "
                         "(%r, %r)",
                         remote_addr, transport, protocol)

    try:
        await waiter
    except:
        transport.close()
        raise

    return transport, protocol

1513 

async def _ensure_resolved(self, address, *,
                           family=0, type=socket.SOCK_STREAM,
                           proto=0, flags=0, loop):
    """Return a list of address infos for *address*.

    The first two items of *address* are taken as host and port; any
    further items are forwarded to _ipaddr_info().  If that helper
    returns a result it is returned as a one-element list, otherwise
    the lookup is delegated to ``loop.getaddrinfo()``.
    """
    host, port = address[:2]
    extra = address[2:]
    resolved = _ipaddr_info(host, port, family, type, proto, *extra)
    if resolved is None:
        return await loop.getaddrinfo(host, port, family=family, type=type,
                                      proto=proto, flags=flags)
    # "host" is already a resolved IP.
    return [resolved]

1525 

async def _create_server_getaddrinfo(self, host, port, family, flags):
    """Resolve (host, port) for a stream socket.

    Returns the non-empty result of self._ensure_resolved(); raises
    OSError when the result is empty.
    """
    target = (host, port)
    infos = await self._ensure_resolved(target, family=family,
                                        type=socket.SOCK_STREAM,
                                        flags=flags, loop=self)
    if infos:
        return infos
    raise OSError(f'getaddrinfo({host!r}) returned empty list')

1533 

async def create_server(
        self, protocol_factory, host=None, port=None,
        *,
        family=socket.AF_UNSPEC,
        flags=socket.AI_PASSIVE,
        sock=None,
        backlog=100,
        ssl=None,
        reuse_address=None,
        reuse_port=None,
        keep_alive=None,
        ssl_handshake_timeout=None,
        ssl_shutdown_timeout=None,
        start_serving=True):
    """Create a TCP server.

    The host parameter can be a string, in that case the TCP server is
    bound to host and port.

    The host parameter can also be a sequence of strings and in that case
    the TCP server is bound to all hosts of the sequence. If a host
    appears multiple times (possibly indirectly e.g. when hostnames
    resolve to the same IP address), the server is only bound once to that
    host.

    Either host/port or an existing stream *sock* must be given, not
    both (ValueError otherwise).  *ssl* must be an SSLContext or None;
    the ssl timeouts are only accepted together with *ssl*.

    Return a Server object which can be used to stop the service.

    This method is a coroutine.
    """
    if isinstance(ssl, bool):
        raise TypeError('ssl argument must be an SSLContext or None')

    if ssl_handshake_timeout is not None and ssl is None:
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    if ssl_shutdown_timeout is not None and ssl is None:
        raise ValueError(
            'ssl_shutdown_timeout is only meaningful with ssl')

    if sock is not None:
        _check_ssl_socket(sock)

    if host is not None or port is not None:
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        if reuse_address is None:
            # Default: enabled on POSIX platforms other than Cygwin.
            reuse_address = os.name == "posix" and sys.platform != "cygwin"
        sockets = []
        # Normalize *host* into an iterable of hosts to resolve.
        if host == '':
            hosts = [None]
        elif (isinstance(host, str) or
              not isinstance(host, collections.abc.Iterable)):
            hosts = [host]
        else:
            hosts = host

        fs = [self._create_server_getaddrinfo(host, port, family=family,
                                              flags=flags)
              for host in hosts]
        infos = await tasks.gather(*fs)
        # Merge the per-host results into a set, dropping duplicates.
        infos = set(itertools.chain.from_iterable(infos))

        # Stays False if anything below raises, so that the finally
        # clause closes every socket created so far.
        completed = False
        try:
            for res in infos:
                af, socktype, proto, canonname, sa = res
                try:
                    sock = socket.socket(af, socktype, proto)
                except socket.error:
                    # Assume it's a bad family/type/protocol combination.
                    if self._debug:
                        logger.warning('create_server() failed to create '
                                       'socket.socket(%r, %r, %r)',
                                       af, socktype, proto, exc_info=True)
                    continue
                sockets.append(sock)
                if reuse_address:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
                # Since Linux 6.12.9, SO_REUSEPORT is not allowed
                # on other address families than AF_INET/AF_INET6.
                if reuse_port and af in (socket.AF_INET, socket.AF_INET6):
                    _set_reuseport(sock)
                if keep_alive:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_KEEPALIVE, True)
                # Disable IPv4/IPv6 dual stack support (enabled by
                # default on Linux) which makes a single socket
                # listen on both address families.
                if (_HAS_IPv6 and
                        af == socket.AF_INET6 and
                        hasattr(socket, 'IPPROTO_IPV6')):
                    sock.setsockopt(socket.IPPROTO_IPV6,
                                    socket.IPV6_V6ONLY,
                                    True)
                try:
                    sock.bind(sa)
                except OSError as err:
                    msg = ('error while attempting '
                           'to bind on address %r: %s'
                           % (sa, str(err).lower()))
                    if err.errno == errno.EADDRNOTAVAIL:
                        # Assume the family is not enabled (bpo-30945)
                        sockets.pop()
                        sock.close()
                        if self._debug:
                            logger.warning(msg)
                        continue
                    raise OSError(err.errno, msg) from None

            if not sockets:
                raise OSError('could not bind on any address out of %r'
                              % ([info[4] for info in infos],))

            completed = True
        finally:
            if not completed:
                for sock in sockets:
                    sock.close()
    else:
        if sock is None:
            raise ValueError('Neither host/port nor sock were specified')
        if sock.type != socket.SOCK_STREAM:
            raise ValueError(f'A Stream Socket was expected, got {sock!r}')
        sockets = [sock]

    for sock in sockets:
        sock.setblocking(False)

    server = Server(self, sockets, protocol_factory,
                    ssl, backlog, ssl_handshake_timeout,
                    ssl_shutdown_timeout)
    if start_serving:
        server._start_serving()
        # Skip one loop iteration so that all 'loop.add_reader'
        # go through.
        await tasks.sleep(0)

    if self._debug:
        logger.info("%r is serving", server)
    return server

1678 

async def connect_accepted_socket(
        self, protocol_factory, sock,
        *, ssl=None,
        ssl_handshake_timeout=None,
        ssl_shutdown_timeout=None):
    """Wrap an already accepted stream socket in a server-side transport.

    Raises ValueError when *sock* is not SOCK_STREAM or when an ssl
    timeout is given without *ssl*.  Returns ``(transport, protocol)``
    as produced by self._create_connection_transport().
    """
    if sock.type != socket.SOCK_STREAM:
        raise ValueError(f'A Stream Socket was expected, got {sock!r}')

    handshake_given = ssl_handshake_timeout is not None
    if handshake_given and not ssl:
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    shutdown_given = ssl_shutdown_timeout is not None
    if shutdown_given and not ssl:
        raise ValueError(
            'ssl_shutdown_timeout is only meaningful with ssl')

    _check_ssl_socket(sock)

    transport, protocol = await self._create_connection_transport(
        sock, protocol_factory, ssl, '', server_side=True,
        ssl_handshake_timeout=ssl_handshake_timeout,
        ssl_shutdown_timeout=ssl_shutdown_timeout)
    if not self._debug:
        return transport, protocol
    # Get the socket from the transport because SSL transport closes
    # the old socket and creates a new SSL socket
    sock = transport.get_extra_info('socket')
    logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
    return transport, protocol

1707 

async def connect_read_pipe(self, protocol_factory, pipe):
    """Create a read pipe transport for *pipe* and wait until it is ready.

    Returns ``(transport, protocol)``.  If waiting on the transport's
    waiter future fails, the transport is closed and the error is
    re-raised.
    """
    protocol = protocol_factory()
    ready = self.create_future()
    transport = self._make_read_pipe_transport(pipe, protocol, ready)

    try:
        await ready
    except BaseException:
        transport.close()
        raise
    else:
        if self._debug:
            logger.debug('Read pipe %r connected: (%r, %r)',
                         pipe.fileno(), transport, protocol)
    return transport, protocol

1723 

async def connect_write_pipe(self, protocol_factory, pipe):
    """Create a write pipe transport for *pipe* and wait until it is ready.

    Returns ``(transport, protocol)``.  If waiting on the transport's
    waiter future fails, the transport is closed and the error is
    re-raised.
    """
    protocol = protocol_factory()
    ready = self.create_future()
    transport = self._make_write_pipe_transport(pipe, protocol, ready)

    try:
        await ready
    except BaseException:
        transport.close()
        raise
    else:
        if self._debug:
            logger.debug('Write pipe %r connected: (%r, %r)',
                         pipe.fileno(), transport, protocol)
    return transport, protocol

1739 

def _log_subprocess(self, msg, stdin, stdout, stderr):
    """Emit one debug line describing a subprocess and its stdio wiring.

    *msg* starts the line; a ``name=<pipe>`` fragment is appended for
    every stream that is not None.  When stderr is redirected to stdout
    (``subprocess.STDOUT``) the two are reported as one
    ``stdout=stderr=...`` fragment.
    """
    parts = [msg]
    if stdin is not None:
        parts.append(f'stdin={_format_pipe(stdin)}')

    merged = stdout is not None and stderr == subprocess.STDOUT
    if merged:
        streams = (('stdout=stderr', stdout),)
    else:
        streams = (('stdout', stdout), ('stderr', stderr))

    for label, stream in streams:
        if stream is not None:
            parts.append(f'{label}={_format_pipe(stream)}')

    logger.debug(' '.join(parts))

1752 

async def subprocess_shell(self, protocol_factory, cmd, *,
                           stdin=subprocess.PIPE,
                           stdout=subprocess.PIPE,
                           stderr=subprocess.PIPE,
                           universal_newlines=False,
                           shell=True, bufsize=0,
                           encoding=None, errors=None, text=None,
                           **kwargs):
    """Run the shell command *cmd* as a subprocess.

    *cmd* must be a ``str`` or ``bytes`` object.  The keyword arguments
    ``universal_newlines``, ``shell``, ``bufsize``, ``text``,
    ``encoding`` and ``errors`` are accepted only with their default
    meaning; any other value raises ``ValueError``.  Remaining keyword
    arguments are forwarded to the subprocess transport factory.

    Return a ``(transport, protocol)`` pair, where the protocol is the
    object produced by calling *protocol_factory*.
    """
    # Validate arguments in a fixed order so the first offending one
    # determines the error message.
    if not isinstance(cmd, (bytes, str)):
        raise ValueError("cmd must be a string")
    if universal_newlines:
        raise ValueError("universal_newlines must be False")
    if not shell:
        raise ValueError("shell must be True")
    if bufsize != 0:
        raise ValueError("bufsize must be 0")
    if text:
        raise ValueError("text must be False")
    if encoding is not None:
        raise ValueError("encoding must be None")
    if errors is not None:
        raise ValueError("errors must be None")

    proto = protocol_factory()

    log_prefix = None
    if self._debug:
        # don't log parameters: they may contain sensitive information
        # (password) and may be too long
        log_prefix = 'run shell command %r' % cmd
        self._log_subprocess(log_prefix, stdin, stdout, stderr)

    proc_transport = await self._make_subprocess_transport(
        proto, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)

    # Debug mode is re-read here: it may have been toggled while the
    # transport was being created.
    if self._debug and log_prefix is not None:
        logger.info('%s: %r', log_prefix, proc_transport)
    return proc_transport, proto

1788 

async def subprocess_exec(self, protocol_factory, program, *args,
                          stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE, universal_newlines=False,
                          shell=False, bufsize=0,
                          encoding=None, errors=None, text=None,
                          **kwargs):
    """Run *program* with arguments *args* as a subprocess (no shell).

    The keyword arguments ``universal_newlines``, ``shell``,
    ``bufsize``, ``text``, ``encoding`` and ``errors`` are accepted only
    with their default meaning; any other value raises ``ValueError``.
    Remaining keyword arguments are forwarded to the subprocess
    transport factory.

    Return a ``(transport, protocol)`` pair, where the protocol is the
    object produced by calling *protocol_factory*.
    """
    # Validate arguments in a fixed order so the first offending one
    # determines the error message.
    if universal_newlines:
        raise ValueError("universal_newlines must be False")
    if shell:
        raise ValueError("shell must be False")
    if bufsize != 0:
        raise ValueError("bufsize must be 0")
    if text:
        raise ValueError("text must be False")
    if encoding is not None:
        raise ValueError("encoding must be None")
    if errors is not None:
        raise ValueError("errors must be None")

    argv = (program, *args)
    proto = protocol_factory()

    log_prefix = None
    if self._debug:
        # don't log parameters: they may contain sensitive information
        # (password) and may be too long
        log_prefix = f'execute program {program!r}'
        self._log_subprocess(log_prefix, stdin, stdout, stderr)

    proc_transport = await self._make_subprocess_transport(
        proto, argv, False, stdin, stdout, stderr,
        bufsize, **kwargs)

    # Debug mode is re-read here: it may have been toggled while the
    # transport was being created.
    if self._debug and log_prefix is not None:
        logger.info('%s: %r', log_prefix, proc_transport)
    return proc_transport, proto

1822 

def get_exception_handler(self):
    """Return the custom exception handler currently installed.

    The result is None when no custom handler has been set, i.e. when
    the default exception handler is in use.
    """
    return self._exception_handler

1827 

def set_exception_handler(self, handler):
    """Install *handler* as the event loop exception handler.

    Passing None restores the default exception handler.

    Otherwise *handler* must be a callable with a signature matching
    '(loop, context)': 'loop' is a reference to the active event loop
    and 'context' is a dict object (see `call_exception_handler()`
    documentation for details about context).

    Raise TypeError if *handler* is neither None nor callable.
    """
    acceptable = handler is None or callable(handler)
    if not acceptable:
        raise TypeError(f'A callable object or None is expected, '
                        f'got {handler!r}')
    self._exception_handler = handler

1844 

def default_exception_handler(self, context):
    """Log the error described by *context*.

    This runs when an exception occurs and no custom exception handler
    is set; a custom handler may also call it to fall back to the
    default behavior.

    The log record starts with the context's 'message' (or a generic
    message when it is missing or empty) followed by one line per
    remaining context key.  The 'source_traceback' and
    'handle_traceback' entries are rendered as formatted stack traces;
    every other value is shown via repr().  If the context carries an
    'exception', its traceback is attached to the log record.

    When *context* has no 'source_traceback' and the handle currently
    being run has a recorded creation traceback, that traceback is
    stored in *context* under 'handle_traceback'.

    The context parameter has the same meaning as in
    `call_exception_handler()`.
    """
    message = context.get('message') or 'Unhandled exception in event loop'

    exception = context.get('exception')
    exc_info = (
        (type(exception), exception, exception.__traceback__)
        if exception is not None else False
    )

    if 'source_traceback' not in context:
        current = self._current_handle
        if current is not None and current._source_traceback:
            context['handle_traceback'] = current._source_traceback

    # Keys whose values are stack summaries, mapped to their heading.
    traceback_headings = {
        'source_traceback': 'Object created at (most recent call last):\n',
        'handle_traceback': 'Handle created at (most recent call last):\n',
    }
    already_reported = {'message', 'exception'}

    log_lines = [message]
    for key in sorted(context):
        if key in already_reported:
            continue
        value = context[key]
        heading = traceback_headings.get(key)
        if heading is None:
            rendered = repr(value)
        else:
            formatted = ''.join(traceback.format_list(value))
            rendered = heading + formatted.rstrip()
        log_lines.append(f'{key}: {rendered}')

    logger.error('\n'.join(log_lines), exc_info=exc_info)

1894 

def call_exception_handler(self, context):
    """Dispatch *context* to the event loop's exception handler.

    The context argument is a dict containing the following keys:

    - 'message': Error message;
    - 'exception' (optional): Exception object;
    - 'future' (optional): Future instance;
    - 'task' (optional): Task instance;
    - 'handle' (optional): Handle instance;
    - 'protocol' (optional): Protocol instance;
    - 'transport' (optional): Transport instance;
    - 'socket' (optional): Socket instance;
    - 'source_traceback' (optional): Traceback of the source;
    - 'handle_traceback' (optional): Traceback of the handle;
    - 'asyncgen' (optional): Asynchronous generator that caused
                             the exception.

    New keys may be introduced in the future.

    With no custom handler installed, `default_exception_handler()` is
    used.  A custom handler is invoked as ``handler(loop, context)``,
    inside the context of the task/future/handle found in *context*
    when that object provides one.  Errors raised by either handler are
    logged instead of propagating, except SystemExit and
    KeyboardInterrupt, which are always re-raised.

    Note: do not overload this method in an event loop subclass.
    For custom exception handling, use the
    `set_exception_handler()` method.
    """
    if self._exception_handler is None:
        try:
            self.default_exception_handler(context)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException:
            # Second protection layer for unexpected errors
            # in the default implementation, as well as for subclassed
            # event loops with overloaded "default_exception_handler".
            logger.error('Exception in default exception handler',
                         exc_info=True)
        return

    try:
        # Find the first object that may carry a contextvars context.
        # Even though Futures don't have a context, Task is a subclass
        # of Future and sometimes the 'future' key holds a Task;
        # Handles also have a context.
        owner = None
        for key in ("task", "future", "handle"):
            owner = context.get(key)
            if owner is not None:
                break

        run_ctx = None
        if owner is not None and hasattr(owner, "get_context"):
            run_ctx = owner.get_context()

        if run_ctx is None or not hasattr(run_ctx, "run"):
            self._exception_handler(self, context)
        else:
            run_ctx.run(self._exception_handler, self, context)
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as exc:
        # Exception in the user set custom exception handler.
        try:
            # Let's try default handler.
            self.default_exception_handler({
                'message': 'Unhandled error in exception handler',
                'exception': exc,
                'context': context,
            })
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException:
            # Guard 'default_exception_handler' in case it is
            # overloaded.
            logger.error('Exception in default exception handler '
                         'while handling an unexpected error '
                         'in custom exception handler',
                         exc_info=True)

1968 

1969 def _add_callback(self, handle): 

1970 """Add a Handle to _ready.""" 

1971 if not handle._cancelled: 

1972 self._ready.append(handle) 

1973 

1974 def _add_callback_signalsafe(self, handle): 

1975 """Like _add_callback() but called from a signal handler.""" 

1976 self._add_callback(handle) 

1977 self._write_to_self() 

1978 

1979 def _timer_handle_cancelled(self, handle): 

1980 """Notification that a TimerHandle has been cancelled.""" 

1981 if handle._scheduled: 

1982 self._timer_cancelled_count += 1 

1983 

def _run_once(self):
    """Perform a single iteration of the event loop.

    The steps, in order: drop cancelled timers, poll the selector for
    I/O (waiting no longer than the next timer deadline), queue the
    callbacks produced by the I/O events, move every due timer onto
    the ready queue, and finally run the callbacks that were ready at
    that moment.
    """
    # --- 1. Discard cancelled timer handles ----------------------------
    timer_total = len(self._scheduled)
    if (timer_total > _MIN_SCHEDULED_TIMER_HANDLES and
            self._timer_cancelled_count / timer_total >
            _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
        # Too many cancelled entries: rebuild the heap from the handles
        # that are still live.
        survivors = []
        for handle in self._scheduled:
            if not handle._cancelled:
                survivors.append(handle)
            else:
                handle._scheduled = False

        heapq.heapify(survivors)
        self._scheduled = survivors
        self._timer_cancelled_count = 0
    else:
        # Cheap path: only strip cancelled handles from the heap head.
        while self._scheduled and self._scheduled[0]._cancelled:
            self._timer_cancelled_count -= 1
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False

    # --- 2. Decide how long the selector may block ---------------------
    if self._ready or self._stopping:
        # Work is pending (or we are stopping): just poll.
        timeout = 0
    elif self._scheduled:
        # Sleep until the earliest timer, clamped to the allowed range.
        timeout = self._scheduled[0]._when - self.time()
        if timeout > MAXIMUM_SELECT_TIMEOUT:
            timeout = MAXIMUM_SELECT_TIMEOUT
        elif timeout < 0:
            timeout = 0
    else:
        # Nothing queued and no timers: block until an I/O event.
        timeout = None

    # --- 3. Poll for I/O and queue the resulting callbacks -------------
    event_list = self._selector.select(timeout)
    self._process_events(event_list)
    # Needed to break cycles when an exception occurs.
    event_list = None

    # --- 4. Move due timers onto the ready queue -----------------------
    now = self.time()
    # Ensure that `end_time` is strictly increasing
    # when the clock resolution is too small.
    end_time = now + max(self._clock_resolution, math.ulp(now))
    while self._scheduled:
        if self._scheduled[0]._when >= end_time:
            break
        handle = heapq.heappop(self._scheduled)
        handle._scheduled = False
        self._ready.append(handle)

    # --- 5. Run the callbacks -------------------------------------------
    # This is the only place where callbacks are actually *called*.
    # All other places just add them to ready.
    # Note: We run all currently scheduled callbacks, but not any
    # callbacks scheduled by callbacks run this time around --
    # they will be run the next time (after another I/O poll).
    # Use an idiom that is thread-safe without using locks.
    pending = len(self._ready)
    for _ in range(pending):
        handle = self._ready.popleft()
        if handle._cancelled:
            continue
        if not self._debug:
            handle._run()
            continue
        # Debug mode: expose the running handle and time the callback.
        try:
            self._current_handle = handle
            started = self.time()
            handle._run()
            elapsed = self.time() - started
            if elapsed >= self.slow_callback_duration:
                logger.warning('Executing %s took %.3f seconds',
                               _format_handle(handle), elapsed)
        finally:
            self._current_handle = None
    handle = None  # Needed to break cycles when an exception occurs.

2069 

2070 def _set_coroutine_origin_tracking(self, enabled): 

2071 if bool(enabled) == bool(self._coroutine_origin_tracking_enabled): 

2072 return 

2073 

2074 if enabled: 

2075 self._coroutine_origin_tracking_saved_depth = ( 

2076 sys.get_coroutine_origin_tracking_depth()) 

2077 sys.set_coroutine_origin_tracking_depth( 

2078 constants.DEBUG_STACK_DEPTH) 

2079 else: 

2080 sys.set_coroutine_origin_tracking_depth( 

2081 self._coroutine_origin_tracking_saved_depth) 

2082 

2083 self._coroutine_origin_tracking_enabled = enabled 

2084 

def get_debug(self):
    """Return the debug-mode flag currently stored on the loop."""
    return self._debug

2087 

def set_debug(self, enabled):
    """Store *enabled* as the loop's debug-mode flag.

    If the loop is running, a call to
    ``_set_coroutine_origin_tracking(enabled)`` is also scheduled via
    call_soon_threadsafe().
    """
    self._debug = enabled

    if not self.is_running():
        return
    self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)