Coverage for Lib/asyncio/base_events.py: 88%

1168 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-06-24 03:28 +0000

1"""Base implementation of event loop. 

2 

3The event loop can be broken up into a multiplexer (the part 

4responsible for notifying us of I/O events) and the event loop proper, 

5which wraps a multiplexer with functionality for scheduling callbacks, 

6immediately or at a given time in the future. 

7 

8Whenever a public API takes a callback, subsequent positional 

9arguments will be passed to the callback if/when it is called. This 

10avoids the proliferation of trivial lambdas implementing closures. 

11Keyword arguments for the callback are not supported; this is a 

12conscious design decision, leaving the door open for keyword arguments 

13to modify the meaning of the API call itself. 

14""" 

15 

16import collections 

17import contextvars 

18import collections.abc 

19import concurrent.futures 

20import errno 

21import heapq 

22import itertools 

23import math 

24import os 

25import socket 

26import stat 

27import subprocess 

28import sys 

29import threading 

30import time 

31import traceback 

32import warnings 

33import weakref 

34import inspect 

35 

36try: 

37 import ssl 

38except ImportError: # pragma: no cover 

39 ssl = None 

40 

41from . import constants 

42from . import coroutines 

43from . import events 

44from . import exceptions 

45from . import futures 

46from . import protocols 

47from . import sslproto 

48from . import staggered 

49from . import tasks 

50from . import timeouts 

51from . import transports 

52from . import trsock 

53from .log import logger 

54 

55 

56__all__ = 'BaseEventLoop','Server', 

57 

58 

59# Minimum number of _scheduled timer handles before cleanup of 

60# cancelled handles is performed. 

61_MIN_SCHEDULED_TIMER_HANDLES = 100 

62 

63# Minimum fraction of _scheduled timer handles that are cancelled 

64# before cleanup of cancelled handles is performed. 

65_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5 

66 

67 

68_HAS_IPv6 = hasattr(socket, 'AF_INET6') 

69 

70# Maximum timeout passed to select to avoid OS limitations 

71MAXIMUM_SELECT_TIMEOUT = 24 * 3600 

72 

73 

74def _format_handle(handle): 

75 cb = handle._callback 

76 if isinstance(getattr(cb, '__self__', None), tasks.Task): 

77 # format the task 

78 return repr(cb.__self__) 

79 else: 

80 return str(handle) 

81 

82 

83def _format_pipe(fd): 

84 if fd == subprocess.PIPE: 

85 return '<pipe>' 

86 elif fd == subprocess.STDOUT: 

87 return '<stdout>' 

88 else: 

89 return repr(fd) 

90 

91 

92def _set_reuseport(sock): 

93 if not hasattr(socket, 'SO_REUSEPORT'): 

94 raise ValueError('reuse_port not supported by socket module') 

95 else: 

96 try: 

97 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 

98 except OSError: 

99 raise ValueError('reuse_port not supported by socket module, ' 

100 'SO_REUSEPORT defined but not implemented.') 

101 

102 

103def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0): 

104 # Try to skip getaddrinfo if "host" is already an IP. Users might have 

105 # handled name resolution in their own code and pass in resolved IPs. 

106 if not hasattr(socket, 'inet_pton'): 

107 return 

108 

109 if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \ 

110 host is None: 

111 return None 

112 

113 if type == socket.SOCK_STREAM: 

114 proto = socket.IPPROTO_TCP 

115 elif type == socket.SOCK_DGRAM: 

116 proto = socket.IPPROTO_UDP 

117 else: 

118 return None 

119 

120 if port is None: 

121 port = 0 

122 elif isinstance(port, bytes) and port == b'': 

123 port = 0 

124 elif isinstance(port, str) and port == '': 

125 port = 0 

126 else: 

127 # If port's a service name like "http", don't skip getaddrinfo. 

128 try: 

129 port = int(port) 

130 except (TypeError, ValueError): 

131 return None 

132 

133 if family == socket.AF_UNSPEC: 

134 afs = [socket.AF_INET] 

135 if _HAS_IPv6: 135 ↛ 140line 135 didn't jump to line 140 because the condition on line 135 was always true

136 afs.append(socket.AF_INET6) 

137 else: 

138 afs = [family] 

139 

140 if isinstance(host, bytes): 

141 host = host.decode('idna') 

142 if '%' in host: 

143 # Linux's inet_pton doesn't accept an IPv6 zone index after host, 

144 # like '::1%lo0'. 

145 return None 

146 

147 for af in afs: 

148 try: 

149 socket.inet_pton(af, host) 

150 # The host has already been resolved. 

151 if _HAS_IPv6 and af == socket.AF_INET6: 

152 return af, type, proto, '', (host, port, flowinfo, scopeid) 

153 else: 

154 return af, type, proto, '', (host, port) 

155 except OSError: 

156 pass 

157 

158 # "host" is not an IP address. 

159 return None 

160 

161 

162def _interleave_addrinfos(addrinfos, first_address_family_count=1): 

163 """Interleave list of addrinfo tuples by family.""" 

164 # Group addresses by family 

165 addrinfos_by_family = collections.OrderedDict() 

166 for addr in addrinfos: 

167 family = addr[0] 

168 if family not in addrinfos_by_family: 

169 addrinfos_by_family[family] = [] 

170 addrinfos_by_family[family].append(addr) 

171 addrinfos_lists = list(addrinfos_by_family.values()) 

172 

173 reordered = [] 

174 if first_address_family_count > 1: 

175 reordered.extend(addrinfos_lists[0][:first_address_family_count - 1]) 

176 del addrinfos_lists[0][:first_address_family_count - 1] 

177 reordered.extend( 

178 a for a in itertools.chain.from_iterable( 

179 itertools.zip_longest(*addrinfos_lists) 

180 ) if a is not None) 

181 return reordered 

182 

183 

184def _run_until_complete_cb(fut): 

185 if not fut.cancelled(): 

186 exc = fut.exception() 

187 if isinstance(exc, (SystemExit, KeyboardInterrupt)): 

188 # Issue #22429: run_forever() already finished, no need to 

189 # stop it. 

190 return 

191 futures._get_loop(fut).stop() 

192 

193 

194if hasattr(socket, 'TCP_NODELAY'): 194 ↛ 201line 194 didn't jump to line 201 because the condition on line 194 was always true

195 def _set_nodelay(sock): 

196 if (sock.family in {socket.AF_INET, socket.AF_INET6} and 

197 sock.type == socket.SOCK_STREAM and 

198 sock.proto == socket.IPPROTO_TCP): 

199 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 

200else: 

201 def _set_nodelay(sock): 

202 pass 

203 

204 

205def _check_ssl_socket(sock): 

206 if ssl is not None and isinstance(sock, ssl.SSLSocket): 206 ↛ 207line 206 didn't jump to line 207 because the condition on line 206 was never true

207 raise TypeError("Socket cannot be of type SSLSocket") 

208 

209 

210class _SendfileFallbackProtocol(protocols.Protocol): 

211 def __init__(self, transp): 

212 if not isinstance(transp, transports._FlowControlMixin): 212 ↛ 213line 212 didn't jump to line 213 because the condition on line 212 was never true

213 raise TypeError("transport should be _FlowControlMixin instance") 

214 self._transport = transp 

215 self._proto = transp.get_protocol() 

216 self._should_resume_reading = transp.is_reading() 

217 self._should_resume_writing = transp._protocol_paused 

218 transp.pause_reading() 

219 transp.set_protocol(self) 

220 if self._should_resume_writing: 220 ↛ 221line 220 didn't jump to line 221 because the condition on line 220 was never true

221 self._write_ready_fut = self._transport._loop.create_future() 

222 else: 

223 self._write_ready_fut = None 

224 

225 async def drain(self): 

226 if self._transport.is_closing(): 226 ↛ 227line 226 didn't jump to line 227 because the condition on line 226 was never true

227 raise ConnectionError("Connection closed by peer") 

228 fut = self._write_ready_fut 

229 if fut is None: 

230 return 

231 await fut 

232 

233 def connection_made(self, transport): 

234 raise RuntimeError("Invalid state: " 

235 "connection should have been established already.") 

236 

237 def connection_lost(self, exc): 

238 if self._write_ready_fut is not None: 238 ↛ 246line 238 didn't jump to line 246 because the condition on line 238 was always true

239 # Never happens if peer disconnects after sending the whole content 

240 # Thus disconnection is always an exception from user perspective 

241 if exc is None: 241 ↛ 242line 241 didn't jump to line 242 because the condition on line 241 was never true

242 self._write_ready_fut.set_exception( 

243 ConnectionError("Connection is closed by peer")) 

244 else: 

245 self._write_ready_fut.set_exception(exc) 

246 self._proto.connection_lost(exc) 

247 

248 def pause_writing(self): 

249 if self._write_ready_fut is not None: 249 ↛ 250line 249 didn't jump to line 250 because the condition on line 249 was never true

250 return 

251 self._write_ready_fut = self._transport._loop.create_future() 

252 

253 def resume_writing(self): 

254 if self._write_ready_fut is None: 254 ↛ 255line 254 didn't jump to line 255 because the condition on line 254 was never true

255 return 

256 self._write_ready_fut.set_result(False) 

257 self._write_ready_fut = None 

258 

259 def data_received(self, data): 

260 raise RuntimeError("Invalid state: reading should be paused") 

261 

262 def eof_received(self): 

263 raise RuntimeError("Invalid state: reading should be paused") 

264 

265 async def restore(self): 

266 self._transport.set_protocol(self._proto) 

267 if self._should_resume_reading: 267 ↛ 269line 267 didn't jump to line 269 because the condition on line 267 was always true

268 self._transport.resume_reading() 

269 if self._write_ready_fut is not None: 

270 # Cancel the future. 

271 # Basically it has no effect because protocol is switched back, 

272 # no code should wait for it anymore. 

273 self._write_ready_fut.cancel() 

274 if self._should_resume_writing: 274 ↛ 275line 274 didn't jump to line 275 because the condition on line 274 was never true

275 self._proto.resume_writing() 

276 

277 

278class Server(events.AbstractServer): 

279 

280 def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog, 

281 ssl_handshake_timeout, ssl_shutdown_timeout=None): 

282 self._loop = loop 

283 self._sockets = sockets 

284 # Weak references so we don't break Transport's ability to 

285 # detect abandoned transports 

286 self._clients = weakref.WeakSet() 

287 self._waiters = [] 

288 self._protocol_factory = protocol_factory 

289 self._backlog = backlog 

290 self._ssl_context = ssl_context 

291 self._ssl_handshake_timeout = ssl_handshake_timeout 

292 self._ssl_shutdown_timeout = ssl_shutdown_timeout 

293 self._serving = False 

294 self._serving_forever_fut = None 

295 self._context = contextvars.copy_context() 

296 

297 def __repr__(self): 

298 return f'<{self.__class__.__name__} sockets={self.sockets!r}>' 

299 

300 def _attach(self, transport): 

301 assert self._sockets is not None 

302 self._clients.add(transport) 

303 

304 def _detach(self, transport): 

305 self._clients.discard(transport) 

306 if len(self._clients) == 0 and self._sockets is None: 

307 self._wakeup() 

308 

309 def _wakeup(self): 

310 waiters = self._waiters 

311 self._waiters = None 

312 for waiter in waiters: 

313 if not waiter.done(): 313 ↛ 312line 313 didn't jump to line 312 because the condition on line 313 was always true

314 waiter.set_result(None) 

315 

316 def _start_serving(self): 

317 if self._serving: 

318 return 

319 self._serving = True 

320 for sock in self._sockets: 

321 sock.listen(self._backlog) 

322 self._loop._start_serving( 

323 self._protocol_factory, sock, self._ssl_context, 

324 self, self._backlog, self._ssl_handshake_timeout, 

325 self._ssl_shutdown_timeout, context=self._context) 

326 

327 def get_loop(self): 

328 return self._loop 

329 

330 def is_serving(self): 

331 return self._serving 

332 

333 @property 

334 def sockets(self): 

335 if self._sockets is None: 

336 return () 

337 return tuple(trsock.TransportSocket(s) for s in self._sockets) 

338 

339 def close(self): 

340 sockets = self._sockets 

341 if sockets is None: 

342 return 

343 self._sockets = None 

344 

345 for sock in sockets: 

346 self._loop._stop_serving(sock) 

347 

348 self._serving = False 

349 

350 if (self._serving_forever_fut is not None and 350 ↛ 352line 350 didn't jump to line 352 because the condition on line 350 was never true

351 not self._serving_forever_fut.done()): 

352 self._serving_forever_fut.cancel() 

353 self._serving_forever_fut = None 

354 

355 if len(self._clients) == 0: 

356 self._wakeup() 

357 

358 def close_clients(self): 

359 for transport in self._clients.copy(): 

360 transport.close() 

361 

362 def abort_clients(self): 

363 for transport in self._clients.copy(): 

364 transport.abort() 

365 

366 async def start_serving(self): 

367 self._start_serving() 

368 # Skip one loop iteration so that all 'loop.add_reader' 

369 # go through. 

370 await tasks.sleep(0) 

371 

372 async def serve_forever(self): 

373 if self._serving_forever_fut is not None: 373 ↛ 374line 373 didn't jump to line 374 because the condition on line 373 was never true

374 raise RuntimeError( 

375 f'server {self!r} is already being awaited on serve_forever()') 

376 if self._sockets is None: 

377 raise RuntimeError(f'server {self!r} is closed') 

378 

379 self._start_serving() 

380 self._serving_forever_fut = self._loop.create_future() 

381 

382 try: 

383 await self._serving_forever_fut 

384 except exceptions.CancelledError: 

385 try: 

386 self.close() 

387 self.close_clients() 

388 await self.wait_closed() 

389 finally: 

390 raise 

391 finally: 

392 self._serving_forever_fut = None 

393 

394 async def wait_closed(self): 

395 """Wait until server is closed and all connections are dropped. 

396 

397 - If the server is not closed, wait. 

398 - If it is closed, but there are still active connections, wait. 

399 

400 Anyone waiting here will be unblocked once both conditions 

401 (server is closed and all connections have been dropped) 

402 have become true, in either order. 

403 

404 Historical note: In 3.11 and before, this was broken, returning 

405 immediately if the server was already closed, even if there 

406 were still active connections. An attempted fix in 3.12.0 was 

407 still broken, returning immediately if the server was still 

408 open and there were no active connections. Hopefully in 3.12.1 

409 we have it right. 

410 """ 

411 # Waiters are unblocked by self._wakeup(), which is called 

412 # from two places: self.close() and self._detach(), but only 

413 # when both conditions have become true. To signal that this 

414 # has happened, self._wakeup() sets self._waiters to None. 

415 if self._waiters is None: 

416 return 

417 waiter = self._loop.create_future() 

418 self._waiters.append(waiter) 

419 await waiter 

420 

421 

422class BaseEventLoop(events.AbstractEventLoop): 

423 

424 def __init__(self): 

425 self._timer_cancelled_count = 0 

426 self._closed = False 

427 self._stopping = False 

428 self._ready = collections.deque() 

429 self._scheduled = [] 

430 self._default_executor = None 

431 self._internal_fds = 0 

432 # Identifier of the thread running the event loop, or None if the 

433 # event loop is not running 

434 self._thread_id = None 

435 self._clock_resolution = time.get_clock_info('monotonic').resolution 

436 self._exception_handler = None 

437 self.set_debug(coroutines._is_debug_mode()) 

438 # The preserved state of async generator hooks. 

439 self._old_agen_hooks = None 

440 # In debug mode, if the execution of a callback or a step of a task 

441 # exceed this duration in seconds, the slow callback/task is logged. 

442 self.slow_callback_duration = 0.1 

443 self._current_handle = None 

444 self._task_factory = None 

445 self._coroutine_origin_tracking_enabled = False 

446 self._coroutine_origin_tracking_saved_depth = None 

447 

448 # A weak set of all asynchronous generators that are 

449 # being iterated by the loop. 

450 self._asyncgens = weakref.WeakSet() 

451 # Set to True when `loop.shutdown_asyncgens` is called. 

452 self._asyncgens_shutdown_called = False 

453 # Set to True when `loop.shutdown_default_executor` is called. 

454 self._executor_shutdown_called = False 

455 

456 def __repr__(self): 

457 return ( 

458 f'<{self.__class__.__name__} running={self.is_running()} ' 

459 f'closed={self.is_closed()} debug={self.get_debug()}>' 

460 ) 

461 

462 def create_future(self): 

463 """Create a Future object attached to the loop.""" 

464 return futures.Future(loop=self) 

465 

466 def create_task(self, coro, **kwargs): 

467 """Schedule or begin executing a coroutine object. 

468 

469 Return a task object. 

470 """ 

471 self._check_closed() 

472 if self._task_factory is not None: 

473 return self._task_factory(self, coro, **kwargs) 

474 

475 task = tasks.Task(coro, loop=self, **kwargs) 

476 if task._source_traceback: 

477 del task._source_traceback[-1] 

478 try: 

479 return task 

480 finally: 

481 # gh-128552: prevent a refcycle of 

482 # task.exception().__traceback__->BaseEventLoop.create_task->task 

483 del task 

484 

485 def set_task_factory(self, factory): 

486 """Set a task factory that will be used by loop.create_task(). 

487 

488 If factory is None the default task factory will be set. 

489 

490 If factory is a callable, it should have a signature matching 

491 '(loop, coro, **kwargs)', where 'loop' will be a reference to the 

492 active event loop, 'coro' will be a coroutine object, and **kwargs 

493 will be arbitrary keyword arguments that should be passed on to 

494 Task. The callable must return a Task. 

495 """ 

496 if factory is not None and not callable(factory): 

497 raise TypeError('task factory must be a callable or None') 

498 self._task_factory = factory 

499 

500 def get_task_factory(self): 

501 """Return a task factory, or None if the default one is in use.""" 

502 return self._task_factory 

503 

504 def _make_socket_transport(self, sock, protocol, waiter=None, *, 

505 extra=None, server=None): 

506 """Create socket transport.""" 

507 raise NotImplementedError 

508 

509 def _make_ssl_transport( 

510 self, rawsock, protocol, sslcontext, waiter=None, 

511 *, server_side=False, server_hostname=None, 

512 extra=None, server=None, 

513 ssl_handshake_timeout=None, 

514 ssl_shutdown_timeout=None, 

515 call_connection_made=True, 

516 context=None): 

517 """Create SSL transport.""" 

518 raise NotImplementedError 

519 

520 def _make_datagram_transport(self, sock, protocol, 

521 address=None, waiter=None, extra=None): 

522 """Create datagram transport.""" 

523 raise NotImplementedError 

524 

525 def _make_read_pipe_transport(self, pipe, protocol, waiter=None, 

526 extra=None): 

527 """Create read pipe transport.""" 

528 raise NotImplementedError 

529 

530 def _make_write_pipe_transport(self, pipe, protocol, waiter=None, 

531 extra=None): 

532 """Create write pipe transport.""" 

533 raise NotImplementedError 

534 

535 async def _make_subprocess_transport(self, protocol, args, shell, 

536 stdin, stdout, stderr, bufsize, 

537 extra=None, **kwargs): 

538 """Create subprocess transport.""" 

539 raise NotImplementedError 

540 

541 def _write_to_self(self): 

542 """Write a byte to self-pipe, to wake up the event loop. 

543 

544 This may be called from a different thread. 

545 

546 The subclass is responsible for implementing the self-pipe. 

547 """ 

548 raise NotImplementedError 

549 

550 def _process_events(self, event_list): 

551 """Process selector events.""" 

552 raise NotImplementedError 

553 

554 def _check_closed(self): 

555 if self._closed: 

556 raise RuntimeError('Event loop is closed') 

557 

558 def _check_default_executor(self): 

559 if self._executor_shutdown_called: 559 ↛ 560line 559 didn't jump to line 560 because the condition on line 559 was never true

560 raise RuntimeError('Executor shutdown has been called') 

561 

562 def _asyncgen_finalizer_hook(self, agen): 

563 self._asyncgens.discard(agen) 

564 if not self.is_closed(): 564 ↛ exitline 564 didn't return from function '_asyncgen_finalizer_hook' because the condition on line 564 was always true

565 self.call_soon_threadsafe(self.create_task, agen.aclose()) 

566 

567 def _asyncgen_firstiter_hook(self, agen): 

568 if self._asyncgens_shutdown_called: 568 ↛ 569line 568 didn't jump to line 569 because the condition on line 568 was never true

569 warnings.warn( 

570 f"asynchronous generator {agen!r} was scheduled after " 

571 f"loop.shutdown_asyncgens() call", 

572 ResourceWarning, source=self) 

573 

574 self._asyncgens.add(agen) 

575 

576 async def shutdown_asyncgens(self): 

577 """Shutdown all active asynchronous generators.""" 

578 self._asyncgens_shutdown_called = True 

579 

580 if not len(self._asyncgens): 580 ↛ 585line 580 didn't jump to line 585 because the condition on line 580 was always true

581 # If Python version is <3.6 or we don't have any asynchronous 

582 # generators alive. 

583 return 

584 

585 closing_agens = list(self._asyncgens) 

586 self._asyncgens.clear() 

587 

588 results = await tasks.gather( 

589 *[ag.aclose() for ag in closing_agens], 

590 return_exceptions=True) 

591 

592 for result, agen in zip(results, closing_agens): 

593 if isinstance(result, Exception): 

594 self.call_exception_handler({ 

595 'message': f'an error occurred during closing of ' 

596 f'asynchronous generator {agen!r}', 

597 'exception': result, 

598 'asyncgen': agen 

599 }) 

600 

601 async def shutdown_default_executor(self, timeout=None): 

602 """Schedule the shutdown of the default executor. 

603 

604 The timeout parameter specifies the amount of time the executor will 

605 be given to finish joining. The default value is None, which means 

606 that the executor will be given an unlimited amount of time. 

607 """ 

608 self._executor_shutdown_called = True 

609 if self._default_executor is None: 

610 return 

611 future = self.create_future() 

612 thread = threading.Thread(target=self._do_shutdown, args=(future,)) 

613 thread.start() 

614 try: 

615 async with timeouts.timeout(timeout): 

616 await future 

617 except TimeoutError: 

618 warnings.warn("The executor did not finishing joining " 

619 f"its threads within {timeout} seconds.", 

620 RuntimeWarning, stacklevel=2) 

621 self._default_executor.shutdown(wait=False) 

622 else: 

623 thread.join() 

624 

625 def _do_shutdown(self, future): 

626 try: 

627 self._default_executor.shutdown(wait=True) 

628 if not self.is_closed(): 628 ↛ exitline 628 didn't return from function '_do_shutdown' because the condition on line 628 was always true

629 self.call_soon_threadsafe(futures._set_result_unless_cancelled, 

630 future, None) 

631 except Exception as ex: 

632 if not self.is_closed() and not future.cancelled(): 

633 self.call_soon_threadsafe(future.set_exception, ex) 

634 

635 def _check_running(self): 

636 if self.is_running(): 

637 raise RuntimeError('This event loop is already running') 

638 if events._get_running_loop() is not None: 

639 raise RuntimeError( 

640 'Cannot run the event loop while another loop is running') 

641 

642 def _run_forever_setup(self): 

643 """Prepare the run loop to process events. 

644 

645 This method exists so that custom event loop subclasses (e.g., event loops 

646 that integrate a GUI event loop with Python's event loop) have access to all the 

647 loop setup logic. 

648 """ 

649 self._check_closed() 

650 self._check_running() 

651 self._set_coroutine_origin_tracking(self._debug) 

652 

653 self._old_agen_hooks = sys.get_asyncgen_hooks() 

654 self._thread_id = threading.get_ident() 

655 sys.set_asyncgen_hooks( 

656 firstiter=self._asyncgen_firstiter_hook, 

657 finalizer=self._asyncgen_finalizer_hook 

658 ) 

659 

660 events._set_running_loop(self) 

661 

662 def _run_forever_cleanup(self): 

663 """Clean up after an event loop finishes the looping over events. 

664 

665 This method exists so that custom event loop subclasses (e.g., event loops 

666 that integrate a GUI event loop with Python's event loop) have access to all the 

667 loop cleanup logic. 

668 """ 

669 self._stopping = False 

670 self._thread_id = None 

671 events._set_running_loop(None) 

672 self._set_coroutine_origin_tracking(False) 

673 # Restore any pre-existing async generator hooks. 

674 if self._old_agen_hooks is not None: 674 ↛ exitline 674 didn't return from function '_run_forever_cleanup' because the condition on line 674 was always true

675 sys.set_asyncgen_hooks(*self._old_agen_hooks) 

676 self._old_agen_hooks = None 

677 

678 def run_forever(self): 

679 """Run until stop() is called.""" 

680 self._run_forever_setup() 

681 try: 

682 while True: 

683 self._run_once() 

684 if self._stopping: 

685 break 

686 finally: 

687 self._run_forever_cleanup() 

688 

689 def run_until_complete(self, future): 

690 """Run until the Future is done. 

691 

692 If the argument is a coroutine, it is wrapped in a Task. 

693 

694 WARNING: It would be disastrous to call run_until_complete() 

695 with the same coroutine twice -- it would wrap it in two 

696 different Tasks and that can't be good. 

697 

698 Return the Future's result, or raise its exception. 

699 """ 

700 self._check_closed() 

701 self._check_running() 

702 

703 new_task = not futures.isfuture(future) 

704 future = tasks.ensure_future(future, loop=self) 

705 if new_task: 

706 # An exception is raised if the future didn't complete, so there 

707 # is no need to log the "destroy pending task" message 

708 future._log_destroy_pending = False 

709 

710 future.add_done_callback(_run_until_complete_cb) 

711 try: 

712 self.run_forever() 

713 except: 

714 if new_task and future.done() and not future.cancelled(): 

715 # The coroutine raised a BaseException. Consume the exception 

716 # to not log a warning, the caller doesn't have access to the 

717 # local task. 

718 future.exception() 

719 raise 

720 finally: 

721 future.remove_done_callback(_run_until_complete_cb) 

722 if not future.done(): 

723 raise RuntimeError('Event loop stopped before Future completed.') 

724 

725 return future.result() 

726 

727 def stop(self): 

728 """Stop running the event loop. 

729 

730 Every callback already scheduled will still run. This simply 

731 informs run_forever to stop looping after a complete iteration. 

732 """ 

733 self._stopping = True 

734 

735 def close(self): 

736 """Close the event loop. 

737 

738 This clears the queues and shuts down the executor, 

739 but does not wait for the executor to finish. 

740 

741 The event loop must not be running. 

742 """ 

743 if self.is_running(): 743 ↛ 744line 743 didn't jump to line 744 because the condition on line 743 was never true

744 raise RuntimeError("Cannot close a running event loop") 

745 if self._closed: 

746 return 

747 if self._debug: 

748 logger.debug("Close %r", self) 

749 self._closed = True 

750 self._ready.clear() 

751 self._scheduled.clear() 

752 self._executor_shutdown_called = True 

753 executor = self._default_executor 

754 if executor is not None: 

755 self._default_executor = None 

756 executor.shutdown(wait=False) 

757 

758 def is_closed(self): 

759 """Returns True if the event loop was closed.""" 

760 return self._closed 

761 

762 def __del__(self, _warn=warnings.warn): 

763 if not self.is_closed(): 763 ↛ 764line 763 didn't jump to line 764 because the condition on line 763 was never true

764 _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self) 

765 if not self.is_running(): 

766 self.close() 

767 

768 def is_running(self): 

769 """Returns True if the event loop is running.""" 

770 return (self._thread_id is not None) 

771 

772 def time(self): 

773 """Return the time according to the event loop's clock. 

774 

775 This is a float expressed in seconds since an epoch, but the 

776 epoch, precision, accuracy and drift are unspecified and may 

777 differ per event loop. 

778 """ 

779 return time.monotonic() 

780 

781 def call_later(self, delay, callback, *args, context=None): 

782 """Arrange for a callback to be called at a given time. 

783 

784 Return a Handle: an opaque object with a cancel() method that 

785 can be used to cancel the call. 

786 

787 The delay can be an int or float, expressed in seconds. It is 

788 always relative to the current time. 

789 

790 Each callback will be called exactly once. If two callbacks 

791 are scheduled for exactly the same time, it is undefined which 

792 will be called first. 

793 

794 Any positional arguments after the callback will be passed to 

795 the callback when it is called. 

796 """ 

797 if delay is None: 

798 raise TypeError('delay must not be None') 

799 timer = self.call_at(self.time() + delay, callback, *args, 

800 context=context) 

801 if timer._source_traceback: 

802 del timer._source_traceback[-1] 

803 return timer 

804 

805 def call_at(self, when, callback, *args, context=None): 

806 """Like call_later(), but uses an absolute time. 

807 

808 Absolute time corresponds to the event loop's time() method. 

809 """ 

810 if when is None: 

811 raise TypeError("when cannot be None") 

812 self._check_closed() 

813 if self._debug: 

814 self._check_thread() 

815 self._check_callback(callback, 'call_at') 

816 timer = events.TimerHandle(when, callback, args, self, context) 

817 if timer._source_traceback: 

818 del timer._source_traceback[-1] 

819 heapq.heappush(self._scheduled, timer) 

820 timer._scheduled = True 

821 return timer 

822 

823 def call_soon(self, callback, *args, context=None): 

824 """Arrange for a callback to be called as soon as possible. 

825 

826 This operates as a FIFO queue: callbacks are called in the 

827 order in which they are registered. Each callback will be 

828 called exactly once. 

829 

830 Any positional arguments after the callback will be passed to 

831 the callback when it is called. 

832 """ 

833 self._check_closed() 

834 if self._debug: 

835 self._check_thread() 

836 self._check_callback(callback, 'call_soon') 

837 handle = self._call_soon(callback, args, context) 

838 if handle._source_traceback: 

839 del handle._source_traceback[-1] 

840 return handle 

841 

842 def _check_callback(self, callback, method): 

843 if (coroutines.iscoroutine(callback) or 

844 inspect.iscoroutinefunction(callback)): 

845 raise TypeError( 

846 f"coroutines cannot be used with {method}()") 

847 if not callable(callback): 

848 raise TypeError( 

849 f'a callable object was expected by {method}(), ' 

850 f'got {callback!r}') 

851 

852 def _call_soon(self, callback, args, context): 

853 handle = events.Handle(callback, args, self, context) 

854 if handle._source_traceback: 

855 del handle._source_traceback[-1] 

856 self._ready.append(handle) 

857 return handle 

858 

859 def _check_thread(self): 

860 """Check that the current thread is the thread running the event loop. 

861 

862 Non-thread-safe methods of this class make this assumption and will 

863 likely behave incorrectly when the assumption is violated. 

864 

865 Should only be called when (self._debug == True). The caller is 

866 responsible for checking this condition for performance reasons. 

867 """ 

868 if self._thread_id is None: 

869 return 

870 thread_id = threading.get_ident() 

871 if thread_id != self._thread_id: 

872 raise RuntimeError( 

873 "Non-thread-safe operation invoked on an event loop other " 

874 "than the current one") 

875 

876 def call_soon_threadsafe(self, callback, *args, context=None): 

877 """Like call_soon(), but thread-safe.""" 

878 self._check_closed() 

879 if self._debug: 

880 self._check_callback(callback, 'call_soon_threadsafe') 

881 handle = events._ThreadSafeHandle(callback, args, self, context) 

882 self._ready.append(handle) 

883 if handle._source_traceback: 

884 del handle._source_traceback[-1] 

885 if handle._source_traceback: 

886 del handle._source_traceback[-1] 

887 self._write_to_self() 

888 return handle 

889 

890 def run_in_executor(self, executor, func, *args): 

891 self._check_closed() 

892 if self._debug: 

893 self._check_callback(func, 'run_in_executor') 

894 if executor is None: 

895 executor = self._default_executor 

896 # Only check when the default executor is being used 

897 self._check_default_executor() 

898 if executor is None: 

899 executor = concurrent.futures.ThreadPoolExecutor( 

900 thread_name_prefix='asyncio' 

901 ) 

902 self._default_executor = executor 

903 return futures.wrap_future( 

904 executor.submit(func, *args), loop=self) 

905 

906 def set_default_executor(self, executor): 

907 if not isinstance(executor, concurrent.futures.ThreadPoolExecutor): 

908 raise TypeError('executor must be ThreadPoolExecutor instance') 

909 self._default_executor = executor 

910 

911 def _getaddrinfo_debug(self, host, port, family, type, proto, flags): 

912 msg = [f"{host}:{port!r}"] 

913 if family: 

914 msg.append(f'family={family!r}') 

915 if type: 

916 msg.append(f'type={type!r}') 

917 if proto: 

918 msg.append(f'proto={proto!r}') 

919 if flags: 

920 msg.append(f'flags={flags!r}') 

921 msg = ', '.join(msg) 

922 logger.debug('Get address info %s', msg) 

923 

924 t0 = self.time() 

925 addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags) 

926 dt = self.time() - t0 

927 

928 msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}' 

929 if dt >= self.slow_callback_duration: 

930 logger.info(msg) 

931 else: 

932 logger.debug(msg) 

933 return addrinfo 

934 

935 async def getaddrinfo(self, host, port, *, 

936 family=0, type=0, proto=0, flags=0): 

937 if self._debug: 937 ↛ 938line 937 didn't jump to line 938 because the condition on line 937 was never true

938 getaddr_func = self._getaddrinfo_debug 

939 else: 

940 getaddr_func = socket.getaddrinfo 

941 

942 return await self.run_in_executor( 

943 None, getaddr_func, host, port, family, type, proto, flags) 

944 

945 async def getnameinfo(self, sockaddr, flags=0): 

946 return await self.run_in_executor( 

947 None, socket.getnameinfo, sockaddr, flags) 

948 

949 async def sock_sendfile(self, sock, file, offset=0, count=None, 

950 *, fallback=True): 

951 if self._debug and sock.gettimeout() != 0: 

952 raise ValueError("the socket must be non-blocking") 

953 _check_ssl_socket(sock) 

954 self._check_sendfile_params(sock, file, offset, count) 

955 try: 

956 return await self._sock_sendfile_native(sock, file, 

957 offset, count) 

958 except exceptions.SendfileNotAvailableError: 

959 if not fallback: 

960 raise 

961 return await self._sock_sendfile_fallback(sock, file, 

962 offset, count) 

963 

964 async def _sock_sendfile_native(self, sock, file, offset, count): 

965 # NB: sendfile syscall is not supported for SSL sockets and 

966 # non-mmap files even if sendfile is supported by OS 

967 raise exceptions.SendfileNotAvailableError( 

968 f"syscall sendfile is not available for socket {sock!r} " 

969 f"and file {file!r} combination") 

970 

971 async def _sock_sendfile_fallback(self, sock, file, offset, count): 

972 if hasattr(file, 'seek'): 972 ↛ 974line 972 didn't jump to line 974 because the condition on line 972 was always true

973 file.seek(offset) 

974 blocksize = ( 

975 min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE) 

976 if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE 

977 ) 

978 buf = bytearray(blocksize) 

979 total_sent = 0 

980 try: 

981 while True: 

982 if count: 

983 blocksize = min(count - total_sent, blocksize) 

984 if blocksize <= 0: 

985 break 

986 view = memoryview(buf)[:blocksize] 

987 read = await self.run_in_executor(None, file.readinto, view) 

988 if not read: 

989 break # EOF 

990 await self.sock_sendall(sock, view[:read]) 

991 total_sent += read 

992 return total_sent 

993 finally: 

994 if total_sent > 0 and hasattr(file, 'seek'): 

995 file.seek(offset + total_sent) 

996 

997 def _check_sendfile_params(self, sock, file, offset, count): 

998 if 'b' not in getattr(file, 'mode', 'b'): 

999 raise ValueError("file should be opened in binary mode") 

1000 if not sock.type == socket.SOCK_STREAM: 

1001 raise ValueError("only SOCK_STREAM type sockets are supported") 

1002 if count is not None: 

1003 if not isinstance(count, int): 

1004 raise TypeError( 

1005 "count must be a positive integer (got {!r})".format(count)) 

1006 if count <= 0: 

1007 raise ValueError( 

1008 "count must be a positive integer (got {!r})".format(count)) 

1009 if not isinstance(offset, int): 

1010 raise TypeError( 

1011 "offset must be a non-negative integer (got {!r})".format( 

1012 offset)) 

1013 if offset < 0: 

1014 raise ValueError( 

1015 "offset must be a non-negative integer (got {!r})".format( 

1016 offset)) 

1017 

1018 async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None): 

1019 """Create, bind and connect one socket.""" 

1020 my_exceptions = [] 

1021 exceptions.append(my_exceptions) 

1022 family, type_, proto, _, address = addr_info 

1023 sock = None 

1024 try: 

1025 try: 

1026 sock = socket.socket(family=family, type=type_, proto=proto) 

1027 sock.setblocking(False) 

1028 if local_addr_infos is not None: 

1029 for lfamily, _, _, _, laddr in local_addr_infos: 

1030 # skip local addresses of different family 

1031 if lfamily != family: 

1032 continue 

1033 try: 

1034 sock.bind(laddr) 

1035 break 

1036 except OSError as exc: 

1037 msg = ( 

1038 f'error while attempting to bind on ' 

1039 f'address {laddr!r}: {str(exc).lower()}' 

1040 ) 

1041 exc = OSError(exc.errno, msg) 

1042 my_exceptions.append(exc) 

1043 else: # all bind attempts failed 

1044 if my_exceptions: 

1045 raise my_exceptions.pop() 

1046 else: 

1047 raise OSError(f"no matching local address with {family=} found") 

1048 await self.sock_connect(sock, address) 

1049 return sock 

1050 except OSError as exc: 

1051 my_exceptions.append(exc) 

1052 raise 

1053 except: 

1054 if sock is not None: 

1055 try: 

1056 sock.close() 

1057 except OSError: 

1058 # An error when closing a newly created socket is 

1059 # not important, but it can overwrite more important 

1060 # non-OSError error. So ignore it. 

1061 pass 

1062 raise 

1063 finally: 

1064 exceptions = my_exceptions = None 

1065 

1066 async def create_connection( 

1067 self, protocol_factory, host=None, port=None, 

1068 *, ssl=None, family=0, 

1069 proto=0, flags=0, sock=None, 

1070 local_addr=None, server_hostname=None, 

1071 ssl_handshake_timeout=None, 

1072 ssl_shutdown_timeout=None, 

1073 happy_eyeballs_delay=None, interleave=None, 

1074 all_errors=False): 

1075 """Connect to a TCP server. 

1076 

1077 Create a streaming transport connection to a given internet host and 

1078 port: socket family AF_INET or socket.AF_INET6 depending on host (or 

1079 family if specified), socket type SOCK_STREAM. protocol_factory must 

1080 be a callable returning a protocol instance. 

1081 

1082 This method is a coroutine which will try to establish the 

1083 connection in the background. When successful, the coroutine 

1084 returns a (transport, protocol) pair. 

1085 """ 

1086 if server_hostname is not None and not ssl: 

1087 raise ValueError('server_hostname is only meaningful with ssl') 

1088 

1089 if server_hostname is None and ssl: 

1090 # Use host as default for server_hostname. It is an error 

1091 # if host is empty or not set, e.g. when an 

1092 # already-connected socket was passed or when only a port 

1093 # is given. To avoid this error, you can pass 

1094 # server_hostname='' -- this will bypass the hostname 

1095 # check. (This also means that if host is a numeric 

1096 # IP/IPv6 address, we will attempt to verify that exact 

1097 # address; this will probably fail, but it is possible to 

1098 # create a certificate for a specific IP address, so we 

1099 # don't judge it here.) 

1100 if not host: 

1101 raise ValueError('You must set server_hostname ' 

1102 'when using ssl without a host') 

1103 server_hostname = host 

1104 

1105 if ssl_handshake_timeout is not None and not ssl: 

1106 raise ValueError( 

1107 'ssl_handshake_timeout is only meaningful with ssl') 

1108 

1109 if ssl_shutdown_timeout is not None and not ssl: 1109 ↛ 1110line 1109 didn't jump to line 1110 because the condition on line 1109 was never true

1110 raise ValueError( 

1111 'ssl_shutdown_timeout is only meaningful with ssl') 

1112 

1113 if sock is not None: 

1114 _check_ssl_socket(sock) 

1115 

1116 if happy_eyeballs_delay is not None and interleave is None: 

1117 # If using happy eyeballs, default to interleave addresses by family 

1118 interleave = 1 

1119 

1120 if host is not None or port is not None: 

1121 if sock is not None: 

1122 raise ValueError( 

1123 'host/port and sock can not be specified at the same time') 

1124 

1125 infos = await self._ensure_resolved( 

1126 (host, port), family=family, 

1127 type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self) 

1128 if not infos: 

1129 raise OSError('getaddrinfo() returned empty list') 

1130 

1131 if local_addr is not None: 

1132 laddr_infos = await self._ensure_resolved( 

1133 local_addr, family=family, 

1134 type=socket.SOCK_STREAM, proto=proto, 

1135 flags=flags, loop=self) 

1136 if not laddr_infos: 

1137 raise OSError('getaddrinfo() returned empty list') 

1138 else: 

1139 laddr_infos = None 

1140 

1141 if interleave: 

1142 infos = _interleave_addrinfos(infos, interleave) 

1143 

1144 exceptions = [] 

1145 if happy_eyeballs_delay is None: 

1146 # not using happy eyeballs 

1147 for addrinfo in infos: 

1148 try: 

1149 sock = await self._connect_sock( 

1150 exceptions, addrinfo, laddr_infos) 

1151 break 

1152 except OSError: 

1153 continue 

1154 else: # using happy eyeballs 

1155 sock = (await staggered.staggered_race( 

1156 ( 

1157 # can't use functools.partial as it keeps a reference 

1158 # to exceptions 

1159 lambda addrinfo=addrinfo: self._connect_sock( 

1160 exceptions, addrinfo, laddr_infos 

1161 ) 

1162 for addrinfo in infos 

1163 ), 

1164 happy_eyeballs_delay, 

1165 loop=self, 

1166 ))[0] # can't use sock, _, _ as it keeks a reference to exceptions 

1167 

1168 if sock is None: 

1169 exceptions = [exc for sub in exceptions for exc in sub] 

1170 try: 

1171 if all_errors: 

1172 raise ExceptionGroup("create_connection failed", exceptions) 

1173 if len(exceptions) == 1: 

1174 raise exceptions[0] 

1175 elif exceptions: 

1176 # If they all have the same str(), raise one. 

1177 model = str(exceptions[0]) 

1178 if all(str(exc) == model for exc in exceptions): 

1179 raise exceptions[0] 

1180 # Raise a combined exception so the user can see all 

1181 # the various error messages. 

1182 raise OSError('Multiple exceptions: {}'.format( 

1183 ', '.join(str(exc) for exc in exceptions))) 

1184 else: 

1185 # No exceptions were collected, raise a timeout error 

1186 raise TimeoutError('create_connection failed') 

1187 finally: 

1188 exceptions = None 

1189 

1190 else: 

1191 if sock is None: 

1192 raise ValueError( 

1193 'host and port was not specified and no sock specified') 

1194 if sock.type != socket.SOCK_STREAM: 

1195 # We allow AF_INET, AF_INET6, AF_UNIX as long as they 

1196 # are SOCK_STREAM. 

1197 # We support passing AF_UNIX sockets even though we have 

1198 # a dedicated API for that: create_unix_connection. 

1199 # Disallowing AF_UNIX in this method, breaks backwards 

1200 # compatibility. 

1201 raise ValueError( 

1202 f'A Stream Socket was expected, got {sock!r}') 

1203 

1204 transport, protocol = await self._create_connection_transport( 

1205 sock, protocol_factory, ssl, server_hostname, 

1206 ssl_handshake_timeout=ssl_handshake_timeout, 

1207 ssl_shutdown_timeout=ssl_shutdown_timeout) 

1208 if self._debug: 

1209 # Get the socket from the transport because SSL transport closes 

1210 # the old socket and creates a new SSL socket 

1211 sock = transport.get_extra_info('socket') 

1212 logger.debug("%r connected to %s:%r: (%r, %r)", 

1213 sock, host, port, transport, protocol) 

1214 return transport, protocol 

1215 

1216 async def _create_connection_transport( 

1217 self, sock, protocol_factory, ssl, 

1218 server_hostname, server_side=False, 

1219 ssl_handshake_timeout=None, 

1220 ssl_shutdown_timeout=None, context=None): 

1221 

1222 sock.setblocking(False) 

1223 context = context if context is not None else contextvars.copy_context() 

1224 

1225 protocol = protocol_factory() 

1226 waiter = self.create_future() 

1227 if ssl: 

1228 sslcontext = None if isinstance(ssl, bool) else ssl 

1229 transport = self._make_ssl_transport( 

1230 sock, protocol, sslcontext, waiter, 

1231 server_side=server_side, server_hostname=server_hostname, 

1232 ssl_handshake_timeout=ssl_handshake_timeout, 

1233 ssl_shutdown_timeout=ssl_shutdown_timeout, 

1234 context=context) 

1235 else: 

1236 transport = self._make_socket_transport(sock, protocol, waiter, context=context) 

1237 

1238 try: 

1239 await waiter 

1240 except: 

1241 transport.close() 

1242 raise 

1243 

1244 return transport, protocol 

1245 

1246 async def sendfile(self, transport, file, offset=0, count=None, 

1247 *, fallback=True): 

1248 """Send a file to transport. 

1249 

1250 Return the total number of bytes which were sent. 

1251 

1252 The method uses high-performance os.sendfile if available. 

1253 

1254 file must be a regular file object opened in binary mode. 

1255 

1256 offset tells from where to start reading the file. If specified, 

1257 count is the total number of bytes to transmit as opposed to 

1258 sending the file until EOF is reached. File position is updated on 

1259 return or also in case of error in which case file.tell() 

1260 can be used to figure out the number of bytes 

1261 which were sent. 

1262 

1263 fallback set to True makes asyncio to manually read and send 

1264 the file when the platform does not support the sendfile syscall 

1265 (e.g. Windows or SSL socket on Unix). 

1266 

1267 Raise SendfileNotAvailableError if the system does not support 

1268 sendfile syscall and fallback is False. 

1269 """ 

1270 if transport.is_closing(): 

1271 raise RuntimeError("Transport is closing") 

1272 mode = getattr(transport, '_sendfile_compatible', 

1273 constants._SendfileMode.UNSUPPORTED) 

1274 if mode is constants._SendfileMode.UNSUPPORTED: 

1275 raise RuntimeError( 

1276 f"sendfile is not supported for transport {transport!r}") 

1277 if mode is constants._SendfileMode.TRY_NATIVE: 

1278 try: 

1279 return await self._sendfile_native(transport, file, 

1280 offset, count) 

1281 except exceptions.SendfileNotAvailableError: 

1282 if not fallback: 

1283 raise 

1284 

1285 if not fallback: 

1286 raise RuntimeError( 

1287 f"fallback is disabled and native sendfile is not " 

1288 f"supported for transport {transport!r}") 

1289 return await self._sendfile_fallback(transport, file, 

1290 offset, count) 

1291 

1292 async def _sendfile_native(self, transp, file, offset, count): 

1293 raise exceptions.SendfileNotAvailableError( 

1294 "sendfile syscall is not supported") 

1295 

1296 async def _sendfile_fallback(self, transp, file, offset, count): 

1297 if hasattr(file, 'seek'): 1297 ↛ 1299line 1297 didn't jump to line 1299 because the condition on line 1297 was always true

1298 file.seek(offset) 

1299 blocksize = min(count, 16384) if count else 16384 

1300 buf = bytearray(blocksize) 

1301 total_sent = 0 

1302 proto = _SendfileFallbackProtocol(transp) 

1303 try: 

1304 while True: 

1305 if count: 

1306 blocksize = min(count - total_sent, blocksize) 

1307 if blocksize <= 0: 

1308 return total_sent 

1309 view = memoryview(buf)[:blocksize] 

1310 read = await self.run_in_executor(None, file.readinto, view) 

1311 if not read: 

1312 return total_sent # EOF 

1313 transp.write(view[:read]) 

1314 await proto.drain() 

1315 total_sent += read 

1316 finally: 

1317 if total_sent > 0 and hasattr(file, 'seek'): 

1318 file.seek(offset + total_sent) 

1319 await proto.restore() 

1320 

1321 async def start_tls(self, transport, protocol, sslcontext, *, 

1322 server_side=False, 

1323 server_hostname=None, 

1324 ssl_handshake_timeout=None, 

1325 ssl_shutdown_timeout=None): 

1326 """Upgrade transport to TLS. 

1327 

1328 Return a new transport that *protocol* should start using 

1329 immediately. 

1330 """ 

1331 if ssl is None: 1331 ↛ 1332line 1331 didn't jump to line 1332 because the condition on line 1331 was never true

1332 raise RuntimeError('Python ssl module is not available') 

1333 

1334 if not isinstance(sslcontext, ssl.SSLContext): 

1335 raise TypeError( 

1336 f'sslcontext is expected to be an instance of ssl.SSLContext, ' 

1337 f'got {sslcontext!r}') 

1338 

1339 if not getattr(transport, '_start_tls_compatible', False): 

1340 raise TypeError( 

1341 f'transport {transport!r} is not supported by start_tls()') 

1342 

1343 waiter = self.create_future() 

1344 ssl_protocol = sslproto.SSLProtocol( 

1345 self, protocol, sslcontext, waiter, 

1346 server_side, server_hostname, 

1347 ssl_handshake_timeout=ssl_handshake_timeout, 

1348 ssl_shutdown_timeout=ssl_shutdown_timeout, 

1349 call_connection_made=False) 

1350 

1351 # Pause early so that "ssl_protocol.data_received()" doesn't 

1352 # have a chance to get called before "ssl_protocol.connection_made()". 

1353 transport.pause_reading() 

1354 

1355 # gh-142352: move buffered StreamReader data to SSLProtocol 

1356 if server_side: 

1357 from .streams import StreamReaderProtocol 

1358 if isinstance(protocol, StreamReaderProtocol): 

1359 stream_reader = getattr(protocol, '_stream_reader', None) 

1360 if stream_reader is not None: 1360 ↛ 1366line 1360 didn't jump to line 1366 because the condition on line 1360 was always true

1361 buffer = stream_reader._buffer 

1362 if buffer: 

1363 ssl_protocol._incoming.write(buffer) 

1364 buffer.clear() 

1365 

1366 transport.set_protocol(ssl_protocol) 

1367 conmade_cb = self.call_soon(ssl_protocol.connection_made, transport) 

1368 resume_cb = self.call_soon(transport.resume_reading) 

1369 

1370 try: 

1371 await waiter 

1372 except BaseException: 

1373 transport.close() 

1374 conmade_cb.cancel() 

1375 resume_cb.cancel() 

1376 raise 

1377 

1378 return ssl_protocol._app_transport 

1379 

1380 async def create_datagram_endpoint(self, protocol_factory, 

1381 local_addr=None, remote_addr=None, *, 

1382 family=0, proto=0, flags=0, 

1383 reuse_port=None, 

1384 allow_broadcast=None, sock=None): 

1385 """Create datagram connection.""" 

1386 if sock is not None: 

1387 if sock.type == socket.SOCK_STREAM: 

1388 raise ValueError( 

1389 f'A datagram socket was expected, got {sock!r}') 

1390 if (local_addr or remote_addr or 

1391 family or proto or flags or 

1392 reuse_port or allow_broadcast): 

1393 # show the problematic kwargs in exception msg 

1394 opts = dict(local_addr=local_addr, remote_addr=remote_addr, 

1395 family=family, proto=proto, flags=flags, 

1396 reuse_port=reuse_port, 

1397 allow_broadcast=allow_broadcast) 

1398 problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v) 

1399 raise ValueError( 

1400 f'socket modifier keyword arguments can not be used ' 

1401 f'when sock is specified. ({problems})') 

1402 sock.setblocking(False) 

1403 r_addr = None 

1404 else: 

1405 if not (local_addr or remote_addr): 

1406 if family == 0: 

1407 raise ValueError('unexpected address family') 

1408 addr_pairs_info = (((family, proto), (None, None)),) 

1409 elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX: 

1410 for addr in (local_addr, remote_addr): 

1411 if addr is not None and not isinstance(addr, str): 1411 ↛ 1412line 1411 didn't jump to line 1412 because the condition on line 1411 was never true

1412 raise TypeError('string is expected') 

1413 

1414 if local_addr and local_addr[0] not in (0, '\x00'): 1414 ↛ 1426line 1414 didn't jump to line 1426 because the condition on line 1414 was always true

1415 try: 

1416 if stat.S_ISSOCK(os.stat(local_addr).st_mode): 1416 ↛ 1426line 1416 didn't jump to line 1426 because the condition on line 1416 was always true

1417 os.remove(local_addr) 

1418 except FileNotFoundError: 

1419 pass 

1420 except OSError as err: 

1421 # Directory may have permissions only to create socket. 

1422 logger.error('Unable to check or remove stale UNIX ' 

1423 'socket %r: %r', 

1424 local_addr, err) 

1425 

1426 addr_pairs_info = (((family, proto), 

1427 (local_addr, remote_addr)), ) 

1428 else: 

1429 # join address by (family, protocol) 

1430 addr_infos = {} # Using order preserving dict 

1431 for idx, addr in ((0, local_addr), (1, remote_addr)): 

1432 if addr is not None: 

1433 if not (isinstance(addr, tuple) and len(addr) == 2): 

1434 raise TypeError('2-tuple is expected') 

1435 

1436 infos = await self._ensure_resolved( 

1437 addr, family=family, type=socket.SOCK_DGRAM, 

1438 proto=proto, flags=flags, loop=self) 

1439 if not infos: 

1440 raise OSError('getaddrinfo() returned empty list') 

1441 

1442 for fam, _, pro, _, address in infos: 

1443 key = (fam, pro) 

1444 if key not in addr_infos: 1444 ↛ 1446line 1444 didn't jump to line 1446 because the condition on line 1444 was always true

1445 addr_infos[key] = [None, None] 

1446 addr_infos[key][idx] = address 

1447 

1448 # each addr has to have info for each (family, proto) pair 

1449 addr_pairs_info = [ 

1450 (key, addr_pair) for key, addr_pair in addr_infos.items() 

1451 if not ((local_addr and addr_pair[0] is None) or 

1452 (remote_addr and addr_pair[1] is None))] 

1453 

1454 if not addr_pairs_info: 

1455 raise ValueError('can not get address information') 

1456 

1457 exceptions = [] 

1458 

1459 for ((family, proto), 

1460 (local_address, remote_address)) in addr_pairs_info: 

1461 sock = None 

1462 r_addr = None 

1463 try: 

1464 sock = socket.socket( 

1465 family=family, type=socket.SOCK_DGRAM, proto=proto) 

1466 if reuse_port: 

1467 _set_reuseport(sock) 

1468 if allow_broadcast: 

1469 sock.setsockopt( 

1470 socket.SOL_SOCKET, socket.SO_BROADCAST, 1) 

1471 sock.setblocking(False) 

1472 

1473 if local_addr: 

1474 sock.bind(local_address) 

1475 if remote_addr: 

1476 if not allow_broadcast: 

1477 await self.sock_connect(sock, remote_address) 

1478 r_addr = remote_address 

1479 except OSError as exc: 

1480 if sock is not None: 

1481 sock.close() 

1482 exceptions.append(exc) 

1483 except: 

1484 if sock is not None: 1484 ↛ 1486line 1484 didn't jump to line 1486 because the condition on line 1484 was always true

1485 sock.close() 

1486 raise 

1487 else: 

1488 break 

1489 else: 

1490 raise exceptions[0] 

1491 

1492 protocol = protocol_factory() 

1493 waiter = self.create_future() 

1494 transport = self._make_datagram_transport( 

1495 sock, protocol, r_addr, waiter) 

1496 if self._debug: 1496 ↛ 1497line 1496 didn't jump to line 1497 because the condition on line 1496 was never true

1497 if local_addr: 

1498 logger.info("Datagram endpoint local_addr=%r remote_addr=%r " 

1499 "created: (%r, %r)", 

1500 local_addr, remote_addr, transport, protocol) 

1501 else: 

1502 logger.debug("Datagram endpoint remote_addr=%r created: " 

1503 "(%r, %r)", 

1504 remote_addr, transport, protocol) 

1505 

1506 try: 

1507 await waiter 

1508 except: 

1509 transport.close() 

1510 raise 

1511 

1512 return transport, protocol 

1513 

1514 async def _ensure_resolved(self, address, *, 

1515 family=0, type=socket.SOCK_STREAM, 

1516 proto=0, flags=0, loop): 

1517 host, port = address[:2] 

1518 info = _ipaddr_info(host, port, family, type, proto, *address[2:]) 

1519 if info is not None: 

1520 # "host" is already a resolved IP. 

1521 return [info] 

1522 else: 

1523 return await loop.getaddrinfo(host, port, family=family, type=type, 

1524 proto=proto, flags=flags) 

1525 

1526 async def _create_server_getaddrinfo(self, host, port, family, flags): 

1527 infos = await self._ensure_resolved((host, port), family=family, 

1528 type=socket.SOCK_STREAM, 

1529 flags=flags, loop=self) 

1530 if not infos: 

1531 raise OSError(f'getaddrinfo({host!r}) returned empty list') 

1532 return infos 

1533 

1534 async def create_server( 

1535 self, protocol_factory, host=None, port=None, 

1536 *, 

1537 family=socket.AF_UNSPEC, 

1538 flags=socket.AI_PASSIVE, 

1539 sock=None, 

1540 backlog=100, 

1541 ssl=None, 

1542 reuse_address=None, 

1543 reuse_port=None, 

1544 keep_alive=None, 

1545 ssl_handshake_timeout=None, 

1546 ssl_shutdown_timeout=None, 

1547 start_serving=True): 

1548 """Create a TCP server. 

1549 

1550 The host parameter can be a string, in that case the TCP server is 

1551 bound to host and port. 

1552 

1553 The host parameter can also be a sequence of strings and in that 

1554 case the TCP server is bound to all hosts of the sequence. If 

1555 a host appears multiple times (possibly indirectly e.g. when 

1556 hostnames resolve to the same IP address), the server is only bound 

1557 once to that host. 

1558 

1559 Return a Server object which can be used to stop the service. 

1560 

1561 This method is a coroutine. 

1562 """ 

1563 if isinstance(ssl, bool): 1563 ↛ 1564line 1563 didn't jump to line 1564 because the condition on line 1563 was never true

1564 raise TypeError('ssl argument must be an SSLContext or None') 

1565 

1566 if ssl_handshake_timeout is not None and ssl is None: 

1567 raise ValueError( 

1568 'ssl_handshake_timeout is only meaningful with ssl') 

1569 

1570 if ssl_shutdown_timeout is not None and ssl is None: 1570 ↛ 1571line 1570 didn't jump to line 1571 because the condition on line 1570 was never true

1571 raise ValueError( 

1572 'ssl_shutdown_timeout is only meaningful with ssl') 

1573 

1574 if sock is not None: 

1575 _check_ssl_socket(sock) 

1576 

1577 if host is not None or port is not None: 

1578 if sock is not None: 

1579 raise ValueError( 

1580 'host/port and sock can not be specified at the same time') 

1581 

1582 if reuse_address is None: 1582 ↛ 1584line 1582 didn't jump to line 1584 because the condition on line 1582 was always true

1583 reuse_address = os.name == "posix" and sys.platform != "cygwin" 

1584 sockets = [] 

1585 if host == '': 

1586 hosts = [None] 

1587 elif (isinstance(host, str) or 

1588 not isinstance(host, collections.abc.Iterable)): 

1589 hosts = [host] 

1590 else: 

1591 hosts = host 

1592 

1593 fs = [self._create_server_getaddrinfo(host, port, family=family, 

1594 flags=flags) 

1595 for host in hosts] 

1596 infos = await tasks.gather(*fs) 

1597 infos = set(itertools.chain.from_iterable(infos)) 

1598 

1599 completed = False 

1600 try: 

1601 for res in infos: 

1602 af, socktype, proto, canonname, sa = res 

1603 try: 

1604 sock = socket.socket(af, socktype, proto) 

1605 except socket.error: 

1606 # Assume it's a bad family/type/protocol combination. 

1607 if self._debug: 

1608 logger.warning('create_server() failed to create ' 

1609 'socket.socket(%r, %r, %r)', 

1610 af, socktype, proto, exc_info=True) 

1611 continue 

1612 sockets.append(sock) 

1613 if reuse_address: 1613 ↛ 1618line 1613 didn't jump to line 1618 because the condition on line 1613 was always true

1614 sock.setsockopt( 

1615 socket.SOL_SOCKET, socket.SO_REUSEADDR, True) 

1616 # Since Linux 6.12.9, SO_REUSEPORT is not allowed 

1617 # on other address families than AF_INET/AF_INET6. 

1618 if reuse_port and af in (socket.AF_INET, socket.AF_INET6): 

1619 _set_reuseport(sock) 

1620 if keep_alive: 1620 ↛ 1621line 1620 didn't jump to line 1621 because the condition on line 1620 was never true

1621 sock.setsockopt( 

1622 socket.SOL_SOCKET, socket.SO_KEEPALIVE, True) 

1623 # Disable IPv4/IPv6 dual stack support (enabled by 

1624 # default on Linux) which makes a single socket 

1625 # listen on both address families. 

1626 if (_HAS_IPv6 and 

1627 af == socket.AF_INET6 and 

1628 hasattr(socket, 'IPPROTO_IPV6')): 

1629 sock.setsockopt(socket.IPPROTO_IPV6, 

1630 socket.IPV6_V6ONLY, 

1631 True) 

1632 try: 

1633 sock.bind(sa) 

1634 except OSError as err: 

1635 msg = ('error while attempting ' 

1636 'to bind on address %r: %s' 

1637 % (sa, str(err).lower())) 

1638 if err.errno == errno.EADDRNOTAVAIL: 1638 ↛ 1640line 1638 didn't jump to line 1640 because the condition on line 1638 was never true

1639 # Assume the family is not enabled (bpo-30945) 

1640 sockets.pop() 

1641 sock.close() 

1642 if self._debug: 

1643 logger.warning(msg) 

1644 continue 

1645 raise OSError(err.errno, msg) from None 

1646 

1647 if not sockets: 1647 ↛ 1648line 1647 didn't jump to line 1648 because the condition on line 1647 was never true

1648 raise OSError('could not bind on any address out of %r' 

1649 % ([info[4] for info in infos],)) 

1650 

1651 completed = True 

1652 finally: 

1653 if not completed: 

1654 for sock in sockets: 1654 ↛ 1663line 1654 didn't jump to line 1663 because the loop on line 1654 didn't complete

1655 sock.close() 

1656 else: 

1657 if sock is None: 

1658 raise ValueError('Neither host/port nor sock were specified') 

1659 if sock.type != socket.SOCK_STREAM: 

1660 raise ValueError(f'A Stream Socket was expected, got {sock!r}') 

1661 sockets = [sock] 

1662 

1663 for sock in sockets: 

1664 sock.setblocking(False) 

1665 

1666 server = Server(self, sockets, protocol_factory, 

1667 ssl, backlog, ssl_handshake_timeout, 

1668 ssl_shutdown_timeout) 

1669 if start_serving: 

1670 server._start_serving() 

1671 # Skip one loop iteration so that all 'loop.add_reader' 

1672 # go through. 

1673 await tasks.sleep(0) 

1674 

1675 if self._debug: 

1676 logger.info("%r is serving", server) 

1677 return server 

1678 

1679 async def connect_accepted_socket( 

1680 self, protocol_factory, sock, 

1681 *, ssl=None, 

1682 ssl_handshake_timeout=None, 

1683 ssl_shutdown_timeout=None): 

1684 if sock.type != socket.SOCK_STREAM: 1684 ↛ 1685line 1684 didn't jump to line 1685 because the condition on line 1684 was never true

1685 raise ValueError(f'A Stream Socket was expected, got {sock!r}') 

1686 

1687 if ssl_handshake_timeout is not None and not ssl: 

1688 raise ValueError( 

1689 'ssl_handshake_timeout is only meaningful with ssl') 

1690 

1691 if ssl_shutdown_timeout is not None and not ssl: 1691 ↛ 1692line 1691 didn't jump to line 1692 because the condition on line 1691 was never true

1692 raise ValueError( 

1693 'ssl_shutdown_timeout is only meaningful with ssl') 

1694 

1695 _check_ssl_socket(sock) 

1696 

1697 transport, protocol = await self._create_connection_transport( 

1698 sock, protocol_factory, ssl, '', server_side=True, 

1699 ssl_handshake_timeout=ssl_handshake_timeout, 

1700 ssl_shutdown_timeout=ssl_shutdown_timeout) 

1701 if self._debug: 1701 ↛ 1704line 1701 didn't jump to line 1704 because the condition on line 1701 was never true

1702 # Get the socket from the transport because SSL transport closes 

1703 # the old socket and creates a new SSL socket 

1704 sock = transport.get_extra_info('socket') 

1705 logger.debug("%r handled: (%r, %r)", sock, transport, protocol) 

1706 return transport, protocol 

1707 

1708 async def connect_read_pipe(self, protocol_factory, pipe): 

1709 protocol = protocol_factory() 

1710 waiter = self.create_future() 

1711 transport = self._make_read_pipe_transport(pipe, protocol, waiter) 

1712 

1713 try: 

1714 await waiter 

1715 except: 

1716 transport.close() 

1717 raise 

1718 

1719 if self._debug: 1719 ↛ 1720line 1719 didn't jump to line 1720 because the condition on line 1719 was never true

1720 logger.debug('Read pipe %r connected: (%r, %r)', 

1721 pipe.fileno(), transport, protocol) 

1722 return transport, protocol 

1723 

1724 async def connect_write_pipe(self, protocol_factory, pipe): 

1725 protocol = protocol_factory() 

1726 waiter = self.create_future() 

1727 transport = self._make_write_pipe_transport(pipe, protocol, waiter) 

1728 

1729 try: 

1730 await waiter 

1731 except: 

1732 transport.close() 

1733 raise 

1734 

1735 if self._debug: 1735 ↛ 1736line 1735 didn't jump to line 1736 because the condition on line 1735 was never true

1736 logger.debug('Write pipe %r connected: (%r, %r)', 

1737 pipe.fileno(), transport, protocol) 

1738 return transport, protocol 

1739 

1740 def _log_subprocess(self, msg, stdin, stdout, stderr): 

1741 info = [msg] 

1742 if stdin is not None: 

1743 info.append(f'stdin={_format_pipe(stdin)}') 

1744 if stdout is not None and stderr == subprocess.STDOUT: 

1745 info.append(f'stdout=stderr={_format_pipe(stdout)}') 

1746 else: 

1747 if stdout is not None: 

1748 info.append(f'stdout={_format_pipe(stdout)}') 

1749 if stderr is not None: 

1750 info.append(f'stderr={_format_pipe(stderr)}') 

1751 logger.debug(' '.join(info)) 

1752 

1753 async def subprocess_shell(self, protocol_factory, cmd, *, 

1754 stdin=subprocess.PIPE, 

1755 stdout=subprocess.PIPE, 

1756 stderr=subprocess.PIPE, 

1757 universal_newlines=False, 

1758 shell=True, bufsize=0, 

1759 encoding=None, errors=None, text=None, 

1760 **kwargs): 

1761 if not isinstance(cmd, (bytes, str)): 

1762 raise ValueError("cmd must be a string") 

1763 if universal_newlines: 

1764 raise ValueError("universal_newlines must be False") 

1765 if not shell: 

1766 raise ValueError("shell must be True") 

1767 if bufsize != 0: 

1768 raise ValueError("bufsize must be 0") 

1769 if text: 

1770 raise ValueError("text must be False") 

1771 if encoding is not None: 

1772 raise ValueError("encoding must be None") 

1773 if errors is not None: 

1774 raise ValueError("errors must be None") 

1775 

1776 protocol = protocol_factory() 

1777 debug_log = None 

1778 if self._debug: 1778 ↛ 1781line 1778 didn't jump to line 1781 because the condition on line 1778 was never true

1779 # don't log parameters: they may contain sensitive information 

1780 # (password) and may be too long 

1781 debug_log = 'run shell command %r' % cmd 

1782 self._log_subprocess(debug_log, stdin, stdout, stderr) 

1783 transport = await self._make_subprocess_transport( 

1784 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs) 

1785 if self._debug and debug_log is not None: 1785 ↛ 1786line 1785 didn't jump to line 1786 because the condition on line 1785 was never true

1786 logger.info('%s: %r', debug_log, transport) 

1787 return transport, protocol 

1788 

1789 async def subprocess_exec(self, protocol_factory, program, *args, 

1790 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 

1791 stderr=subprocess.PIPE, universal_newlines=False, 

1792 shell=False, bufsize=0, 

1793 encoding=None, errors=None, text=None, 

1794 **kwargs): 

1795 if universal_newlines: 

1796 raise ValueError("universal_newlines must be False") 

1797 if shell: 

1798 raise ValueError("shell must be False") 

1799 if bufsize != 0: 

1800 raise ValueError("bufsize must be 0") 

1801 if text: 

1802 raise ValueError("text must be False") 

1803 if encoding is not None: 

1804 raise ValueError("encoding must be None") 

1805 if errors is not None: 

1806 raise ValueError("errors must be None") 

1807 

1808 popen_args = (program,) + args 

1809 protocol = protocol_factory() 

1810 debug_log = None 

1811 if self._debug: 1811 ↛ 1814line 1811 didn't jump to line 1814 because the condition on line 1811 was never true

1812 # don't log parameters: they may contain sensitive information 

1813 # (password) and may be too long 

1814 debug_log = f'execute program {program!r}' 

1815 self._log_subprocess(debug_log, stdin, stdout, stderr) 

1816 transport = await self._make_subprocess_transport( 

1817 protocol, popen_args, False, stdin, stdout, stderr, 

1818 bufsize, **kwargs) 

1819 if self._debug and debug_log is not None: 1819 ↛ 1820line 1819 didn't jump to line 1820 because the condition on line 1819 was never true

1820 logger.info('%s: %r', debug_log, transport) 

1821 return transport, protocol 

1822 

1823 def get_exception_handler(self): 

1824 """Return an exception handler, or None if the default one is in use. 

1825 """ 

1826 return self._exception_handler 

1827 

1828 def set_exception_handler(self, handler): 

1829 """Set handler as the new event loop exception handler. 

1830 

1831 If handler is None, the default exception handler will 

1832 be set. 

1833 

1834 If handler is a callable object, it should have a 

1835 signature matching '(loop, context)', where 'loop' 

1836 will be a reference to the active event loop, 'context' 

1837 will be a dict object (see `call_exception_handler()` 

1838 documentation for details about context). 

1839 """ 

1840 if handler is not None and not callable(handler): 

1841 raise TypeError(f'A callable object or None is expected, ' 

1842 f'got {handler!r}') 

1843 self._exception_handler = handler 

1844 

1845 def default_exception_handler(self, context): 

1846 """Default exception handler. 

1847 

1848 This is called when an exception occurs and no exception 

1849 handler is set, and can be called by a custom exception 

1850 handler that wants to defer to the default behavior. 

1851 

1852 This default handler logs the error message and other 

1853 context-dependent information. In debug mode, a truncated 

1854 stack trace is also appended showing where the given object 

1855 (e.g. a handle or future or task) was created, if any. 

1856 

1857 The context parameter has the same meaning as in 

1858 `call_exception_handler()`. 

1859 """ 

1860 message = context.get('message') 

1861 if not message: 1861 ↛ 1862line 1861 didn't jump to line 1862 because the condition on line 1861 was never true

1862 message = 'Unhandled exception in event loop' 

1863 

1864 exception = context.get('exception') 

1865 if exception is not None: 

1866 exc_info = (type(exception), exception, exception.__traceback__) 

1867 else: 

1868 exc_info = False 

1869 

1870 if ('source_traceback' not in context and 1870 ↛ 1873line 1870 didn't jump to line 1873 because the condition on line 1870 was never true

1871 self._current_handle is not None and 

1872 self._current_handle._source_traceback): 

1873 context['handle_traceback'] = \ 

1874 self._current_handle._source_traceback 

1875 

1876 log_lines = [message] 

1877 for key in sorted(context): 

1878 if key in {'message', 'exception'}: 

1879 continue 

1880 value = context[key] 

1881 if key == 'source_traceback': 

1882 tb = ''.join(traceback.format_list(value)) 

1883 value = 'Object created at (most recent call last):\n' 

1884 value += tb.rstrip() 

1885 elif key == 'handle_traceback': 1885 ↛ 1886line 1885 didn't jump to line 1886 because the condition on line 1885 was never true

1886 tb = ''.join(traceback.format_list(value)) 

1887 value = 'Handle created at (most recent call last):\n' 

1888 value += tb.rstrip() 

1889 else: 

1890 value = repr(value) 

1891 log_lines.append(f'{key}: {value}') 

1892 

1893 logger.error('\n'.join(log_lines), exc_info=exc_info) 

1894 

1895 def call_exception_handler(self, context): 

1896 """Call the current event loop's exception handler. 

1897 

1898 The context argument is a dict containing the following keys: 

1899 

1900 - 'message': Error message; 

1901 - 'exception' (optional): Exception object; 

1902 - 'future' (optional): Future instance; 

1903 - 'task' (optional): Task instance; 

1904 - 'handle' (optional): Handle instance; 

1905 - 'protocol' (optional): Protocol instance; 

1906 - 'transport' (optional): Transport instance; 

1907 - 'socket' (optional): Socket instance; 

1908 - 'source_traceback' (optional): Traceback of the source; 

1909 - 'handle_traceback' (optional): Traceback of the handle; 

1910 - 'asyncgen' (optional): Asynchronous generator that caused 

1911 the exception. 

1912 

1913 New keys maybe introduced in the future. 

1914 

1915 Note: do not overload this method in an event loop subclass. 

1916 For custom exception handling, use the 

1917 `set_exception_handler()` method. 

1918 """ 

1919 if self._exception_handler is None: 

1920 try: 

1921 self.default_exception_handler(context) 

1922 except (SystemExit, KeyboardInterrupt): 

1923 raise 

1924 except BaseException: 

1925 # Second protection layer for unexpected errors 

1926 # in the default implementation, as well as for subclassed 

1927 # event loops with overloaded "default_exception_handler". 

1928 logger.error('Exception in default exception handler', 

1929 exc_info=True) 

1930 else: 

1931 try: 

1932 ctx = None 

1933 thing = context.get("task") 

1934 if thing is None: 

1935 # Even though Futures don't have a context, 

1936 # Task is a subclass of Future, 

1937 # and sometimes the 'future' key holds a Task. 

1938 thing = context.get("future") 

1939 if thing is None: 

1940 # Handles also have a context. 

1941 thing = context.get("handle") 

1942 if thing is not None and hasattr(thing, "get_context"): 

1943 ctx = thing.get_context() 

1944 if ctx is not None and hasattr(ctx, "run"): 

1945 ctx.run(self._exception_handler, self, context) 

1946 else: 

1947 self._exception_handler(self, context) 

1948 except (SystemExit, KeyboardInterrupt): 

1949 raise 

1950 except BaseException as exc: 

1951 # Exception in the user set custom exception handler. 

1952 try: 

1953 # Let's try default handler. 

1954 self.default_exception_handler({ 

1955 'message': 'Unhandled error in exception handler', 

1956 'exception': exc, 

1957 'context': context, 

1958 }) 

1959 except (SystemExit, KeyboardInterrupt): 

1960 raise 

1961 except BaseException: 

1962 # Guard 'default_exception_handler' in case it is 

1963 # overloaded. 

1964 logger.error('Exception in default exception handler ' 

1965 'while handling an unexpected error ' 

1966 'in custom exception handler', 

1967 exc_info=True) 

1968 

1969 def _add_callback(self, handle): 

1970 """Add a Handle to _ready.""" 

1971 if not handle._cancelled: 

1972 self._ready.append(handle) 

1973 

1974 def _add_callback_signalsafe(self, handle): 

1975 """Like _add_callback() but called from a signal handler.""" 

1976 self._add_callback(handle) 

1977 self._write_to_self() 

1978 

1979 def _timer_handle_cancelled(self, handle): 

1980 """Notification that a TimerHandle has been cancelled.""" 

1981 if handle._scheduled: 

1982 self._timer_cancelled_count += 1 

1983 

1984 def _run_once(self): 

1985 """Run one full iteration of the event loop. 

1986 

1987 This calls all currently ready callbacks, polls for I/O, 

1988 schedules the resulting callbacks, and finally schedules 

1989 'call_later' callbacks. 

1990 """ 

1991 

1992 sched_count = len(self._scheduled) 

1993 if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and 

1994 self._timer_cancelled_count / sched_count > 

1995 _MIN_CANCELLED_TIMER_HANDLES_FRACTION): 

1996 # Remove delayed calls that were cancelled if their number 

1997 # is too high 

1998 new_scheduled = [] 

1999 for handle in self._scheduled: 

2000 if handle._cancelled: 

2001 handle._scheduled = False 

2002 else: 

2003 new_scheduled.append(handle) 

2004 

2005 heapq.heapify(new_scheduled) 

2006 self._scheduled = new_scheduled 

2007 self._timer_cancelled_count = 0 

2008 else: 

2009 # Remove delayed calls that were cancelled from head of queue. 

2010 while self._scheduled and self._scheduled[0]._cancelled: 

2011 self._timer_cancelled_count -= 1 

2012 handle = heapq.heappop(self._scheduled) 

2013 handle._scheduled = False 

2014 

2015 timeout = None 

2016 if self._ready or self._stopping: 

2017 timeout = 0 

2018 elif self._scheduled: 

2019 # Compute the desired timeout. 

2020 timeout = self._scheduled[0]._when - self.time() 

2021 if timeout > MAXIMUM_SELECT_TIMEOUT: 2021 ↛ 2022line 2021 didn't jump to line 2022 because the condition on line 2021 was never true

2022 timeout = MAXIMUM_SELECT_TIMEOUT 

2023 elif timeout < 0: 

2024 timeout = 0 

2025 

2026 event_list = self._selector.select(timeout) 

2027 self._process_events(event_list) 

2028 # Needed to break cycles when an exception occurs. 

2029 event_list = None 

2030 

2031 # Handle 'later' callbacks that are ready. 

2032 now = self.time() 

2033 # Ensure that `end_time` is strictly increasing 

2034 # when the clock resolution is too small. 

2035 end_time = now + max(self._clock_resolution, math.ulp(now)) 

2036 while self._scheduled: 

2037 handle = self._scheduled[0] 

2038 if handle._when >= end_time: 

2039 break 

2040 handle = heapq.heappop(self._scheduled) 

2041 handle._scheduled = False 

2042 self._ready.append(handle) 

2043 

2044 # This is the only place where callbacks are actually *called*. 

2045 # All other places just add them to ready. 

2046 # Note: We run all currently scheduled callbacks, but not any 

2047 # callbacks scheduled by callbacks run this time around -- 

2048 # they will be run the next time (after another I/O poll). 

2049 # Use an idiom that is thread-safe without using locks. 

2050 ntodo = len(self._ready) 

2051 for i in range(ntodo): 

2052 handle = self._ready.popleft() 

2053 if handle._cancelled: 

2054 continue 

2055 if self._debug: 

2056 try: 

2057 self._current_handle = handle 

2058 t0 = self.time() 

2059 handle._run() 

2060 dt = self.time() - t0 

2061 if dt >= self.slow_callback_duration: 

2062 logger.warning('Executing %s took %.3f seconds', 

2063 _format_handle(handle), dt) 

2064 finally: 

2065 self._current_handle = None 

2066 else: 

2067 handle._run() 

2068 handle = None # Needed to break cycles when an exception occurs. 

2069 

2070 def _set_coroutine_origin_tracking(self, enabled): 

2071 if bool(enabled) == bool(self._coroutine_origin_tracking_enabled): 

2072 return 

2073 

2074 if enabled: 

2075 self._coroutine_origin_tracking_saved_depth = ( 

2076 sys.get_coroutine_origin_tracking_depth()) 

2077 sys.set_coroutine_origin_tracking_depth( 

2078 constants.DEBUG_STACK_DEPTH) 

2079 else: 

2080 sys.set_coroutine_origin_tracking_depth( 

2081 self._coroutine_origin_tracking_saved_depth) 

2082 

2083 self._coroutine_origin_tracking_enabled = enabled 

2084 

2085 def get_debug(self): 

2086 return self._debug 

2087 

2088 def set_debug(self, enabled): 

2089 self._debug = enabled 

2090 

2091 if self.is_running(): 

2092 self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)