Coverage for Lib/asyncio/base_events.py: 88%

1152 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-23 01:21 +0000

1"""Base implementation of event loop. 

2 

3The event loop can be broken up into a multiplexer (the part 

4responsible for notifying us of I/O events) and the event loop proper, 

5which wraps a multiplexer with functionality for scheduling callbacks, 

6immediately or at a given time in the future. 

7 

8Whenever a public API takes a callback, subsequent positional 

9arguments will be passed to the callback if/when it is called. This 

10avoids the proliferation of trivial lambdas implementing closures. 

11Keyword arguments for the callback are not supported; this is a 

12conscious design decision, leaving the door open for keyword arguments 

13to modify the meaning of the API call itself. 

14""" 

15 

16import collections 

17import collections.abc 

18import concurrent.futures 

19import errno 

20import heapq 

21import itertools 

22import os 

23import socket 

24import stat 

25import subprocess 

26import threading 

27import time 

28import traceback 

29import sys 

30import warnings 

31import weakref 

32 

33try: 

34 import ssl 

35except ImportError: # pragma: no cover 

36 ssl = None 

37 

38from . import constants 

39from . import coroutines 

40from . import events 

41from . import exceptions 

42from . import futures 

43from . import protocols 

44from . import sslproto 

45from . import staggered 

46from . import tasks 

47from . import timeouts 

48from . import transports 

49from . import trsock 

50from .log import logger 

51 

52 

53__all__ = 'BaseEventLoop','Server', 

54 

55 

56# Minimum number of _scheduled timer handles before cleanup of 

57# cancelled handles is performed. 

58_MIN_SCHEDULED_TIMER_HANDLES = 100 

59 

60# Minimum fraction of _scheduled timer handles that are cancelled 

61# before cleanup of cancelled handles is performed. 

62_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5 

63 

64 

65_HAS_IPv6 = hasattr(socket, 'AF_INET6') 

66 

67# Maximum timeout passed to select to avoid OS limitations 

68MAXIMUM_SELECT_TIMEOUT = 24 * 3600 

69 

70 

71def _format_handle(handle): 

72 cb = handle._callback 

73 if isinstance(getattr(cb, '__self__', None), tasks.Task): 

74 # format the task 

75 return repr(cb.__self__) 

76 else: 

77 return str(handle) 

78 

79 

80def _format_pipe(fd): 

81 if fd == subprocess.PIPE: 

82 return '<pipe>' 

83 elif fd == subprocess.STDOUT: 

84 return '<stdout>' 

85 else: 

86 return repr(fd) 

87 

88 

89def _set_reuseport(sock): 

90 if not hasattr(socket, 'SO_REUSEPORT'): 

91 raise ValueError('reuse_port not supported by socket module') 

92 else: 

93 try: 

94 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 

95 except OSError: 

96 raise ValueError('reuse_port not supported by socket module, ' 

97 'SO_REUSEPORT defined but not implemented.') 

98 

99 

100def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0): 

101 # Try to skip getaddrinfo if "host" is already an IP. Users might have 

102 # handled name resolution in their own code and pass in resolved IPs. 

103 if not hasattr(socket, 'inet_pton'): 

104 return 

105 

106 if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \ 

107 host is None: 

108 return None 

109 

110 if type == socket.SOCK_STREAM: 

111 proto = socket.IPPROTO_TCP 

112 elif type == socket.SOCK_DGRAM: 

113 proto = socket.IPPROTO_UDP 

114 else: 

115 return None 

116 

117 if port is None: 

118 port = 0 

119 elif isinstance(port, bytes) and port == b'': 

120 port = 0 

121 elif isinstance(port, str) and port == '': 

122 port = 0 

123 else: 

124 # If port's a service name like "http", don't skip getaddrinfo. 

125 try: 

126 port = int(port) 

127 except (TypeError, ValueError): 

128 return None 

129 

130 if family == socket.AF_UNSPEC: 

131 afs = [socket.AF_INET] 

132 if _HAS_IPv6: 132 ↛ 137line 132 didn't jump to line 137 because the condition on line 132 was always true

133 afs.append(socket.AF_INET6) 

134 else: 

135 afs = [family] 

136 

137 if isinstance(host, bytes): 

138 host = host.decode('idna') 

139 if '%' in host: 

140 # Linux's inet_pton doesn't accept an IPv6 zone index after host, 

141 # like '::1%lo0'. 

142 return None 

143 

144 for af in afs: 

145 try: 

146 socket.inet_pton(af, host) 

147 # The host has already been resolved. 

148 if _HAS_IPv6 and af == socket.AF_INET6: 

149 return af, type, proto, '', (host, port, flowinfo, scopeid) 

150 else: 

151 return af, type, proto, '', (host, port) 

152 except OSError: 

153 pass 

154 

155 # "host" is not an IP address. 

156 return None 

157 

158 

159def _interleave_addrinfos(addrinfos, first_address_family_count=1): 

160 """Interleave list of addrinfo tuples by family.""" 

161 # Group addresses by family 

162 addrinfos_by_family = collections.OrderedDict() 

163 for addr in addrinfos: 

164 family = addr[0] 

165 if family not in addrinfos_by_family: 

166 addrinfos_by_family[family] = [] 

167 addrinfos_by_family[family].append(addr) 

168 addrinfos_lists = list(addrinfos_by_family.values()) 

169 

170 reordered = [] 

171 if first_address_family_count > 1: 

172 reordered.extend(addrinfos_lists[0][:first_address_family_count - 1]) 

173 del addrinfos_lists[0][:first_address_family_count - 1] 

174 reordered.extend( 

175 a for a in itertools.chain.from_iterable( 

176 itertools.zip_longest(*addrinfos_lists) 

177 ) if a is not None) 

178 return reordered 

179 

180 

181def _run_until_complete_cb(fut): 

182 if not fut.cancelled(): 

183 exc = fut.exception() 

184 if isinstance(exc, (SystemExit, KeyboardInterrupt)): 

185 # Issue #22429: run_forever() already finished, no need to 

186 # stop it. 

187 return 

188 futures._get_loop(fut).stop() 

189 

190 

191if hasattr(socket, 'TCP_NODELAY'): 191 ↛ 198line 191 didn't jump to line 198 because the condition on line 191 was always true

192 def _set_nodelay(sock): 

193 if (sock.family in {socket.AF_INET, socket.AF_INET6} and 

194 sock.type == socket.SOCK_STREAM and 

195 sock.proto == socket.IPPROTO_TCP): 

196 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 

197else: 

198 def _set_nodelay(sock): 

199 pass 

200 

201 

202def _check_ssl_socket(sock): 

203 if ssl is not None and isinstance(sock, ssl.SSLSocket): 203 ↛ 204line 203 didn't jump to line 204 because the condition on line 203 was never true

204 raise TypeError("Socket cannot be of type SSLSocket") 

205 

206 

207class _SendfileFallbackProtocol(protocols.Protocol): 

208 def __init__(self, transp): 

209 if not isinstance(transp, transports._FlowControlMixin): 209 ↛ 210line 209 didn't jump to line 210 because the condition on line 209 was never true

210 raise TypeError("transport should be _FlowControlMixin instance") 

211 self._transport = transp 

212 self._proto = transp.get_protocol() 

213 self._should_resume_reading = transp.is_reading() 

214 self._should_resume_writing = transp._protocol_paused 

215 transp.pause_reading() 

216 transp.set_protocol(self) 

217 if self._should_resume_writing: 217 ↛ 218line 217 didn't jump to line 218 because the condition on line 217 was never true

218 self._write_ready_fut = self._transport._loop.create_future() 

219 else: 

220 self._write_ready_fut = None 

221 

222 async def drain(self): 

223 if self._transport.is_closing(): 223 ↛ 224line 223 didn't jump to line 224 because the condition on line 223 was never true

224 raise ConnectionError("Connection closed by peer") 

225 fut = self._write_ready_fut 

226 if fut is None: 

227 return 

228 await fut 

229 

230 def connection_made(self, transport): 

231 raise RuntimeError("Invalid state: " 

232 "connection should have been established already.") 

233 

234 def connection_lost(self, exc): 

235 if self._write_ready_fut is not None: 235 ↛ 243line 235 didn't jump to line 243 because the condition on line 235 was always true

236 # Never happens if peer disconnects after sending the whole content 

237 # Thus disconnection is always an exception from user perspective 

238 if exc is None: 238 ↛ 239line 238 didn't jump to line 239 because the condition on line 238 was never true

239 self._write_ready_fut.set_exception( 

240 ConnectionError("Connection is closed by peer")) 

241 else: 

242 self._write_ready_fut.set_exception(exc) 

243 self._proto.connection_lost(exc) 

244 

245 def pause_writing(self): 

246 if self._write_ready_fut is not None: 246 ↛ 247line 246 didn't jump to line 247 because the condition on line 246 was never true

247 return 

248 self._write_ready_fut = self._transport._loop.create_future() 

249 

250 def resume_writing(self): 

251 if self._write_ready_fut is None: 251 ↛ 252line 251 didn't jump to line 252 because the condition on line 251 was never true

252 return 

253 self._write_ready_fut.set_result(False) 

254 self._write_ready_fut = None 

255 

256 def data_received(self, data): 

257 raise RuntimeError("Invalid state: reading should be paused") 

258 

259 def eof_received(self): 

260 raise RuntimeError("Invalid state: reading should be paused") 

261 

262 async def restore(self): 

263 self._transport.set_protocol(self._proto) 

264 if self._should_resume_reading: 264 ↛ 266line 264 didn't jump to line 266 because the condition on line 264 was always true

265 self._transport.resume_reading() 

266 if self._write_ready_fut is not None: 

267 # Cancel the future. 

268 # Basically it has no effect because protocol is switched back, 

269 # no code should wait for it anymore. 

270 self._write_ready_fut.cancel() 

271 if self._should_resume_writing: 271 ↛ 272line 271 didn't jump to line 272 because the condition on line 271 was never true

272 self._proto.resume_writing() 

273 

274 

275class Server(events.AbstractServer): 

276 

277 def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog, 

278 ssl_handshake_timeout, ssl_shutdown_timeout=None): 

279 self._loop = loop 

280 self._sockets = sockets 

281 # Weak references so we don't break Transport's ability to 

282 # detect abandoned transports 

283 self._clients = weakref.WeakSet() 

284 self._waiters = [] 

285 self._protocol_factory = protocol_factory 

286 self._backlog = backlog 

287 self._ssl_context = ssl_context 

288 self._ssl_handshake_timeout = ssl_handshake_timeout 

289 self._ssl_shutdown_timeout = ssl_shutdown_timeout 

290 self._serving = False 

291 self._serving_forever_fut = None 

292 

293 def __repr__(self): 

294 return f'<{self.__class__.__name__} sockets={self.sockets!r}>' 

295 

296 def _attach(self, transport): 

297 assert self._sockets is not None 

298 self._clients.add(transport) 

299 

300 def _detach(self, transport): 

301 self._clients.discard(transport) 

302 if len(self._clients) == 0 and self._sockets is None: 

303 self._wakeup() 

304 

305 def _wakeup(self): 

306 waiters = self._waiters 

307 self._waiters = None 

308 for waiter in waiters: 

309 if not waiter.done(): 309 ↛ 308line 309 didn't jump to line 308 because the condition on line 309 was always true

310 waiter.set_result(None) 

311 

312 def _start_serving(self): 

313 if self._serving: 

314 return 

315 self._serving = True 

316 for sock in self._sockets: 

317 sock.listen(self._backlog) 

318 self._loop._start_serving( 

319 self._protocol_factory, sock, self._ssl_context, 

320 self, self._backlog, self._ssl_handshake_timeout, 

321 self._ssl_shutdown_timeout) 

322 

323 def get_loop(self): 

324 return self._loop 

325 

326 def is_serving(self): 

327 return self._serving 

328 

329 @property 

330 def sockets(self): 

331 if self._sockets is None: 

332 return () 

333 return tuple(trsock.TransportSocket(s) for s in self._sockets) 

334 

335 def close(self): 

336 sockets = self._sockets 

337 if sockets is None: 

338 return 

339 self._sockets = None 

340 

341 for sock in sockets: 

342 self._loop._stop_serving(sock) 

343 

344 self._serving = False 

345 

346 if (self._serving_forever_fut is not None and 346 ↛ 348line 346 didn't jump to line 348 because the condition on line 346 was never true

347 not self._serving_forever_fut.done()): 

348 self._serving_forever_fut.cancel() 

349 self._serving_forever_fut = None 

350 

351 if len(self._clients) == 0: 

352 self._wakeup() 

353 

354 def close_clients(self): 

355 for transport in self._clients.copy(): 

356 transport.close() 

357 

358 def abort_clients(self): 

359 for transport in self._clients.copy(): 

360 transport.abort() 

361 

362 async def start_serving(self): 

363 self._start_serving() 

364 # Skip one loop iteration so that all 'loop.add_reader' 

365 # go through. 

366 await tasks.sleep(0) 

367 

368 async def serve_forever(self): 

369 if self._serving_forever_fut is not None: 369 ↛ 370line 369 didn't jump to line 370 because the condition on line 369 was never true

370 raise RuntimeError( 

371 f'server {self!r} is already being awaited on serve_forever()') 

372 if self._sockets is None: 

373 raise RuntimeError(f'server {self!r} is closed') 

374 

375 self._start_serving() 

376 self._serving_forever_fut = self._loop.create_future() 

377 

378 try: 

379 await self._serving_forever_fut 

380 except exceptions.CancelledError: 

381 try: 

382 self.close() 

383 await self.wait_closed() 

384 finally: 

385 raise 

386 finally: 

387 self._serving_forever_fut = None 

388 

389 async def wait_closed(self): 

390 """Wait until server is closed and all connections are dropped. 

391 

392 - If the server is not closed, wait. 

393 - If it is closed, but there are still active connections, wait. 

394 

395 Anyone waiting here will be unblocked once both conditions 

396 (server is closed and all connections have been dropped) 

397 have become true, in either order. 

398 

399 Historical note: In 3.11 and before, this was broken, returning 

400 immediately if the server was already closed, even if there 

401 were still active connections. An attempted fix in 3.12.0 was 

402 still broken, returning immediately if the server was still 

403 open and there were no active connections. Hopefully in 3.12.1 

404 we have it right. 

405 """ 

406 # Waiters are unblocked by self._wakeup(), which is called 

407 # from two places: self.close() and self._detach(), but only 

408 # when both conditions have become true. To signal that this 

409 # has happened, self._wakeup() sets self._waiters to None. 

410 if self._waiters is None: 

411 return 

412 waiter = self._loop.create_future() 

413 self._waiters.append(waiter) 

414 await waiter 

415 

416 

417class BaseEventLoop(events.AbstractEventLoop): 

418 

419 def __init__(self): 

420 self._timer_cancelled_count = 0 

421 self._closed = False 

422 self._stopping = False 

423 self._ready = collections.deque() 

424 self._scheduled = [] 

425 self._default_executor = None 

426 self._internal_fds = 0 

427 # Identifier of the thread running the event loop, or None if the 

428 # event loop is not running 

429 self._thread_id = None 

430 self._clock_resolution = time.get_clock_info('monotonic').resolution 

431 self._exception_handler = None 

432 self.set_debug(coroutines._is_debug_mode()) 

433 # The preserved state of async generator hooks. 

434 self._old_agen_hooks = None 

435 # In debug mode, if the execution of a callback or a step of a task 

436 # exceed this duration in seconds, the slow callback/task is logged. 

437 self.slow_callback_duration = 0.1 

438 self._current_handle = None 

439 self._task_factory = None 

440 self._coroutine_origin_tracking_enabled = False 

441 self._coroutine_origin_tracking_saved_depth = None 

442 

443 # A weak set of all asynchronous generators that are 

444 # being iterated by the loop. 

445 self._asyncgens = weakref.WeakSet() 

446 # Set to True when `loop.shutdown_asyncgens` is called. 

447 self._asyncgens_shutdown_called = False 

448 # Set to True when `loop.shutdown_default_executor` is called. 

449 self._executor_shutdown_called = False 

450 

451 def __repr__(self): 

452 return ( 

453 f'<{self.__class__.__name__} running={self.is_running()} ' 

454 f'closed={self.is_closed()} debug={self.get_debug()}>' 

455 ) 

456 

457 def create_future(self): 

458 """Create a Future object attached to the loop.""" 

459 return futures.Future(loop=self) 

460 

461 def create_task(self, coro, **kwargs): 

462 """Schedule or begin executing a coroutine object. 

463 

464 Return a task object. 

465 """ 

466 self._check_closed() 

467 if self._task_factory is not None: 

468 return self._task_factory(self, coro, **kwargs) 

469 

470 task = tasks.Task(coro, loop=self, **kwargs) 

471 if task._source_traceback: 

472 del task._source_traceback[-1] 

473 try: 

474 return task 

475 finally: 

476 # gh-128552: prevent a refcycle of 

477 # task.exception().__traceback__->BaseEventLoop.create_task->task 

478 del task 

479 

480 def set_task_factory(self, factory): 

481 """Set a task factory that will be used by loop.create_task(). 

482 

483 If factory is None the default task factory will be set. 

484 

485 If factory is a callable, it should have a signature matching 

486 '(loop, coro, **kwargs)', where 'loop' will be a reference to the active 

487 event loop, 'coro' will be a coroutine object, and **kwargs will be 

488 arbitrary keyword arguments that should be passed on to Task. 

489 The callable must return a Task. 

490 """ 

491 if factory is not None and not callable(factory): 

492 raise TypeError('task factory must be a callable or None') 

493 self._task_factory = factory 

494 

495 def get_task_factory(self): 

496 """Return a task factory, or None if the default one is in use.""" 

497 return self._task_factory 

498 

499 def _make_socket_transport(self, sock, protocol, waiter=None, *, 

500 extra=None, server=None): 

501 """Create socket transport.""" 

502 raise NotImplementedError 

503 

504 def _make_ssl_transport( 

505 self, rawsock, protocol, sslcontext, waiter=None, 

506 *, server_side=False, server_hostname=None, 

507 extra=None, server=None, 

508 ssl_handshake_timeout=None, 

509 ssl_shutdown_timeout=None, 

510 call_connection_made=True): 

511 """Create SSL transport.""" 

512 raise NotImplementedError 

513 

514 def _make_datagram_transport(self, sock, protocol, 

515 address=None, waiter=None, extra=None): 

516 """Create datagram transport.""" 

517 raise NotImplementedError 

518 

519 def _make_read_pipe_transport(self, pipe, protocol, waiter=None, 

520 extra=None): 

521 """Create read pipe transport.""" 

522 raise NotImplementedError 

523 

524 def _make_write_pipe_transport(self, pipe, protocol, waiter=None, 

525 extra=None): 

526 """Create write pipe transport.""" 

527 raise NotImplementedError 

528 

529 async def _make_subprocess_transport(self, protocol, args, shell, 

530 stdin, stdout, stderr, bufsize, 

531 extra=None, **kwargs): 

532 """Create subprocess transport.""" 

533 raise NotImplementedError 

534 

535 def _write_to_self(self): 

536 """Write a byte to self-pipe, to wake up the event loop. 

537 

538 This may be called from a different thread. 

539 

540 The subclass is responsible for implementing the self-pipe. 

541 """ 

542 raise NotImplementedError 

543 

544 def _process_events(self, event_list): 

545 """Process selector events.""" 

546 raise NotImplementedError 

547 

548 def _check_closed(self): 

549 if self._closed: 

550 raise RuntimeError('Event loop is closed') 

551 

552 def _check_default_executor(self): 

553 if self._executor_shutdown_called: 553 ↛ 554line 553 didn't jump to line 554 because the condition on line 553 was never true

554 raise RuntimeError('Executor shutdown has been called') 

555 

556 def _asyncgen_finalizer_hook(self, agen): 

557 self._asyncgens.discard(agen) 

558 if not self.is_closed(): 558 ↛ exitline 558 didn't return from function '_asyncgen_finalizer_hook' because the condition on line 558 was always true

559 self.call_soon_threadsafe(self.create_task, agen.aclose()) 

560 

561 def _asyncgen_firstiter_hook(self, agen): 

562 if self._asyncgens_shutdown_called: 562 ↛ 563line 562 didn't jump to line 563 because the condition on line 562 was never true

563 warnings.warn( 

564 f"asynchronous generator {agen!r} was scheduled after " 

565 f"loop.shutdown_asyncgens() call", 

566 ResourceWarning, source=self) 

567 

568 self._asyncgens.add(agen) 

569 

570 async def shutdown_asyncgens(self): 

571 """Shutdown all active asynchronous generators.""" 

572 self._asyncgens_shutdown_called = True 

573 

574 if not len(self._asyncgens): 

575 # If Python version is <3.6 or we don't have any asynchronous 

576 # generators alive. 

577 return 

578 

579 closing_agens = list(self._asyncgens) 

580 self._asyncgens.clear() 

581 

582 results = await tasks.gather( 

583 *[ag.aclose() for ag in closing_agens], 

584 return_exceptions=True) 

585 

586 for result, agen in zip(results, closing_agens): 

587 if isinstance(result, Exception): 587 ↛ 588line 587 didn't jump to line 588 because the condition on line 587 was never true

588 self.call_exception_handler({ 

589 'message': f'an error occurred during closing of ' 

590 f'asynchronous generator {agen!r}', 

591 'exception': result, 

592 'asyncgen': agen 

593 }) 

594 

595 async def shutdown_default_executor(self, timeout=None): 

596 """Schedule the shutdown of the default executor. 

597 

598 The timeout parameter specifies the amount of time the executor will 

599 be given to finish joining. The default value is None, which means 

600 that the executor will be given an unlimited amount of time. 

601 """ 

602 self._executor_shutdown_called = True 

603 if self._default_executor is None: 

604 return 

605 future = self.create_future() 

606 thread = threading.Thread(target=self._do_shutdown, args=(future,)) 

607 thread.start() 

608 try: 

609 async with timeouts.timeout(timeout): 

610 await future 

611 except TimeoutError: 

612 warnings.warn("The executor did not finishing joining " 

613 f"its threads within {timeout} seconds.", 

614 RuntimeWarning, stacklevel=2) 

615 self._default_executor.shutdown(wait=False) 

616 else: 

617 thread.join() 

618 

619 def _do_shutdown(self, future): 

620 try: 

621 self._default_executor.shutdown(wait=True) 

622 if not self.is_closed(): 622 ↛ exitline 622 didn't return from function '_do_shutdown' because the condition on line 622 was always true

623 self.call_soon_threadsafe(futures._set_result_unless_cancelled, 

624 future, None) 

625 except Exception as ex: 

626 if not self.is_closed() and not future.cancelled(): 

627 self.call_soon_threadsafe(future.set_exception, ex) 

628 

629 def _check_running(self): 

630 if self.is_running(): 

631 raise RuntimeError('This event loop is already running') 

632 if events._get_running_loop() is not None: 

633 raise RuntimeError( 

634 'Cannot run the event loop while another loop is running') 

635 

636 def _run_forever_setup(self): 

637 """Prepare the run loop to process events. 

638 

639 This method exists so that custom event loop subclasses (e.g., event loops 

640 that integrate a GUI event loop with Python's event loop) have access to all the 

641 loop setup logic. 

642 """ 

643 self._check_closed() 

644 self._check_running() 

645 self._set_coroutine_origin_tracking(self._debug) 

646 

647 self._old_agen_hooks = sys.get_asyncgen_hooks() 

648 self._thread_id = threading.get_ident() 

649 sys.set_asyncgen_hooks( 

650 firstiter=self._asyncgen_firstiter_hook, 

651 finalizer=self._asyncgen_finalizer_hook 

652 ) 

653 

654 events._set_running_loop(self) 

655 

656 def _run_forever_cleanup(self): 

657 """Clean up after an event loop finishes the looping over events. 

658 

659 This method exists so that custom event loop subclasses (e.g., event loops 

660 that integrate a GUI event loop with Python's event loop) have access to all the 

661 loop cleanup logic. 

662 """ 

663 self._stopping = False 

664 self._thread_id = None 

665 events._set_running_loop(None) 

666 self._set_coroutine_origin_tracking(False) 

667 # Restore any pre-existing async generator hooks. 

668 if self._old_agen_hooks is not None: 668 ↛ exitline 668 didn't return from function '_run_forever_cleanup' because the condition on line 668 was always true

669 sys.set_asyncgen_hooks(*self._old_agen_hooks) 

670 self._old_agen_hooks = None 

671 

672 def run_forever(self): 

673 """Run until stop() is called.""" 

674 self._run_forever_setup() 

675 try: 

676 while True: 

677 self._run_once() 

678 if self._stopping: 

679 break 

680 finally: 

681 self._run_forever_cleanup() 

682 

683 def run_until_complete(self, future): 

684 """Run until the Future is done. 

685 

686 If the argument is a coroutine, it is wrapped in a Task. 

687 

688 WARNING: It would be disastrous to call run_until_complete() 

689 with the same coroutine twice -- it would wrap it in two 

690 different Tasks and that can't be good. 

691 

692 Return the Future's result, or raise its exception. 

693 """ 

694 self._check_closed() 

695 self._check_running() 

696 

697 new_task = not futures.isfuture(future) 

698 future = tasks.ensure_future(future, loop=self) 

699 if new_task: 

700 # An exception is raised if the future didn't complete, so there 

701 # is no need to log the "destroy pending task" message 

702 future._log_destroy_pending = False 

703 

704 future.add_done_callback(_run_until_complete_cb) 

705 try: 

706 self.run_forever() 

707 except: 

708 if new_task and future.done() and not future.cancelled(): 

709 # The coroutine raised a BaseException. Consume the exception 

710 # to not log a warning, the caller doesn't have access to the 

711 # local task. 

712 future.exception() 

713 raise 

714 finally: 

715 future.remove_done_callback(_run_until_complete_cb) 

716 if not future.done(): 

717 raise RuntimeError('Event loop stopped before Future completed.') 

718 

719 return future.result() 

720 

721 def stop(self): 

722 """Stop running the event loop. 

723 

724 Every callback already scheduled will still run. This simply informs 

725 run_forever to stop looping after a complete iteration. 

726 """ 

727 self._stopping = True 

728 

729 def close(self): 

730 """Close the event loop. 

731 

732 This clears the queues and shuts down the executor, 

733 but does not wait for the executor to finish. 

734 

735 The event loop must not be running. 

736 """ 

737 if self.is_running(): 737 ↛ 738line 737 didn't jump to line 738 because the condition on line 737 was never true

738 raise RuntimeError("Cannot close a running event loop") 

739 if self._closed: 

740 return 

741 if self._debug: 

742 logger.debug("Close %r", self) 

743 self._closed = True 

744 self._ready.clear() 

745 self._scheduled.clear() 

746 self._executor_shutdown_called = True 

747 executor = self._default_executor 

748 if executor is not None: 

749 self._default_executor = None 

750 executor.shutdown(wait=False) 

751 

752 def is_closed(self): 

753 """Returns True if the event loop was closed.""" 

754 return self._closed 

755 

756 def __del__(self, _warn=warnings.warn): 

757 if not self.is_closed(): 757 ↛ 758line 757 didn't jump to line 758 because the condition on line 757 was never true

758 _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self) 

759 if not self.is_running(): 

760 self.close() 

761 

762 def is_running(self): 

763 """Returns True if the event loop is running.""" 

764 return (self._thread_id is not None) 

765 

766 def time(self): 

767 """Return the time according to the event loop's clock. 

768 

769 This is a float expressed in seconds since an epoch, but the 

770 epoch, precision, accuracy and drift are unspecified and may 

771 differ per event loop. 

772 """ 

773 return time.monotonic() 

774 

775 def call_later(self, delay, callback, *args, context=None): 

776 """Arrange for a callback to be called at a given time. 

777 

778 Return a Handle: an opaque object with a cancel() method that 

779 can be used to cancel the call. 

780 

781 The delay can be an int or float, expressed in seconds. It is 

782 always relative to the current time. 

783 

784 Each callback will be called exactly once. If two callbacks 

785 are scheduled for exactly the same time, it is undefined which 

786 will be called first. 

787 

788 Any positional arguments after the callback will be passed to 

789 the callback when it is called. 

790 """ 

791 if delay is None: 

792 raise TypeError('delay must not be None') 

793 timer = self.call_at(self.time() + delay, callback, *args, 

794 context=context) 

795 if timer._source_traceback: 

796 del timer._source_traceback[-1] 

797 return timer 

798 

799 def call_at(self, when, callback, *args, context=None): 

800 """Like call_later(), but uses an absolute time. 

801 

802 Absolute time corresponds to the event loop's time() method. 

803 """ 

804 if when is None: 

805 raise TypeError("when cannot be None") 

806 self._check_closed() 

807 if self._debug: 

808 self._check_thread() 

809 self._check_callback(callback, 'call_at') 

810 timer = events.TimerHandle(when, callback, args, self, context) 

811 if timer._source_traceback: 

812 del timer._source_traceback[-1] 

813 heapq.heappush(self._scheduled, timer) 

814 timer._scheduled = True 

815 return timer 

816 

817 def call_soon(self, callback, *args, context=None): 

818 """Arrange for a callback to be called as soon as possible. 

819 

820 This operates as a FIFO queue: callbacks are called in the 

821 order in which they are registered. Each callback will be 

822 called exactly once. 

823 

824 Any positional arguments after the callback will be passed to 

825 the callback when it is called. 

826 """ 

827 self._check_closed() 

828 if self._debug: 

829 self._check_thread() 

830 self._check_callback(callback, 'call_soon') 

831 handle = self._call_soon(callback, args, context) 

832 if handle._source_traceback: 

833 del handle._source_traceback[-1] 

834 return handle 

835 

836 def _check_callback(self, callback, method): 

837 if (coroutines.iscoroutine(callback) or 

838 coroutines._iscoroutinefunction(callback)): 

839 raise TypeError( 

840 f"coroutines cannot be used with {method}()") 

841 if not callable(callback): 

842 raise TypeError( 

843 f'a callable object was expected by {method}(), ' 

844 f'got {callback!r}') 

845 

846 def _call_soon(self, callback, args, context): 

847 handle = events.Handle(callback, args, self, context) 

848 if handle._source_traceback: 

849 del handle._source_traceback[-1] 

850 self._ready.append(handle) 

851 return handle 

852 

853 def _check_thread(self): 

854 """Check that the current thread is the thread running the event loop. 

855 

856 Non-thread-safe methods of this class make this assumption and will 

857 likely behave incorrectly when the assumption is violated. 

858 

859 Should only be called when (self._debug == True). The caller is 

860 responsible for checking this condition for performance reasons. 

861 """ 

862 if self._thread_id is None: 

863 return 

864 thread_id = threading.get_ident() 

865 if thread_id != self._thread_id: 

866 raise RuntimeError( 

867 "Non-thread-safe operation invoked on an event loop other " 

868 "than the current one") 

869 

870 def call_soon_threadsafe(self, callback, *args, context=None): 

871 """Like call_soon(), but thread-safe.""" 

872 self._check_closed() 

873 if self._debug: 

874 self._check_callback(callback, 'call_soon_threadsafe') 

875 handle = events._ThreadSafeHandle(callback, args, self, context) 

876 self._ready.append(handle) 

877 if handle._source_traceback: 

878 del handle._source_traceback[-1] 

879 if handle._source_traceback: 

880 del handle._source_traceback[-1] 

881 self._write_to_self() 

882 return handle 

883 

884 def run_in_executor(self, executor, func, *args): 

885 self._check_closed() 

886 if self._debug: 

887 self._check_callback(func, 'run_in_executor') 

888 if executor is None: 

889 executor = self._default_executor 

890 # Only check when the default executor is being used 

891 self._check_default_executor() 

892 if executor is None: 

893 executor = concurrent.futures.ThreadPoolExecutor( 

894 thread_name_prefix='asyncio' 

895 ) 

896 self._default_executor = executor 

897 return futures.wrap_future( 

898 executor.submit(func, *args), loop=self) 

899 

900 def set_default_executor(self, executor): 

901 if not isinstance(executor, concurrent.futures.ThreadPoolExecutor): 

902 raise TypeError('executor must be ThreadPoolExecutor instance') 

903 self._default_executor = executor 

904 

905 def _getaddrinfo_debug(self, host, port, family, type, proto, flags): 

906 msg = [f"{host}:{port!r}"] 

907 if family: 

908 msg.append(f'family={family!r}') 

909 if type: 

910 msg.append(f'type={type!r}') 

911 if proto: 

912 msg.append(f'proto={proto!r}') 

913 if flags: 

914 msg.append(f'flags={flags!r}') 

915 msg = ', '.join(msg) 

916 logger.debug('Get address info %s', msg) 

917 

918 t0 = self.time() 

919 addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags) 

920 dt = self.time() - t0 

921 

922 msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}' 

923 if dt >= self.slow_callback_duration: 

924 logger.info(msg) 

925 else: 

926 logger.debug(msg) 

927 return addrinfo 

928 

929 async def getaddrinfo(self, host, port, *, 

930 family=0, type=0, proto=0, flags=0): 

931 if self._debug: 931 ↛ 932line 931 didn't jump to line 932 because the condition on line 931 was never true

932 getaddr_func = self._getaddrinfo_debug 

933 else: 

934 getaddr_func = socket.getaddrinfo 

935 

936 return await self.run_in_executor( 

937 None, getaddr_func, host, port, family, type, proto, flags) 

938 

939 async def getnameinfo(self, sockaddr, flags=0): 

940 return await self.run_in_executor( 

941 None, socket.getnameinfo, sockaddr, flags) 

942 

943 async def sock_sendfile(self, sock, file, offset=0, count=None, 

944 *, fallback=True): 

945 if self._debug and sock.gettimeout() != 0: 

946 raise ValueError("the socket must be non-blocking") 

947 _check_ssl_socket(sock) 

948 self._check_sendfile_params(sock, file, offset, count) 

949 try: 

950 return await self._sock_sendfile_native(sock, file, 

951 offset, count) 

952 except exceptions.SendfileNotAvailableError as exc: 

953 if not fallback: 

954 raise 

955 return await self._sock_sendfile_fallback(sock, file, 

956 offset, count) 

957 

958 async def _sock_sendfile_native(self, sock, file, offset, count): 

959 # NB: sendfile syscall is not supported for SSL sockets and 

960 # non-mmap files even if sendfile is supported by OS 

961 raise exceptions.SendfileNotAvailableError( 

962 f"syscall sendfile is not available for socket {sock!r} " 

963 f"and file {file!r} combination") 

964 

965 async def _sock_sendfile_fallback(self, sock, file, offset, count): 

966 if offset: 

967 file.seek(offset) 

968 blocksize = ( 

969 min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE) 

970 if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE 

971 ) 

972 buf = bytearray(blocksize) 

973 total_sent = 0 

974 try: 

975 while True: 

976 if count: 

977 blocksize = min(count - total_sent, blocksize) 

978 if blocksize <= 0: 

979 break 

980 view = memoryview(buf)[:blocksize] 

981 read = await self.run_in_executor(None, file.readinto, view) 

982 if not read: 

983 break # EOF 

984 await self.sock_sendall(sock, view[:read]) 

985 total_sent += read 

986 return total_sent 

987 finally: 

988 if total_sent > 0 and hasattr(file, 'seek'): 

989 file.seek(offset + total_sent) 

990 

991 def _check_sendfile_params(self, sock, file, offset, count): 

992 if 'b' not in getattr(file, 'mode', 'b'): 

993 raise ValueError("file should be opened in binary mode") 

994 if not sock.type == socket.SOCK_STREAM: 

995 raise ValueError("only SOCK_STREAM type sockets are supported") 

996 if count is not None: 

997 if not isinstance(count, int): 

998 raise TypeError( 

999 "count must be a positive integer (got {!r})".format(count)) 

1000 if count <= 0: 

1001 raise ValueError( 

1002 "count must be a positive integer (got {!r})".format(count)) 

1003 if not isinstance(offset, int): 

1004 raise TypeError( 

1005 "offset must be a non-negative integer (got {!r})".format( 

1006 offset)) 

1007 if offset < 0: 

1008 raise ValueError( 

1009 "offset must be a non-negative integer (got {!r})".format( 

1010 offset)) 

1011 

1012 async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None): 

1013 """Create, bind and connect one socket.""" 

1014 my_exceptions = [] 

1015 exceptions.append(my_exceptions) 

1016 family, type_, proto, _, address = addr_info 

1017 sock = None 

1018 try: 

1019 try: 

1020 sock = socket.socket(family=family, type=type_, proto=proto) 

1021 sock.setblocking(False) 

1022 if local_addr_infos is not None: 

1023 for lfamily, _, _, _, laddr in local_addr_infos: 

1024 # skip local addresses of different family 

1025 if lfamily != family: 

1026 continue 

1027 try: 

1028 sock.bind(laddr) 

1029 break 

1030 except OSError as exc: 

1031 msg = ( 

1032 f'error while attempting to bind on ' 

1033 f'address {laddr!r}: {str(exc).lower()}' 

1034 ) 

1035 exc = OSError(exc.errno, msg) 

1036 my_exceptions.append(exc) 

1037 else: # all bind attempts failed 

1038 if my_exceptions: 

1039 raise my_exceptions.pop() 

1040 else: 

1041 raise OSError(f"no matching local address with {family=} found") 

1042 await self.sock_connect(sock, address) 

1043 return sock 

1044 except OSError as exc: 

1045 my_exceptions.append(exc) 

1046 raise 

1047 except: 

1048 if sock is not None: 

1049 try: 

1050 sock.close() 

1051 except OSError: 

1052 # An error when closing a newly created socket is 

1053 # not important, but it can overwrite more important 

1054 # non-OSError error. So ignore it. 

1055 pass 

1056 raise 

1057 finally: 

1058 exceptions = my_exceptions = None 

1059 

1060 async def create_connection( 

1061 self, protocol_factory, host=None, port=None, 

1062 *, ssl=None, family=0, 

1063 proto=0, flags=0, sock=None, 

1064 local_addr=None, server_hostname=None, 

1065 ssl_handshake_timeout=None, 

1066 ssl_shutdown_timeout=None, 

1067 happy_eyeballs_delay=None, interleave=None, 

1068 all_errors=False): 

1069 """Connect to a TCP server. 

1070 

1071 Create a streaming transport connection to a given internet host and 

1072 port: socket family AF_INET or socket.AF_INET6 depending on host (or 

1073 family if specified), socket type SOCK_STREAM. protocol_factory must be 

1074 a callable returning a protocol instance. 

1075 

1076 This method is a coroutine which will try to establish the connection 

1077 in the background. When successful, the coroutine returns a 

1078 (transport, protocol) pair. 

1079 """ 

1080 if server_hostname is not None and not ssl: 

1081 raise ValueError('server_hostname is only meaningful with ssl') 

1082 

1083 if server_hostname is None and ssl: 

1084 # Use host as default for server_hostname. It is an error 

1085 # if host is empty or not set, e.g. when an 

1086 # already-connected socket was passed or when only a port 

1087 # is given. To avoid this error, you can pass 

1088 # server_hostname='' -- this will bypass the hostname 

1089 # check. (This also means that if host is a numeric 

1090 # IP/IPv6 address, we will attempt to verify that exact 

1091 # address; this will probably fail, but it is possible to 

1092 # create a certificate for a specific IP address, so we 

1093 # don't judge it here.) 

1094 if not host: 

1095 raise ValueError('You must set server_hostname ' 

1096 'when using ssl without a host') 

1097 server_hostname = host 

1098 

1099 if ssl_handshake_timeout is not None and not ssl: 

1100 raise ValueError( 

1101 'ssl_handshake_timeout is only meaningful with ssl') 

1102 

1103 if ssl_shutdown_timeout is not None and not ssl: 1103 ↛ 1104line 1103 didn't jump to line 1104 because the condition on line 1103 was never true

1104 raise ValueError( 

1105 'ssl_shutdown_timeout is only meaningful with ssl') 

1106 

1107 if sock is not None: 

1108 _check_ssl_socket(sock) 

1109 

1110 if happy_eyeballs_delay is not None and interleave is None: 

1111 # If using happy eyeballs, default to interleave addresses by family 

1112 interleave = 1 

1113 

1114 if host is not None or port is not None: 

1115 if sock is not None: 

1116 raise ValueError( 

1117 'host/port and sock can not be specified at the same time') 

1118 

1119 infos = await self._ensure_resolved( 

1120 (host, port), family=family, 

1121 type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self) 

1122 if not infos: 

1123 raise OSError('getaddrinfo() returned empty list') 

1124 

1125 if local_addr is not None: 

1126 laddr_infos = await self._ensure_resolved( 

1127 local_addr, family=family, 

1128 type=socket.SOCK_STREAM, proto=proto, 

1129 flags=flags, loop=self) 

1130 if not laddr_infos: 

1131 raise OSError('getaddrinfo() returned empty list') 

1132 else: 

1133 laddr_infos = None 

1134 

1135 if interleave: 

1136 infos = _interleave_addrinfos(infos, interleave) 

1137 

1138 exceptions = [] 

1139 if happy_eyeballs_delay is None: 

1140 # not using happy eyeballs 

1141 for addrinfo in infos: 

1142 try: 

1143 sock = await self._connect_sock( 

1144 exceptions, addrinfo, laddr_infos) 

1145 break 

1146 except OSError: 

1147 continue 

1148 else: # using happy eyeballs 

1149 sock = (await staggered.staggered_race( 

1150 ( 

1151 # can't use functools.partial as it keeps a reference 

1152 # to exceptions 

1153 lambda addrinfo=addrinfo: self._connect_sock( 

1154 exceptions, addrinfo, laddr_infos 

1155 ) 

1156 for addrinfo in infos 

1157 ), 

1158 happy_eyeballs_delay, 

1159 loop=self, 

1160 ))[0] # can't use sock, _, _ as it keeks a reference to exceptions 

1161 

1162 if sock is None: 

1163 exceptions = [exc for sub in exceptions for exc in sub] 

1164 try: 

1165 if all_errors: 

1166 raise ExceptionGroup("create_connection failed", exceptions) 

1167 if len(exceptions) == 1: 

1168 raise exceptions[0] 

1169 elif exceptions: 

1170 # If they all have the same str(), raise one. 

1171 model = str(exceptions[0]) 

1172 if all(str(exc) == model for exc in exceptions): 

1173 raise exceptions[0] 

1174 # Raise a combined exception so the user can see all 

1175 # the various error messages. 

1176 raise OSError('Multiple exceptions: {}'.format( 

1177 ', '.join(str(exc) for exc in exceptions))) 

1178 else: 

1179 # No exceptions were collected, raise a timeout error 

1180 raise TimeoutError('create_connection failed') 

1181 finally: 

1182 exceptions = None 

1183 

1184 else: 

1185 if sock is None: 

1186 raise ValueError( 

1187 'host and port was not specified and no sock specified') 

1188 if sock.type != socket.SOCK_STREAM: 

1189 # We allow AF_INET, AF_INET6, AF_UNIX as long as they 

1190 # are SOCK_STREAM. 

1191 # We support passing AF_UNIX sockets even though we have 

1192 # a dedicated API for that: create_unix_connection. 

1193 # Disallowing AF_UNIX in this method, breaks backwards 

1194 # compatibility. 

1195 raise ValueError( 

1196 f'A Stream Socket was expected, got {sock!r}') 

1197 

1198 transport, protocol = await self._create_connection_transport( 

1199 sock, protocol_factory, ssl, server_hostname, 

1200 ssl_handshake_timeout=ssl_handshake_timeout, 

1201 ssl_shutdown_timeout=ssl_shutdown_timeout) 

1202 if self._debug: 

1203 # Get the socket from the transport because SSL transport closes 

1204 # the old socket and creates a new SSL socket 

1205 sock = transport.get_extra_info('socket') 

1206 logger.debug("%r connected to %s:%r: (%r, %r)", 

1207 sock, host, port, transport, protocol) 

1208 return transport, protocol 

1209 

1210 async def _create_connection_transport( 

1211 self, sock, protocol_factory, ssl, 

1212 server_hostname, server_side=False, 

1213 ssl_handshake_timeout=None, 

1214 ssl_shutdown_timeout=None): 

1215 

1216 sock.setblocking(False) 

1217 

1218 protocol = protocol_factory() 

1219 waiter = self.create_future() 

1220 if ssl: 

1221 sslcontext = None if isinstance(ssl, bool) else ssl 

1222 transport = self._make_ssl_transport( 

1223 sock, protocol, sslcontext, waiter, 

1224 server_side=server_side, server_hostname=server_hostname, 

1225 ssl_handshake_timeout=ssl_handshake_timeout, 

1226 ssl_shutdown_timeout=ssl_shutdown_timeout) 

1227 else: 

1228 transport = self._make_socket_transport(sock, protocol, waiter) 

1229 

1230 try: 

1231 await waiter 

1232 except: 

1233 transport.close() 

1234 raise 

1235 

1236 return transport, protocol 

1237 

1238 async def sendfile(self, transport, file, offset=0, count=None, 

1239 *, fallback=True): 

1240 """Send a file to transport. 

1241 

1242 Return the total number of bytes which were sent. 

1243 

1244 The method uses high-performance os.sendfile if available. 

1245 

1246 file must be a regular file object opened in binary mode. 

1247 

1248 offset tells from where to start reading the file. If specified, 

1249 count is the total number of bytes to transmit as opposed to 

1250 sending the file until EOF is reached. File position is updated on 

1251 return or also in case of error in which case file.tell() 

1252 can be used to figure out the number of bytes 

1253 which were sent. 

1254 

1255 fallback set to True makes asyncio to manually read and send 

1256 the file when the platform does not support the sendfile syscall 

1257 (e.g. Windows or SSL socket on Unix). 

1258 

1259 Raise SendfileNotAvailableError if the system does not support 

1260 sendfile syscall and fallback is False. 

1261 """ 

1262 if transport.is_closing(): 

1263 raise RuntimeError("Transport is closing") 

1264 mode = getattr(transport, '_sendfile_compatible', 

1265 constants._SendfileMode.UNSUPPORTED) 

1266 if mode is constants._SendfileMode.UNSUPPORTED: 

1267 raise RuntimeError( 

1268 f"sendfile is not supported for transport {transport!r}") 

1269 if mode is constants._SendfileMode.TRY_NATIVE: 

1270 try: 

1271 return await self._sendfile_native(transport, file, 

1272 offset, count) 

1273 except exceptions.SendfileNotAvailableError as exc: 

1274 if not fallback: 

1275 raise 

1276 

1277 if not fallback: 

1278 raise RuntimeError( 

1279 f"fallback is disabled and native sendfile is not " 

1280 f"supported for transport {transport!r}") 

1281 

1282 return await self._sendfile_fallback(transport, file, 

1283 offset, count) 

1284 

1285 async def _sendfile_native(self, transp, file, offset, count): 

1286 raise exceptions.SendfileNotAvailableError( 

1287 "sendfile syscall is not supported") 

1288 

1289 async def _sendfile_fallback(self, transp, file, offset, count): 

1290 if offset: 

1291 file.seek(offset) 

1292 blocksize = min(count, 16384) if count else 16384 

1293 buf = bytearray(blocksize) 

1294 total_sent = 0 

1295 proto = _SendfileFallbackProtocol(transp) 

1296 try: 

1297 while True: 

1298 if count: 

1299 blocksize = min(count - total_sent, blocksize) 

1300 if blocksize <= 0: 

1301 return total_sent 

1302 view = memoryview(buf)[:blocksize] 

1303 read = await self.run_in_executor(None, file.readinto, view) 

1304 if not read: 

1305 return total_sent # EOF 

1306 transp.write(view[:read]) 

1307 await proto.drain() 

1308 total_sent += read 

1309 finally: 

1310 if total_sent > 0 and hasattr(file, 'seek'): 

1311 file.seek(offset + total_sent) 

1312 await proto.restore() 

1313 

1314 async def start_tls(self, transport, protocol, sslcontext, *, 

1315 server_side=False, 

1316 server_hostname=None, 

1317 ssl_handshake_timeout=None, 

1318 ssl_shutdown_timeout=None): 

1319 """Upgrade transport to TLS. 

1320 

1321 Return a new transport that *protocol* should start using 

1322 immediately. 

1323 """ 

1324 if ssl is None: 1324 ↛ 1325line 1324 didn't jump to line 1325 because the condition on line 1324 was never true

1325 raise RuntimeError('Python ssl module is not available') 

1326 

1327 if not isinstance(sslcontext, ssl.SSLContext): 

1328 raise TypeError( 

1329 f'sslcontext is expected to be an instance of ssl.SSLContext, ' 

1330 f'got {sslcontext!r}') 

1331 

1332 if not getattr(transport, '_start_tls_compatible', False): 

1333 raise TypeError( 

1334 f'transport {transport!r} is not supported by start_tls()') 

1335 

1336 waiter = self.create_future() 

1337 ssl_protocol = sslproto.SSLProtocol( 

1338 self, protocol, sslcontext, waiter, 

1339 server_side, server_hostname, 

1340 ssl_handshake_timeout=ssl_handshake_timeout, 

1341 ssl_shutdown_timeout=ssl_shutdown_timeout, 

1342 call_connection_made=False) 

1343 

1344 # Pause early so that "ssl_protocol.data_received()" doesn't 

1345 # have a chance to get called before "ssl_protocol.connection_made()". 

1346 transport.pause_reading() 

1347 

1348 transport.set_protocol(ssl_protocol) 

1349 conmade_cb = self.call_soon(ssl_protocol.connection_made, transport) 

1350 resume_cb = self.call_soon(transport.resume_reading) 

1351 

1352 try: 

1353 await waiter 

1354 except BaseException: 

1355 transport.close() 

1356 conmade_cb.cancel() 

1357 resume_cb.cancel() 

1358 raise 

1359 

1360 return ssl_protocol._app_transport 

1361 

1362 async def create_datagram_endpoint(self, protocol_factory, 

1363 local_addr=None, remote_addr=None, *, 

1364 family=0, proto=0, flags=0, 

1365 reuse_port=None, 

1366 allow_broadcast=None, sock=None): 

1367 """Create datagram connection.""" 

1368 if sock is not None: 

1369 if sock.type == socket.SOCK_STREAM: 

1370 raise ValueError( 

1371 f'A datagram socket was expected, got {sock!r}') 

1372 if (local_addr or remote_addr or 

1373 family or proto or flags or 

1374 reuse_port or allow_broadcast): 

1375 # show the problematic kwargs in exception msg 

1376 opts = dict(local_addr=local_addr, remote_addr=remote_addr, 

1377 family=family, proto=proto, flags=flags, 

1378 reuse_port=reuse_port, 

1379 allow_broadcast=allow_broadcast) 

1380 problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v) 

1381 raise ValueError( 

1382 f'socket modifier keyword arguments can not be used ' 

1383 f'when sock is specified. ({problems})') 

1384 sock.setblocking(False) 

1385 r_addr = None 

1386 else: 

1387 if not (local_addr or remote_addr): 

1388 if family == 0: 

1389 raise ValueError('unexpected address family') 

1390 addr_pairs_info = (((family, proto), (None, None)),) 

1391 elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX: 

1392 for addr in (local_addr, remote_addr): 

1393 if addr is not None and not isinstance(addr, str): 1393 ↛ 1394line 1393 didn't jump to line 1394 because the condition on line 1393 was never true

1394 raise TypeError('string is expected') 

1395 

1396 if local_addr and local_addr[0] not in (0, '\x00'): 1396 ↛ 1408line 1396 didn't jump to line 1408 because the condition on line 1396 was always true

1397 try: 

1398 if stat.S_ISSOCK(os.stat(local_addr).st_mode): 1398 ↛ 1408line 1398 didn't jump to line 1408 because the condition on line 1398 was always true

1399 os.remove(local_addr) 

1400 except FileNotFoundError: 

1401 pass 

1402 except OSError as err: 

1403 # Directory may have permissions only to create socket. 

1404 logger.error('Unable to check or remove stale UNIX ' 

1405 'socket %r: %r', 

1406 local_addr, err) 

1407 

1408 addr_pairs_info = (((family, proto), 

1409 (local_addr, remote_addr)), ) 

1410 else: 

1411 # join address by (family, protocol) 

1412 addr_infos = {} # Using order preserving dict 

1413 for idx, addr in ((0, local_addr), (1, remote_addr)): 

1414 if addr is not None: 

1415 if not (isinstance(addr, tuple) and len(addr) == 2): 

1416 raise TypeError('2-tuple is expected') 

1417 

1418 infos = await self._ensure_resolved( 

1419 addr, family=family, type=socket.SOCK_DGRAM, 

1420 proto=proto, flags=flags, loop=self) 

1421 if not infos: 

1422 raise OSError('getaddrinfo() returned empty list') 

1423 

1424 for fam, _, pro, _, address in infos: 

1425 key = (fam, pro) 

1426 if key not in addr_infos: 1426 ↛ 1428line 1426 didn't jump to line 1428 because the condition on line 1426 was always true

1427 addr_infos[key] = [None, None] 

1428 addr_infos[key][idx] = address 

1429 

1430 # each addr has to have info for each (family, proto) pair 

1431 addr_pairs_info = [ 

1432 (key, addr_pair) for key, addr_pair in addr_infos.items() 

1433 if not ((local_addr and addr_pair[0] is None) or 

1434 (remote_addr and addr_pair[1] is None))] 

1435 

1436 if not addr_pairs_info: 

1437 raise ValueError('can not get address information') 

1438 

1439 exceptions = [] 

1440 

1441 for ((family, proto), 1441 ↛ 1443line 1441 didn't jump to line 1443 because the loop on line 1441 never started

1442 (local_address, remote_address)) in addr_pairs_info: 

1443 sock = None 

1444 r_addr = None 

1445 try: 

1446 sock = socket.socket( 

1447 family=family, type=socket.SOCK_DGRAM, proto=proto) 

1448 if reuse_port: 

1449 _set_reuseport(sock) 

1450 if allow_broadcast: 

1451 sock.setsockopt( 

1452 socket.SOL_SOCKET, socket.SO_BROADCAST, 1) 

1453 sock.setblocking(False) 

1454 

1455 if local_addr: 

1456 sock.bind(local_address) 

1457 if remote_addr: 

1458 if not allow_broadcast: 

1459 await self.sock_connect(sock, remote_address) 

1460 r_addr = remote_address 

1461 except OSError as exc: 

1462 if sock is not None: 

1463 sock.close() 

1464 exceptions.append(exc) 

1465 except: 

1466 if sock is not None: 1466 ↛ 1468line 1466 didn't jump to line 1468 because the condition on line 1466 was always true

1467 sock.close() 

1468 raise 

1469 else: 

1470 break 

1471 else: 

1472 raise exceptions[0] 

1473 

1474 protocol = protocol_factory() 

1475 waiter = self.create_future() 

1476 transport = self._make_datagram_transport( 

1477 sock, protocol, r_addr, waiter) 

1478 if self._debug: 1478 ↛ 1479line 1478 didn't jump to line 1479 because the condition on line 1478 was never true

1479 if local_addr: 

1480 logger.info("Datagram endpoint local_addr=%r remote_addr=%r " 

1481 "created: (%r, %r)", 

1482 local_addr, remote_addr, transport, protocol) 

1483 else: 

1484 logger.debug("Datagram endpoint remote_addr=%r created: " 

1485 "(%r, %r)", 

1486 remote_addr, transport, protocol) 

1487 

1488 try: 

1489 await waiter 

1490 except: 

1491 transport.close() 

1492 raise 

1493 

1494 return transport, protocol 

1495 

1496 async def _ensure_resolved(self, address, *, 

1497 family=0, type=socket.SOCK_STREAM, 

1498 proto=0, flags=0, loop): 

1499 host, port = address[:2] 

1500 info = _ipaddr_info(host, port, family, type, proto, *address[2:]) 

1501 if info is not None: 

1502 # "host" is already a resolved IP. 

1503 return [info] 

1504 else: 

1505 return await loop.getaddrinfo(host, port, family=family, type=type, 

1506 proto=proto, flags=flags) 

1507 

1508 async def _create_server_getaddrinfo(self, host, port, family, flags): 

1509 infos = await self._ensure_resolved((host, port), family=family, 

1510 type=socket.SOCK_STREAM, 

1511 flags=flags, loop=self) 

1512 if not infos: 

1513 raise OSError(f'getaddrinfo({host!r}) returned empty list') 

1514 return infos 

1515 

1516 async def create_server( 

1517 self, protocol_factory, host=None, port=None, 

1518 *, 

1519 family=socket.AF_UNSPEC, 

1520 flags=socket.AI_PASSIVE, 

1521 sock=None, 

1522 backlog=100, 

1523 ssl=None, 

1524 reuse_address=None, 

1525 reuse_port=None, 

1526 keep_alive=None, 

1527 ssl_handshake_timeout=None, 

1528 ssl_shutdown_timeout=None, 

1529 start_serving=True): 

1530 """Create a TCP server. 

1531 

1532 The host parameter can be a string, in that case the TCP server is 

1533 bound to host and port. 

1534 

1535 The host parameter can also be a sequence of strings and in that case 

1536 the TCP server is bound to all hosts of the sequence. If a host 

1537 appears multiple times (possibly indirectly e.g. when hostnames 

1538 resolve to the same IP address), the server is only bound once to that 

1539 host. 

1540 

1541 Return a Server object which can be used to stop the service. 

1542 

1543 This method is a coroutine. 

1544 """ 

1545 if isinstance(ssl, bool): 1545 ↛ 1546line 1545 didn't jump to line 1546 because the condition on line 1545 was never true

1546 raise TypeError('ssl argument must be an SSLContext or None') 

1547 

1548 if ssl_handshake_timeout is not None and ssl is None: 

1549 raise ValueError( 

1550 'ssl_handshake_timeout is only meaningful with ssl') 

1551 

1552 if ssl_shutdown_timeout is not None and ssl is None: 1552 ↛ 1553line 1552 didn't jump to line 1553 because the condition on line 1552 was never true

1553 raise ValueError( 

1554 'ssl_shutdown_timeout is only meaningful with ssl') 

1555 

1556 if sock is not None: 

1557 _check_ssl_socket(sock) 

1558 

1559 if host is not None or port is not None: 

1560 if sock is not None: 

1561 raise ValueError( 

1562 'host/port and sock can not be specified at the same time') 

1563 

1564 if reuse_address is None: 1564 ↛ 1566line 1564 didn't jump to line 1566 because the condition on line 1564 was always true

1565 reuse_address = os.name == "posix" and sys.platform != "cygwin" 

1566 sockets = [] 

1567 if host == '': 

1568 hosts = [None] 

1569 elif (isinstance(host, str) or 

1570 not isinstance(host, collections.abc.Iterable)): 

1571 hosts = [host] 

1572 else: 

1573 hosts = host 

1574 

1575 fs = [self._create_server_getaddrinfo(host, port, family=family, 

1576 flags=flags) 

1577 for host in hosts] 

1578 infos = await tasks.gather(*fs) 

1579 infos = set(itertools.chain.from_iterable(infos)) 

1580 

1581 completed = False 

1582 try: 

1583 for res in infos: 

1584 af, socktype, proto, canonname, sa = res 

1585 try: 

1586 sock = socket.socket(af, socktype, proto) 

1587 except socket.error: 

1588 # Assume it's a bad family/type/protocol combination. 

1589 if self._debug: 

1590 logger.warning('create_server() failed to create ' 

1591 'socket.socket(%r, %r, %r)', 

1592 af, socktype, proto, exc_info=True) 

1593 continue 

1594 sockets.append(sock) 

1595 if reuse_address: 1595 ↛ 1600line 1595 didn't jump to line 1600 because the condition on line 1595 was always true

1596 sock.setsockopt( 

1597 socket.SOL_SOCKET, socket.SO_REUSEADDR, True) 

1598 # Since Linux 6.12.9, SO_REUSEPORT is not allowed 

1599 # on other address families than AF_INET/AF_INET6. 

1600 if reuse_port and af in (socket.AF_INET, socket.AF_INET6): 

1601 _set_reuseport(sock) 

1602 if keep_alive: 1602 ↛ 1603line 1602 didn't jump to line 1603 because the condition on line 1602 was never true

1603 sock.setsockopt( 

1604 socket.SOL_SOCKET, socket.SO_KEEPALIVE, True) 

1605 # Disable IPv4/IPv6 dual stack support (enabled by 

1606 # default on Linux) which makes a single socket 

1607 # listen on both address families. 

1608 if (_HAS_IPv6 and 

1609 af == socket.AF_INET6 and 

1610 hasattr(socket, 'IPPROTO_IPV6')): 

1611 sock.setsockopt(socket.IPPROTO_IPV6, 

1612 socket.IPV6_V6ONLY, 

1613 True) 

1614 try: 

1615 sock.bind(sa) 

1616 except OSError as err: 

1617 msg = ('error while attempting ' 

1618 'to bind on address %r: %s' 

1619 % (sa, str(err).lower())) 

1620 if err.errno == errno.EADDRNOTAVAIL: 1620 ↛ 1622line 1620 didn't jump to line 1622 because the condition on line 1620 was never true

1621 # Assume the family is not enabled (bpo-30945) 

1622 sockets.pop() 

1623 sock.close() 

1624 if self._debug: 

1625 logger.warning(msg) 

1626 continue 

1627 raise OSError(err.errno, msg) from None 

1628 

1629 if not sockets: 1629 ↛ 1630line 1629 didn't jump to line 1630 because the condition on line 1629 was never true

1630 raise OSError('could not bind on any address out of %r' 

1631 % ([info[4] for info in infos],)) 

1632 

1633 completed = True 

1634 finally: 

1635 if not completed: 

1636 for sock in sockets: 1636 ↛ 1645line 1636 didn't jump to line 1645 because the loop on line 1636 didn't complete

1637 sock.close() 

1638 else: 

1639 if sock is None: 

1640 raise ValueError('Neither host/port nor sock were specified') 

1641 if sock.type != socket.SOCK_STREAM: 

1642 raise ValueError(f'A Stream Socket was expected, got {sock!r}') 

1643 sockets = [sock] 

1644 

1645 for sock in sockets: 

1646 sock.setblocking(False) 

1647 

1648 server = Server(self, sockets, protocol_factory, 

1649 ssl, backlog, ssl_handshake_timeout, 

1650 ssl_shutdown_timeout) 

1651 if start_serving: 

1652 server._start_serving() 

1653 # Skip one loop iteration so that all 'loop.add_reader' 

1654 # go through. 

1655 await tasks.sleep(0) 

1656 

1657 if self._debug: 

1658 logger.info("%r is serving", server) 

1659 return server 

1660 

1661 async def connect_accepted_socket( 

1662 self, protocol_factory, sock, 

1663 *, ssl=None, 

1664 ssl_handshake_timeout=None, 

1665 ssl_shutdown_timeout=None): 

1666 if sock.type != socket.SOCK_STREAM: 1666 ↛ 1667line 1666 didn't jump to line 1667 because the condition on line 1666 was never true

1667 raise ValueError(f'A Stream Socket was expected, got {sock!r}') 

1668 

1669 if ssl_handshake_timeout is not None and not ssl: 

1670 raise ValueError( 

1671 'ssl_handshake_timeout is only meaningful with ssl') 

1672 

1673 if ssl_shutdown_timeout is not None and not ssl: 1673 ↛ 1674line 1673 didn't jump to line 1674 because the condition on line 1673 was never true

1674 raise ValueError( 

1675 'ssl_shutdown_timeout is only meaningful with ssl') 

1676 

1677 _check_ssl_socket(sock) 

1678 

1679 transport, protocol = await self._create_connection_transport( 

1680 sock, protocol_factory, ssl, '', server_side=True, 

1681 ssl_handshake_timeout=ssl_handshake_timeout, 

1682 ssl_shutdown_timeout=ssl_shutdown_timeout) 

1683 if self._debug: 1683 ↛ 1686line 1683 didn't jump to line 1686 because the condition on line 1683 was never true

1684 # Get the socket from the transport because SSL transport closes 

1685 # the old socket and creates a new SSL socket 

1686 sock = transport.get_extra_info('socket') 

1687 logger.debug("%r handled: (%r, %r)", sock, transport, protocol) 

1688 return transport, protocol 

1689 

1690 async def connect_read_pipe(self, protocol_factory, pipe): 

1691 protocol = protocol_factory() 

1692 waiter = self.create_future() 

1693 transport = self._make_read_pipe_transport(pipe, protocol, waiter) 

1694 

1695 try: 

1696 await waiter 

1697 except: 

1698 transport.close() 

1699 raise 

1700 

1701 if self._debug: 1701 ↛ 1702line 1701 didn't jump to line 1702 because the condition on line 1701 was never true

1702 logger.debug('Read pipe %r connected: (%r, %r)', 

1703 pipe.fileno(), transport, protocol) 

1704 return transport, protocol 

1705 

1706 async def connect_write_pipe(self, protocol_factory, pipe): 

1707 protocol = protocol_factory() 

1708 waiter = self.create_future() 

1709 transport = self._make_write_pipe_transport(pipe, protocol, waiter) 

1710 

1711 try: 

1712 await waiter 

1713 except: 

1714 transport.close() 

1715 raise 

1716 

1717 if self._debug: 1717 ↛ 1718line 1717 didn't jump to line 1718 because the condition on line 1717 was never true

1718 logger.debug('Write pipe %r connected: (%r, %r)', 

1719 pipe.fileno(), transport, protocol) 

1720 return transport, protocol 

1721 

1722 def _log_subprocess(self, msg, stdin, stdout, stderr): 

1723 info = [msg] 

1724 if stdin is not None: 

1725 info.append(f'stdin={_format_pipe(stdin)}') 

1726 if stdout is not None and stderr == subprocess.STDOUT: 

1727 info.append(f'stdout=stderr={_format_pipe(stdout)}') 

1728 else: 

1729 if stdout is not None: 

1730 info.append(f'stdout={_format_pipe(stdout)}') 

1731 if stderr is not None: 

1732 info.append(f'stderr={_format_pipe(stderr)}') 

1733 logger.debug(' '.join(info)) 

1734 

1735 async def subprocess_shell(self, protocol_factory, cmd, *, 

1736 stdin=subprocess.PIPE, 

1737 stdout=subprocess.PIPE, 

1738 stderr=subprocess.PIPE, 

1739 universal_newlines=False, 

1740 shell=True, bufsize=0, 

1741 encoding=None, errors=None, text=None, 

1742 **kwargs): 

1743 if not isinstance(cmd, (bytes, str)): 

1744 raise ValueError("cmd must be a string") 

1745 if universal_newlines: 

1746 raise ValueError("universal_newlines must be False") 

1747 if not shell: 

1748 raise ValueError("shell must be True") 

1749 if bufsize != 0: 

1750 raise ValueError("bufsize must be 0") 

1751 if text: 

1752 raise ValueError("text must be False") 

1753 if encoding is not None: 

1754 raise ValueError("encoding must be None") 

1755 if errors is not None: 

1756 raise ValueError("errors must be None") 

1757 

1758 protocol = protocol_factory() 

1759 debug_log = None 

1760 if self._debug: 1760 ↛ 1763line 1760 didn't jump to line 1763 because the condition on line 1760 was never true

1761 # don't log parameters: they may contain sensitive information 

1762 # (password) and may be too long 

1763 debug_log = 'run shell command %r' % cmd 

1764 self._log_subprocess(debug_log, stdin, stdout, stderr) 

1765 transport = await self._make_subprocess_transport( 

1766 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs) 

1767 if self._debug and debug_log is not None: 1767 ↛ 1768line 1767 didn't jump to line 1768 because the condition on line 1767 was never true

1768 logger.info('%s: %r', debug_log, transport) 

1769 return transport, protocol 

1770 

1771 async def subprocess_exec(self, protocol_factory, program, *args, 

1772 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 

1773 stderr=subprocess.PIPE, universal_newlines=False, 

1774 shell=False, bufsize=0, 

1775 encoding=None, errors=None, text=None, 

1776 **kwargs): 

1777 if universal_newlines: 

1778 raise ValueError("universal_newlines must be False") 

1779 if shell: 

1780 raise ValueError("shell must be False") 

1781 if bufsize != 0: 

1782 raise ValueError("bufsize must be 0") 

1783 if text: 

1784 raise ValueError("text must be False") 

1785 if encoding is not None: 

1786 raise ValueError("encoding must be None") 

1787 if errors is not None: 

1788 raise ValueError("errors must be None") 

1789 

1790 popen_args = (program,) + args 

1791 protocol = protocol_factory() 

1792 debug_log = None 

1793 if self._debug: 1793 ↛ 1796line 1793 didn't jump to line 1796 because the condition on line 1793 was never true

1794 # don't log parameters: they may contain sensitive information 

1795 # (password) and may be too long 

1796 debug_log = f'execute program {program!r}' 

1797 self._log_subprocess(debug_log, stdin, stdout, stderr) 

1798 transport = await self._make_subprocess_transport( 

1799 protocol, popen_args, False, stdin, stdout, stderr, 

1800 bufsize, **kwargs) 

1801 if self._debug and debug_log is not None: 1801 ↛ 1802line 1801 didn't jump to line 1802 because the condition on line 1801 was never true

1802 logger.info('%s: %r', debug_log, transport) 

1803 return transport, protocol 

1804 

1805 def get_exception_handler(self): 

1806 """Return an exception handler, or None if the default one is in use. 

1807 """ 

1808 return self._exception_handler 

1809 

1810 def set_exception_handler(self, handler): 

1811 """Set handler as the new event loop exception handler. 

1812 

1813 If handler is None, the default exception handler will 

1814 be set. 

1815 

1816 If handler is a callable object, it should have a 

1817 signature matching '(loop, context)', where 'loop' 

1818 will be a reference to the active event loop, 'context' 

1819 will be a dict object (see `call_exception_handler()` 

1820 documentation for details about context). 

1821 """ 

1822 if handler is not None and not callable(handler): 

1823 raise TypeError(f'A callable object or None is expected, ' 

1824 f'got {handler!r}') 

1825 self._exception_handler = handler 

1826 

1827 def default_exception_handler(self, context): 

1828 """Default exception handler. 

1829 

1830 This is called when an exception occurs and no exception 

1831 handler is set, and can be called by a custom exception 

1832 handler that wants to defer to the default behavior. 

1833 

1834 This default handler logs the error message and other 

1835 context-dependent information. In debug mode, a truncated 

1836 stack trace is also appended showing where the given object 

1837 (e.g. a handle or future or task) was created, if any. 

1838 

1839 The context parameter has the same meaning as in 

1840 `call_exception_handler()`. 

1841 """ 

1842 message = context.get('message') 

1843 if not message: 1843 ↛ 1844line 1843 didn't jump to line 1844 because the condition on line 1843 was never true

1844 message = 'Unhandled exception in event loop' 

1845 

1846 exception = context.get('exception') 

1847 if exception is not None: 

1848 exc_info = (type(exception), exception, exception.__traceback__) 

1849 else: 

1850 exc_info = False 

1851 

1852 if ('source_traceback' not in context and 1852 ↛ 1855line 1852 didn't jump to line 1855 because the condition on line 1852 was never true

1853 self._current_handle is not None and 

1854 self._current_handle._source_traceback): 

1855 context['handle_traceback'] = \ 

1856 self._current_handle._source_traceback 

1857 

1858 log_lines = [message] 

1859 for key in sorted(context): 

1860 if key in {'message', 'exception'}: 

1861 continue 

1862 value = context[key] 

1863 if key == 'source_traceback': 

1864 tb = ''.join(traceback.format_list(value)) 

1865 value = 'Object created at (most recent call last):\n' 

1866 value += tb.rstrip() 

1867 elif key == 'handle_traceback': 1867 ↛ 1868line 1867 didn't jump to line 1868 because the condition on line 1867 was never true

1868 tb = ''.join(traceback.format_list(value)) 

1869 value = 'Handle created at (most recent call last):\n' 

1870 value += tb.rstrip() 

1871 else: 

1872 value = repr(value) 

1873 log_lines.append(f'{key}: {value}') 

1874 

1875 logger.error('\n'.join(log_lines), exc_info=exc_info) 

1876 

1877 def call_exception_handler(self, context): 

1878 """Call the current event loop's exception handler. 

1879 

1880 The context argument is a dict containing the following keys: 

1881 

1882 - 'message': Error message; 

1883 - 'exception' (optional): Exception object; 

1884 - 'future' (optional): Future instance; 

1885 - 'task' (optional): Task instance; 

1886 - 'handle' (optional): Handle instance; 

1887 - 'protocol' (optional): Protocol instance; 

1888 - 'transport' (optional): Transport instance; 

1889 - 'socket' (optional): Socket instance; 

1890 - 'source_traceback' (optional): Traceback of the source; 

1891 - 'handle_traceback' (optional): Traceback of the handle; 

1892 - 'asyncgen' (optional): Asynchronous generator that caused 

1893 the exception. 

1894 

1895 New keys maybe introduced in the future. 

1896 

1897 Note: do not overload this method in an event loop subclass. 

1898 For custom exception handling, use the 

1899 `set_exception_handler()` method. 

1900 """ 

1901 if self._exception_handler is None: 

1902 try: 

1903 self.default_exception_handler(context) 

1904 except (SystemExit, KeyboardInterrupt): 

1905 raise 

1906 except BaseException: 

1907 # Second protection layer for unexpected errors 

1908 # in the default implementation, as well as for subclassed 

1909 # event loops with overloaded "default_exception_handler". 

1910 logger.error('Exception in default exception handler', 

1911 exc_info=True) 

1912 else: 

1913 try: 

1914 ctx = None 

1915 thing = context.get("task") 

1916 if thing is None: 

1917 # Even though Futures don't have a context, 

1918 # Task is a subclass of Future, 

1919 # and sometimes the 'future' key holds a Task. 

1920 thing = context.get("future") 

1921 if thing is None: 

1922 # Handles also have a context. 

1923 thing = context.get("handle") 

1924 if thing is not None and hasattr(thing, "get_context"): 

1925 ctx = thing.get_context() 

1926 if ctx is not None and hasattr(ctx, "run"): 

1927 ctx.run(self._exception_handler, self, context) 

1928 else: 

1929 self._exception_handler(self, context) 

1930 except (SystemExit, KeyboardInterrupt): 

1931 raise 

1932 except BaseException as exc: 

1933 # Exception in the user set custom exception handler. 

1934 try: 

1935 # Let's try default handler. 

1936 self.default_exception_handler({ 

1937 'message': 'Unhandled error in exception handler', 

1938 'exception': exc, 

1939 'context': context, 

1940 }) 

1941 except (SystemExit, KeyboardInterrupt): 

1942 raise 

1943 except BaseException: 

1944 # Guard 'default_exception_handler' in case it is 

1945 # overloaded. 

1946 logger.error('Exception in default exception handler ' 

1947 'while handling an unexpected error ' 

1948 'in custom exception handler', 

1949 exc_info=True) 

1950 

1951 def _add_callback(self, handle): 

1952 """Add a Handle to _ready.""" 

1953 if not handle._cancelled: 

1954 self._ready.append(handle) 

1955 

1956 def _add_callback_signalsafe(self, handle): 

1957 """Like _add_callback() but called from a signal handler.""" 

1958 self._add_callback(handle) 

1959 self._write_to_self() 

1960 

1961 def _timer_handle_cancelled(self, handle): 

1962 """Notification that a TimerHandle has been cancelled.""" 

1963 if handle._scheduled: 

1964 self._timer_cancelled_count += 1 

1965 

1966 def _run_once(self): 

1967 """Run one full iteration of the event loop. 

1968 

1969 This calls all currently ready callbacks, polls for I/O, 

1970 schedules the resulting callbacks, and finally schedules 

1971 'call_later' callbacks. 

1972 """ 

1973 

1974 sched_count = len(self._scheduled) 

1975 if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and 

1976 self._timer_cancelled_count / sched_count > 

1977 _MIN_CANCELLED_TIMER_HANDLES_FRACTION): 

1978 # Remove delayed calls that were cancelled if their number 

1979 # is too high 

1980 new_scheduled = [] 

1981 for handle in self._scheduled: 

1982 if handle._cancelled: 

1983 handle._scheduled = False 

1984 else: 

1985 new_scheduled.append(handle) 

1986 

1987 heapq.heapify(new_scheduled) 

1988 self._scheduled = new_scheduled 

1989 self._timer_cancelled_count = 0 

1990 else: 

1991 # Remove delayed calls that were cancelled from head of queue. 

1992 while self._scheduled and self._scheduled[0]._cancelled: 

1993 self._timer_cancelled_count -= 1 

1994 handle = heapq.heappop(self._scheduled) 

1995 handle._scheduled = False 

1996 

1997 timeout = None 

1998 if self._ready or self._stopping: 

1999 timeout = 0 

2000 elif self._scheduled: 

2001 # Compute the desired timeout. 

2002 timeout = self._scheduled[0]._when - self.time() 

2003 if timeout > MAXIMUM_SELECT_TIMEOUT: 2003 ↛ 2004line 2003 didn't jump to line 2004 because the condition on line 2003 was never true

2004 timeout = MAXIMUM_SELECT_TIMEOUT 

2005 elif timeout < 0: 

2006 timeout = 0 

2007 

2008 event_list = self._selector.select(timeout) 

2009 self._process_events(event_list) 

2010 # Needed to break cycles when an exception occurs. 

2011 event_list = None 

2012 

2013 # Handle 'later' callbacks that are ready. 

2014 end_time = self.time() + self._clock_resolution 

2015 while self._scheduled: 

2016 handle = self._scheduled[0] 

2017 if handle._when >= end_time: 

2018 break 

2019 handle = heapq.heappop(self._scheduled) 

2020 handle._scheduled = False 

2021 self._ready.append(handle) 

2022 

2023 # This is the only place where callbacks are actually *called*. 

2024 # All other places just add them to ready. 

2025 # Note: We run all currently scheduled callbacks, but not any 

2026 # callbacks scheduled by callbacks run this time around -- 

2027 # they will be run the next time (after another I/O poll). 

2028 # Use an idiom that is thread-safe without using locks. 

2029 ntodo = len(self._ready) 

2030 for i in range(ntodo): 

2031 handle = self._ready.popleft() 

2032 if handle._cancelled: 

2033 continue 

2034 if self._debug: 

2035 try: 

2036 self._current_handle = handle 

2037 t0 = self.time() 

2038 handle._run() 

2039 dt = self.time() - t0 

2040 if dt >= self.slow_callback_duration: 

2041 logger.warning('Executing %s took %.3f seconds', 

2042 _format_handle(handle), dt) 

2043 finally: 

2044 self._current_handle = None 

2045 else: 

2046 handle._run() 

2047 handle = None # Needed to break cycles when an exception occurs. 

2048 

2049 def _set_coroutine_origin_tracking(self, enabled): 

2050 if bool(enabled) == bool(self._coroutine_origin_tracking_enabled): 

2051 return 

2052 

2053 if enabled: 

2054 self._coroutine_origin_tracking_saved_depth = ( 

2055 sys.get_coroutine_origin_tracking_depth()) 

2056 sys.set_coroutine_origin_tracking_depth( 

2057 constants.DEBUG_STACK_DEPTH) 

2058 else: 

2059 sys.set_coroutine_origin_tracking_depth( 

2060 self._coroutine_origin_tracking_saved_depth) 

2061 

2062 self._coroutine_origin_tracking_enabled = enabled 

2063 

2064 def get_debug(self): 

2065 return self._debug 

2066 

2067 def set_debug(self, enabled): 

2068 self._debug = enabled 

2069 

2070 if self.is_running(): 

2071 self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)