Coverage for Lib / asyncio / base_events.py: 88%

1164 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 02:07 +0000

1"""Base implementation of event loop. 

2 

3The event loop can be broken up into a multiplexer (the part 

4responsible for notifying us of I/O events) and the event loop proper, 

5which wraps a multiplexer with functionality for scheduling callbacks, 

6immediately or at a given time in the future. 

7 

8Whenever a public API takes a callback, subsequent positional 

9arguments will be passed to the callback if/when it is called. This 

10avoids the proliferation of trivial lambdas implementing closures. 

11Keyword arguments for the callback are not supported; this is a 

12conscious design decision, leaving the door open for keyword arguments 

13to modify the meaning of the API call itself. 

14""" 

15 

16import collections 

17import collections.abc 

18import concurrent.futures 

19import errno 

20import heapq 

21import itertools 

22import math 

23import os 

24import socket 

25import stat 

26import subprocess 

27import sys 

28import threading 

29import time 

30import traceback 

31import warnings 

32import weakref 

33 

34try: 

35 import ssl 

36except ImportError: # pragma: no cover 

37 ssl = None 

38 

39from . import constants 

40from . import coroutines 

41from . import events 

42from . import exceptions 

43from . import futures 

44from . import protocols 

45from . import sslproto 

46from . import staggered 

47from . import tasks 

48from . import timeouts 

49from . import transports 

50from . import trsock 

51from .log import logger 

52 

53 

54__all__ = 'BaseEventLoop','Server', 

55 

56 

57# Minimum number of _scheduled timer handles before cleanup of 

58# cancelled handles is performed. 

59_MIN_SCHEDULED_TIMER_HANDLES = 100 

60 

61# Minimum fraction of _scheduled timer handles that are cancelled 

62# before cleanup of cancelled handles is performed. 

63_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5 

64 

65 

66_HAS_IPv6 = hasattr(socket, 'AF_INET6') 

67 

68# Maximum timeout passed to select to avoid OS limitations 

69MAXIMUM_SELECT_TIMEOUT = 24 * 3600 

70 

71 

72def _format_handle(handle): 

73 cb = handle._callback 

74 if isinstance(getattr(cb, '__self__', None), tasks.Task): 

75 # format the task 

76 return repr(cb.__self__) 

77 else: 

78 return str(handle) 

79 

80 

81def _format_pipe(fd): 

82 if fd == subprocess.PIPE: 

83 return '<pipe>' 

84 elif fd == subprocess.STDOUT: 

85 return '<stdout>' 

86 else: 

87 return repr(fd) 

88 

89 

90def _set_reuseport(sock): 

91 if not hasattr(socket, 'SO_REUSEPORT'): 

92 raise ValueError('reuse_port not supported by socket module') 

93 else: 

94 try: 

95 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 

96 except OSError: 

97 raise ValueError('reuse_port not supported by socket module, ' 

98 'SO_REUSEPORT defined but not implemented.') 

99 

100 

101def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0): 

102 # Try to skip getaddrinfo if "host" is already an IP. Users might have 

103 # handled name resolution in their own code and pass in resolved IPs. 

104 if not hasattr(socket, 'inet_pton'): 

105 return 

106 

107 if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \ 

108 host is None: 

109 return None 

110 

111 if type == socket.SOCK_STREAM: 

112 proto = socket.IPPROTO_TCP 

113 elif type == socket.SOCK_DGRAM: 

114 proto = socket.IPPROTO_UDP 

115 else: 

116 return None 

117 

118 if port is None: 

119 port = 0 

120 elif isinstance(port, bytes) and port == b'': 

121 port = 0 

122 elif isinstance(port, str) and port == '': 

123 port = 0 

124 else: 

125 # If port's a service name like "http", don't skip getaddrinfo. 

126 try: 

127 port = int(port) 

128 except (TypeError, ValueError): 

129 return None 

130 

131 if family == socket.AF_UNSPEC: 

132 afs = [socket.AF_INET] 

133 if _HAS_IPv6: 133 ↛ 138line 133 didn't jump to line 138 because the condition on line 133 was always true

134 afs.append(socket.AF_INET6) 

135 else: 

136 afs = [family] 

137 

138 if isinstance(host, bytes): 

139 host = host.decode('idna') 

140 if '%' in host: 

141 # Linux's inet_pton doesn't accept an IPv6 zone index after host, 

142 # like '::1%lo0'. 

143 return None 

144 

145 for af in afs: 

146 try: 

147 socket.inet_pton(af, host) 

148 # The host has already been resolved. 

149 if _HAS_IPv6 and af == socket.AF_INET6: 

150 return af, type, proto, '', (host, port, flowinfo, scopeid) 

151 else: 

152 return af, type, proto, '', (host, port) 

153 except OSError: 

154 pass 

155 

156 # "host" is not an IP address. 

157 return None 

158 

159 

160def _interleave_addrinfos(addrinfos, first_address_family_count=1): 

161 """Interleave list of addrinfo tuples by family.""" 

162 # Group addresses by family 

163 addrinfos_by_family = collections.OrderedDict() 

164 for addr in addrinfos: 

165 family = addr[0] 

166 if family not in addrinfos_by_family: 

167 addrinfos_by_family[family] = [] 

168 addrinfos_by_family[family].append(addr) 

169 addrinfos_lists = list(addrinfos_by_family.values()) 

170 

171 reordered = [] 

172 if first_address_family_count > 1: 

173 reordered.extend(addrinfos_lists[0][:first_address_family_count - 1]) 

174 del addrinfos_lists[0][:first_address_family_count - 1] 

175 reordered.extend( 

176 a for a in itertools.chain.from_iterable( 

177 itertools.zip_longest(*addrinfos_lists) 

178 ) if a is not None) 

179 return reordered 

180 

181 

182def _run_until_complete_cb(fut): 

183 if not fut.cancelled(): 

184 exc = fut.exception() 

185 if isinstance(exc, (SystemExit, KeyboardInterrupt)): 

186 # Issue #22429: run_forever() already finished, no need to 

187 # stop it. 

188 return 

189 futures._get_loop(fut).stop() 

190 

191 

192if hasattr(socket, 'TCP_NODELAY'): 192 ↛ 199line 192 didn't jump to line 199 because the condition on line 192 was always true

193 def _set_nodelay(sock): 

194 if (sock.family in {socket.AF_INET, socket.AF_INET6} and 

195 sock.type == socket.SOCK_STREAM and 

196 sock.proto == socket.IPPROTO_TCP): 

197 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 

198else: 

199 def _set_nodelay(sock): 

200 pass 

201 

202 

203def _check_ssl_socket(sock): 

204 if ssl is not None and isinstance(sock, ssl.SSLSocket): 204 ↛ 205line 204 didn't jump to line 205 because the condition on line 204 was never true

205 raise TypeError("Socket cannot be of type SSLSocket") 

206 

207 

208class _SendfileFallbackProtocol(protocols.Protocol): 

209 def __init__(self, transp): 

210 if not isinstance(transp, transports._FlowControlMixin): 210 ↛ 211line 210 didn't jump to line 211 because the condition on line 210 was never true

211 raise TypeError("transport should be _FlowControlMixin instance") 

212 self._transport = transp 

213 self._proto = transp.get_protocol() 

214 self._should_resume_reading = transp.is_reading() 

215 self._should_resume_writing = transp._protocol_paused 

216 transp.pause_reading() 

217 transp.set_protocol(self) 

218 if self._should_resume_writing: 218 ↛ 219line 218 didn't jump to line 219 because the condition on line 218 was never true

219 self._write_ready_fut = self._transport._loop.create_future() 

220 else: 

221 self._write_ready_fut = None 

222 

223 async def drain(self): 

224 if self._transport.is_closing(): 224 ↛ 225line 224 didn't jump to line 225 because the condition on line 224 was never true

225 raise ConnectionError("Connection closed by peer") 

226 fut = self._write_ready_fut 

227 if fut is None: 

228 return 

229 await fut 

230 

231 def connection_made(self, transport): 

232 raise RuntimeError("Invalid state: " 

233 "connection should have been established already.") 

234 

235 def connection_lost(self, exc): 

236 if self._write_ready_fut is not None: 236 ↛ 244line 236 didn't jump to line 244 because the condition on line 236 was always true

237 # Never happens if peer disconnects after sending the whole content 

238 # Thus disconnection is always an exception from user perspective 

239 if exc is None: 239 ↛ 240line 239 didn't jump to line 240 because the condition on line 239 was never true

240 self._write_ready_fut.set_exception( 

241 ConnectionError("Connection is closed by peer")) 

242 else: 

243 self._write_ready_fut.set_exception(exc) 

244 self._proto.connection_lost(exc) 

245 

246 def pause_writing(self): 

247 if self._write_ready_fut is not None: 247 ↛ 248line 247 didn't jump to line 248 because the condition on line 247 was never true

248 return 

249 self._write_ready_fut = self._transport._loop.create_future() 

250 

251 def resume_writing(self): 

252 if self._write_ready_fut is None: 252 ↛ 253line 252 didn't jump to line 253 because the condition on line 252 was never true

253 return 

254 self._write_ready_fut.set_result(False) 

255 self._write_ready_fut = None 

256 

257 def data_received(self, data): 

258 raise RuntimeError("Invalid state: reading should be paused") 

259 

260 def eof_received(self): 

261 raise RuntimeError("Invalid state: reading should be paused") 

262 

263 async def restore(self): 

264 self._transport.set_protocol(self._proto) 

265 if self._should_resume_reading: 265 ↛ 267line 265 didn't jump to line 267 because the condition on line 265 was always true

266 self._transport.resume_reading() 

267 if self._write_ready_fut is not None: 

268 # Cancel the future. 

269 # Basically it has no effect because protocol is switched back, 

270 # no code should wait for it anymore. 

271 self._write_ready_fut.cancel() 

272 if self._should_resume_writing: 272 ↛ 273line 272 didn't jump to line 273 because the condition on line 272 was never true

273 self._proto.resume_writing() 

274 

275 

276class Server(events.AbstractServer): 

277 

278 def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog, 

279 ssl_handshake_timeout, ssl_shutdown_timeout=None): 

280 self._loop = loop 

281 self._sockets = sockets 

282 # Weak references so we don't break Transport's ability to 

283 # detect abandoned transports 

284 self._clients = weakref.WeakSet() 

285 self._waiters = [] 

286 self._protocol_factory = protocol_factory 

287 self._backlog = backlog 

288 self._ssl_context = ssl_context 

289 self._ssl_handshake_timeout = ssl_handshake_timeout 

290 self._ssl_shutdown_timeout = ssl_shutdown_timeout 

291 self._serving = False 

292 self._serving_forever_fut = None 

293 

294 def __repr__(self): 

295 return f'<{self.__class__.__name__} sockets={self.sockets!r}>' 

296 

297 def _attach(self, transport): 

298 assert self._sockets is not None 

299 self._clients.add(transport) 

300 

301 def _detach(self, transport): 

302 self._clients.discard(transport) 

303 if len(self._clients) == 0 and self._sockets is None: 

304 self._wakeup() 

305 

306 def _wakeup(self): 

307 waiters = self._waiters 

308 self._waiters = None 

309 for waiter in waiters: 

310 if not waiter.done(): 310 ↛ 309line 310 didn't jump to line 309 because the condition on line 310 was always true

311 waiter.set_result(None) 

312 

313 def _start_serving(self): 

314 if self._serving: 

315 return 

316 self._serving = True 

317 for sock in self._sockets: 

318 sock.listen(self._backlog) 

319 self._loop._start_serving( 

320 self._protocol_factory, sock, self._ssl_context, 

321 self, self._backlog, self._ssl_handshake_timeout, 

322 self._ssl_shutdown_timeout) 

323 

324 def get_loop(self): 

325 return self._loop 

326 

327 def is_serving(self): 

328 return self._serving 

329 

330 @property 

331 def sockets(self): 

332 if self._sockets is None: 

333 return () 

334 return tuple(trsock.TransportSocket(s) for s in self._sockets) 

335 

336 def close(self): 

337 sockets = self._sockets 

338 if sockets is None: 

339 return 

340 self._sockets = None 

341 

342 for sock in sockets: 

343 self._loop._stop_serving(sock) 

344 

345 self._serving = False 

346 

347 if (self._serving_forever_fut is not None and 347 ↛ 349line 347 didn't jump to line 349 because the condition on line 347 was never true

348 not self._serving_forever_fut.done()): 

349 self._serving_forever_fut.cancel() 

350 self._serving_forever_fut = None 

351 

352 if len(self._clients) == 0: 

353 self._wakeup() 

354 

355 def close_clients(self): 

356 for transport in self._clients.copy(): 

357 transport.close() 

358 

359 def abort_clients(self): 

360 for transport in self._clients.copy(): 

361 transport.abort() 

362 

363 async def start_serving(self): 

364 self._start_serving() 

365 # Skip one loop iteration so that all 'loop.add_reader' 

366 # go through. 

367 await tasks.sleep(0) 

368 

369 async def serve_forever(self): 

370 if self._serving_forever_fut is not None: 370 ↛ 371line 370 didn't jump to line 371 because the condition on line 370 was never true

371 raise RuntimeError( 

372 f'server {self!r} is already being awaited on serve_forever()') 

373 if self._sockets is None: 

374 raise RuntimeError(f'server {self!r} is closed') 

375 

376 self._start_serving() 

377 self._serving_forever_fut = self._loop.create_future() 

378 

379 try: 

380 await self._serving_forever_fut 

381 except exceptions.CancelledError: 

382 try: 

383 self.close() 

384 self.close_clients() 

385 await self.wait_closed() 

386 finally: 

387 raise 

388 finally: 

389 self._serving_forever_fut = None 

390 

391 async def wait_closed(self): 

392 """Wait until server is closed and all connections are dropped. 

393 

394 - If the server is not closed, wait. 

395 - If it is closed, but there are still active connections, wait. 

396 

397 Anyone waiting here will be unblocked once both conditions 

398 (server is closed and all connections have been dropped) 

399 have become true, in either order. 

400 

401 Historical note: In 3.11 and before, this was broken, returning 

402 immediately if the server was already closed, even if there 

403 were still active connections. An attempted fix in 3.12.0 was 

404 still broken, returning immediately if the server was still 

405 open and there were no active connections. Hopefully in 3.12.1 

406 we have it right. 

407 """ 

408 # Waiters are unblocked by self._wakeup(), which is called 

409 # from two places: self.close() and self._detach(), but only 

410 # when both conditions have become true. To signal that this 

411 # has happened, self._wakeup() sets self._waiters to None. 

412 if self._waiters is None: 

413 return 

414 waiter = self._loop.create_future() 

415 self._waiters.append(waiter) 

416 await waiter 

417 

418 

419class BaseEventLoop(events.AbstractEventLoop): 

420 

421 def __init__(self): 

422 self._timer_cancelled_count = 0 

423 self._closed = False 

424 self._stopping = False 

425 self._ready = collections.deque() 

426 self._scheduled = [] 

427 self._default_executor = None 

428 self._internal_fds = 0 

429 # Identifier of the thread running the event loop, or None if the 

430 # event loop is not running 

431 self._thread_id = None 

432 self._clock_resolution = time.get_clock_info('monotonic').resolution 

433 self._exception_handler = None 

434 self.set_debug(coroutines._is_debug_mode()) 

435 # The preserved state of async generator hooks. 

436 self._old_agen_hooks = None 

437 # In debug mode, if the execution of a callback or a step of a task 

438 # exceed this duration in seconds, the slow callback/task is logged. 

439 self.slow_callback_duration = 0.1 

440 self._current_handle = None 

441 self._task_factory = None 

442 self._coroutine_origin_tracking_enabled = False 

443 self._coroutine_origin_tracking_saved_depth = None 

444 

445 # A weak set of all asynchronous generators that are 

446 # being iterated by the loop. 

447 self._asyncgens = weakref.WeakSet() 

448 # Set to True when `loop.shutdown_asyncgens` is called. 

449 self._asyncgens_shutdown_called = False 

450 # Set to True when `loop.shutdown_default_executor` is called. 

451 self._executor_shutdown_called = False 

452 

453 def __repr__(self): 

454 return ( 

455 f'<{self.__class__.__name__} running={self.is_running()} ' 

456 f'closed={self.is_closed()} debug={self.get_debug()}>' 

457 ) 

458 

459 def create_future(self): 

460 """Create a Future object attached to the loop.""" 

461 return futures.Future(loop=self) 

462 

463 def create_task(self, coro, **kwargs): 

464 """Schedule or begin executing a coroutine object. 

465 

466 Return a task object. 

467 """ 

468 self._check_closed() 

469 if self._task_factory is not None: 

470 return self._task_factory(self, coro, **kwargs) 

471 

472 task = tasks.Task(coro, loop=self, **kwargs) 

473 if task._source_traceback: 

474 del task._source_traceback[-1] 

475 try: 

476 return task 

477 finally: 

478 # gh-128552: prevent a refcycle of 

479 # task.exception().__traceback__->BaseEventLoop.create_task->task 

480 del task 

481 

482 def set_task_factory(self, factory): 

483 """Set a task factory that will be used by loop.create_task(). 

484 

485 If factory is None the default task factory will be set. 

486 

487 If factory is a callable, it should have a signature matching 

488 '(loop, coro, **kwargs)', where 'loop' will be a reference to the active 

489 event loop, 'coro' will be a coroutine object, and **kwargs will be 

490 arbitrary keyword arguments that should be passed on to Task. 

491 The callable must return a Task. 

492 """ 

493 if factory is not None and not callable(factory): 

494 raise TypeError('task factory must be a callable or None') 

495 self._task_factory = factory 

496 

497 def get_task_factory(self): 

498 """Return a task factory, or None if the default one is in use.""" 

499 return self._task_factory 

500 

501 def _make_socket_transport(self, sock, protocol, waiter=None, *, 

502 extra=None, server=None): 

503 """Create socket transport.""" 

504 raise NotImplementedError 

505 

506 def _make_ssl_transport( 

507 self, rawsock, protocol, sslcontext, waiter=None, 

508 *, server_side=False, server_hostname=None, 

509 extra=None, server=None, 

510 ssl_handshake_timeout=None, 

511 ssl_shutdown_timeout=None, 

512 call_connection_made=True): 

513 """Create SSL transport.""" 

514 raise NotImplementedError 

515 

516 def _make_datagram_transport(self, sock, protocol, 

517 address=None, waiter=None, extra=None): 

518 """Create datagram transport.""" 

519 raise NotImplementedError 

520 

521 def _make_read_pipe_transport(self, pipe, protocol, waiter=None, 

522 extra=None): 

523 """Create read pipe transport.""" 

524 raise NotImplementedError 

525 

526 def _make_write_pipe_transport(self, pipe, protocol, waiter=None, 

527 extra=None): 

528 """Create write pipe transport.""" 

529 raise NotImplementedError 

530 

531 async def _make_subprocess_transport(self, protocol, args, shell, 

532 stdin, stdout, stderr, bufsize, 

533 extra=None, **kwargs): 

534 """Create subprocess transport.""" 

535 raise NotImplementedError 

536 

537 def _write_to_self(self): 

538 """Write a byte to self-pipe, to wake up the event loop. 

539 

540 This may be called from a different thread. 

541 

542 The subclass is responsible for implementing the self-pipe. 

543 """ 

544 raise NotImplementedError 

545 

546 def _process_events(self, event_list): 

547 """Process selector events.""" 

548 raise NotImplementedError 

549 

550 def _check_closed(self): 

551 if self._closed: 

552 raise RuntimeError('Event loop is closed') 

553 

554 def _check_default_executor(self): 

555 if self._executor_shutdown_called: 555 ↛ 556line 555 didn't jump to line 556 because the condition on line 555 was never true

556 raise RuntimeError('Executor shutdown has been called') 

557 

558 def _asyncgen_finalizer_hook(self, agen): 

559 self._asyncgens.discard(agen) 

560 if not self.is_closed(): 560 ↛ exitline 560 didn't return from function '_asyncgen_finalizer_hook' because the condition on line 560 was always true

561 self.call_soon_threadsafe(self.create_task, agen.aclose()) 

562 

563 def _asyncgen_firstiter_hook(self, agen): 

564 if self._asyncgens_shutdown_called: 564 ↛ 565line 564 didn't jump to line 565 because the condition on line 564 was never true

565 warnings.warn( 

566 f"asynchronous generator {agen!r} was scheduled after " 

567 f"loop.shutdown_asyncgens() call", 

568 ResourceWarning, source=self) 

569 

570 self._asyncgens.add(agen) 

571 

572 async def shutdown_asyncgens(self): 

573 """Shutdown all active asynchronous generators.""" 

574 self._asyncgens_shutdown_called = True 

575 

576 if not len(self._asyncgens): 576 ↛ 581line 576 didn't jump to line 581 because the condition on line 576 was always true

577 # If Python version is <3.6 or we don't have any asynchronous 

578 # generators alive. 

579 return 

580 

581 closing_agens = list(self._asyncgens) 

582 self._asyncgens.clear() 

583 

584 results = await tasks.gather( 

585 *[ag.aclose() for ag in closing_agens], 

586 return_exceptions=True) 

587 

588 for result, agen in zip(results, closing_agens): 

589 if isinstance(result, Exception): 

590 self.call_exception_handler({ 

591 'message': f'an error occurred during closing of ' 

592 f'asynchronous generator {agen!r}', 

593 'exception': result, 

594 'asyncgen': agen 

595 }) 

596 

597 async def shutdown_default_executor(self, timeout=None): 

598 """Schedule the shutdown of the default executor. 

599 

600 The timeout parameter specifies the amount of time the executor will 

601 be given to finish joining. The default value is None, which means 

602 that the executor will be given an unlimited amount of time. 

603 """ 

604 self._executor_shutdown_called = True 

605 if self._default_executor is None: 

606 return 

607 future = self.create_future() 

608 thread = threading.Thread(target=self._do_shutdown, args=(future,)) 

609 thread.start() 

610 try: 

611 async with timeouts.timeout(timeout): 

612 await future 

613 except TimeoutError: 

614 warnings.warn("The executor did not finishing joining " 

615 f"its threads within {timeout} seconds.", 

616 RuntimeWarning, stacklevel=2) 

617 self._default_executor.shutdown(wait=False) 

618 else: 

619 thread.join() 

620 

621 def _do_shutdown(self, future): 

622 try: 

623 self._default_executor.shutdown(wait=True) 

624 if not self.is_closed(): 624 ↛ exitline 624 didn't return from function '_do_shutdown' because the condition on line 624 was always true

625 self.call_soon_threadsafe(futures._set_result_unless_cancelled, 

626 future, None) 

627 except Exception as ex: 

628 if not self.is_closed() and not future.cancelled(): 

629 self.call_soon_threadsafe(future.set_exception, ex) 

630 

631 def _check_running(self): 

632 if self.is_running(): 

633 raise RuntimeError('This event loop is already running') 

634 if events._get_running_loop() is not None: 

635 raise RuntimeError( 

636 'Cannot run the event loop while another loop is running') 

637 

638 def _run_forever_setup(self): 

639 """Prepare the run loop to process events. 

640 

641 This method exists so that custom event loop subclasses (e.g., event loops 

642 that integrate a GUI event loop with Python's event loop) have access to all the 

643 loop setup logic. 

644 """ 

645 self._check_closed() 

646 self._check_running() 

647 self._set_coroutine_origin_tracking(self._debug) 

648 

649 self._old_agen_hooks = sys.get_asyncgen_hooks() 

650 self._thread_id = threading.get_ident() 

651 sys.set_asyncgen_hooks( 

652 firstiter=self._asyncgen_firstiter_hook, 

653 finalizer=self._asyncgen_finalizer_hook 

654 ) 

655 

656 events._set_running_loop(self) 

657 

658 def _run_forever_cleanup(self): 

659 """Clean up after an event loop finishes the looping over events. 

660 

661 This method exists so that custom event loop subclasses (e.g., event loops 

662 that integrate a GUI event loop with Python's event loop) have access to all the 

663 loop cleanup logic. 

664 """ 

665 self._stopping = False 

666 self._thread_id = None 

667 events._set_running_loop(None) 

668 self._set_coroutine_origin_tracking(False) 

669 # Restore any pre-existing async generator hooks. 

670 if self._old_agen_hooks is not None: 670 ↛ exitline 670 didn't return from function '_run_forever_cleanup' because the condition on line 670 was always true

671 sys.set_asyncgen_hooks(*self._old_agen_hooks) 

672 self._old_agen_hooks = None 

673 

674 def run_forever(self): 

675 """Run until stop() is called.""" 

676 self._run_forever_setup() 

677 try: 

678 while True: 

679 self._run_once() 

680 if self._stopping: 

681 break 

682 finally: 

683 self._run_forever_cleanup() 

684 

685 def run_until_complete(self, future): 

686 """Run until the Future is done. 

687 

688 If the argument is a coroutine, it is wrapped in a Task. 

689 

690 WARNING: It would be disastrous to call run_until_complete() 

691 with the same coroutine twice -- it would wrap it in two 

692 different Tasks and that can't be good. 

693 

694 Return the Future's result, or raise its exception. 

695 """ 

696 self._check_closed() 

697 self._check_running() 

698 

699 new_task = not futures.isfuture(future) 

700 future = tasks.ensure_future(future, loop=self) 

701 if new_task: 

702 # An exception is raised if the future didn't complete, so there 

703 # is no need to log the "destroy pending task" message 

704 future._log_destroy_pending = False 

705 

706 future.add_done_callback(_run_until_complete_cb) 

707 try: 

708 self.run_forever() 

709 except: 

710 if new_task and future.done() and not future.cancelled(): 

711 # The coroutine raised a BaseException. Consume the exception 

712 # to not log a warning, the caller doesn't have access to the 

713 # local task. 

714 future.exception() 

715 raise 

716 finally: 

717 future.remove_done_callback(_run_until_complete_cb) 

718 if not future.done(): 

719 raise RuntimeError('Event loop stopped before Future completed.') 

720 

721 return future.result() 

722 

723 def stop(self): 

724 """Stop running the event loop. 

725 

726 Every callback already scheduled will still run. This simply informs 

727 run_forever to stop looping after a complete iteration. 

728 """ 

729 self._stopping = True 

730 

731 def close(self): 

732 """Close the event loop. 

733 

734 This clears the queues and shuts down the executor, 

735 but does not wait for the executor to finish. 

736 

737 The event loop must not be running. 

738 """ 

739 if self.is_running(): 739 ↛ 740line 739 didn't jump to line 740 because the condition on line 739 was never true

740 raise RuntimeError("Cannot close a running event loop") 

741 if self._closed: 

742 return 

743 if self._debug: 

744 logger.debug("Close %r", self) 

745 self._closed = True 

746 self._ready.clear() 

747 self._scheduled.clear() 

748 self._executor_shutdown_called = True 

749 executor = self._default_executor 

750 if executor is not None: 

751 self._default_executor = None 

752 executor.shutdown(wait=False) 

753 

754 def is_closed(self): 

755 """Returns True if the event loop was closed.""" 

756 return self._closed 

757 

758 def __del__(self, _warn=warnings.warn): 

759 if not self.is_closed(): 759 ↛ 760line 759 didn't jump to line 760 because the condition on line 759 was never true

760 _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self) 

761 if not self.is_running(): 

762 self.close() 

763 

764 def is_running(self): 

765 """Returns True if the event loop is running.""" 

766 return (self._thread_id is not None) 

767 

768 def time(self): 

769 """Return the time according to the event loop's clock. 

770 

771 This is a float expressed in seconds since an epoch, but the 

772 epoch, precision, accuracy and drift are unspecified and may 

773 differ per event loop. 

774 """ 

775 return time.monotonic() 

776 

777 def call_later(self, delay, callback, *args, context=None): 

778 """Arrange for a callback to be called at a given time. 

779 

780 Return a Handle: an opaque object with a cancel() method that 

781 can be used to cancel the call. 

782 

783 The delay can be an int or float, expressed in seconds. It is 

784 always relative to the current time. 

785 

786 Each callback will be called exactly once. If two callbacks 

787 are scheduled for exactly the same time, it is undefined which 

788 will be called first. 

789 

790 Any positional arguments after the callback will be passed to 

791 the callback when it is called. 

792 """ 

793 if delay is None: 

794 raise TypeError('delay must not be None') 

795 timer = self.call_at(self.time() + delay, callback, *args, 

796 context=context) 

797 if timer._source_traceback: 

798 del timer._source_traceback[-1] 

799 return timer 

800 

801 def call_at(self, when, callback, *args, context=None): 

802 """Like call_later(), but uses an absolute time. 

803 

804 Absolute time corresponds to the event loop's time() method. 

805 """ 

806 if when is None: 

807 raise TypeError("when cannot be None") 

808 self._check_closed() 

809 if self._debug: 

810 self._check_thread() 

811 self._check_callback(callback, 'call_at') 

812 timer = events.TimerHandle(when, callback, args, self, context) 

813 if timer._source_traceback: 

814 del timer._source_traceback[-1] 

815 heapq.heappush(self._scheduled, timer) 

816 timer._scheduled = True 

817 return timer 

818 

819 def call_soon(self, callback, *args, context=None): 

820 """Arrange for a callback to be called as soon as possible. 

821 

822 This operates as a FIFO queue: callbacks are called in the 

823 order in which they are registered. Each callback will be 

824 called exactly once. 

825 

826 Any positional arguments after the callback will be passed to 

827 the callback when it is called. 

828 """ 

829 self._check_closed() 

830 if self._debug: 

831 self._check_thread() 

832 self._check_callback(callback, 'call_soon') 

833 handle = self._call_soon(callback, args, context) 

834 if handle._source_traceback: 

835 del handle._source_traceback[-1] 

836 return handle 

837 

838 def _check_callback(self, callback, method): 

839 if (coroutines.iscoroutine(callback) or 

840 coroutines._iscoroutinefunction(callback)): 

841 raise TypeError( 

842 f"coroutines cannot be used with {method}()") 

843 if not callable(callback): 

844 raise TypeError( 

845 f'a callable object was expected by {method}(), ' 

846 f'got {callback!r}') 

847 

848 def _call_soon(self, callback, args, context): 

849 handle = events.Handle(callback, args, self, context) 

850 if handle._source_traceback: 

851 del handle._source_traceback[-1] 

852 self._ready.append(handle) 

853 return handle 

854 

855 def _check_thread(self): 

856 """Check that the current thread is the thread running the event loop. 

857 

858 Non-thread-safe methods of this class make this assumption and will 

859 likely behave incorrectly when the assumption is violated. 

860 

861 Should only be called when (self._debug == True). The caller is 

862 responsible for checking this condition for performance reasons. 

863 """ 

864 if self._thread_id is None: 

865 return 

866 thread_id = threading.get_ident() 

867 if thread_id != self._thread_id: 

868 raise RuntimeError( 

869 "Non-thread-safe operation invoked on an event loop other " 

870 "than the current one") 

871 

872 def call_soon_threadsafe(self, callback, *args, context=None): 

873 """Like call_soon(), but thread-safe.""" 

874 self._check_closed() 

875 if self._debug: 

876 self._check_callback(callback, 'call_soon_threadsafe') 

877 handle = events._ThreadSafeHandle(callback, args, self, context) 

878 self._ready.append(handle) 

879 if handle._source_traceback: 

880 del handle._source_traceback[-1] 

881 if handle._source_traceback: 

882 del handle._source_traceback[-1] 

883 self._write_to_self() 

884 return handle 

885 

886 def run_in_executor(self, executor, func, *args): 

887 self._check_closed() 

888 if self._debug: 

889 self._check_callback(func, 'run_in_executor') 

890 if executor is None: 

891 executor = self._default_executor 

892 # Only check when the default executor is being used 

893 self._check_default_executor() 

894 if executor is None: 

895 executor = concurrent.futures.ThreadPoolExecutor( 

896 thread_name_prefix='asyncio' 

897 ) 

898 self._default_executor = executor 

899 return futures.wrap_future( 

900 executor.submit(func, *args), loop=self) 

901 

902 def set_default_executor(self, executor): 

903 if not isinstance(executor, concurrent.futures.ThreadPoolExecutor): 

904 raise TypeError('executor must be ThreadPoolExecutor instance') 

905 self._default_executor = executor 

906 

907 def _getaddrinfo_debug(self, host, port, family, type, proto, flags): 

908 msg = [f"{host}:{port!r}"] 

909 if family: 

910 msg.append(f'family={family!r}') 

911 if type: 

912 msg.append(f'type={type!r}') 

913 if proto: 

914 msg.append(f'proto={proto!r}') 

915 if flags: 

916 msg.append(f'flags={flags!r}') 

917 msg = ', '.join(msg) 

918 logger.debug('Get address info %s', msg) 

919 

920 t0 = self.time() 

921 addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags) 

922 dt = self.time() - t0 

923 

924 msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}' 

925 if dt >= self.slow_callback_duration: 

926 logger.info(msg) 

927 else: 

928 logger.debug(msg) 

929 return addrinfo 

930 

931 async def getaddrinfo(self, host, port, *, 

932 family=0, type=0, proto=0, flags=0): 

933 if self._debug: 933 ↛ 934line 933 didn't jump to line 934 because the condition on line 933 was never true

934 getaddr_func = self._getaddrinfo_debug 

935 else: 

936 getaddr_func = socket.getaddrinfo 

937 

938 return await self.run_in_executor( 

939 None, getaddr_func, host, port, family, type, proto, flags) 

940 

941 async def getnameinfo(self, sockaddr, flags=0): 

942 return await self.run_in_executor( 

943 None, socket.getnameinfo, sockaddr, flags) 

944 

945 async def sock_sendfile(self, sock, file, offset=0, count=None, 

946 *, fallback=True): 

947 if self._debug and sock.gettimeout() != 0: 

948 raise ValueError("the socket must be non-blocking") 

949 _check_ssl_socket(sock) 

950 self._check_sendfile_params(sock, file, offset, count) 

951 try: 

952 return await self._sock_sendfile_native(sock, file, 

953 offset, count) 

954 except exceptions.SendfileNotAvailableError: 

955 if not fallback: 

956 raise 

957 return await self._sock_sendfile_fallback(sock, file, 

958 offset, count) 

959 

960 async def _sock_sendfile_native(self, sock, file, offset, count): 

961 # NB: sendfile syscall is not supported for SSL sockets and 

962 # non-mmap files even if sendfile is supported by OS 

963 raise exceptions.SendfileNotAvailableError( 

964 f"syscall sendfile is not available for socket {sock!r} " 

965 f"and file {file!r} combination") 

966 

967 async def _sock_sendfile_fallback(self, sock, file, offset, count): 

968 if offset: 

969 file.seek(offset) 

970 blocksize = ( 

971 min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE) 

972 if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE 

973 ) 

974 buf = bytearray(blocksize) 

975 total_sent = 0 

976 try: 

977 while True: 

978 if count: 

979 blocksize = min(count - total_sent, blocksize) 

980 if blocksize <= 0: 

981 break 

982 view = memoryview(buf)[:blocksize] 

983 read = await self.run_in_executor(None, file.readinto, view) 

984 if not read: 

985 break # EOF 

986 await self.sock_sendall(sock, view[:read]) 

987 total_sent += read 

988 return total_sent 

989 finally: 

990 if total_sent > 0 and hasattr(file, 'seek'): 

991 file.seek(offset + total_sent) 

992 

993 def _check_sendfile_params(self, sock, file, offset, count): 

994 if 'b' not in getattr(file, 'mode', 'b'): 

995 raise ValueError("file should be opened in binary mode") 

996 if not sock.type == socket.SOCK_STREAM: 

997 raise ValueError("only SOCK_STREAM type sockets are supported") 

998 if count is not None: 

999 if not isinstance(count, int): 

1000 raise TypeError( 

1001 "count must be a positive integer (got {!r})".format(count)) 

1002 if count <= 0: 

1003 raise ValueError( 

1004 "count must be a positive integer (got {!r})".format(count)) 

1005 if not isinstance(offset, int): 

1006 raise TypeError( 

1007 "offset must be a non-negative integer (got {!r})".format( 

1008 offset)) 

1009 if offset < 0: 

1010 raise ValueError( 

1011 "offset must be a non-negative integer (got {!r})".format( 

1012 offset)) 

1013 

1014 async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None): 

1015 """Create, bind and connect one socket.""" 

1016 my_exceptions = [] 

1017 exceptions.append(my_exceptions) 

1018 family, type_, proto, _, address = addr_info 

1019 sock = None 

1020 try: 

1021 try: 

1022 sock = socket.socket(family=family, type=type_, proto=proto) 

1023 sock.setblocking(False) 

1024 if local_addr_infos is not None: 

1025 for lfamily, _, _, _, laddr in local_addr_infos: 

1026 # skip local addresses of different family 

1027 if lfamily != family: 

1028 continue 

1029 try: 

1030 sock.bind(laddr) 

1031 break 

1032 except OSError as exc: 

1033 msg = ( 

1034 f'error while attempting to bind on ' 

1035 f'address {laddr!r}: {str(exc).lower()}' 

1036 ) 

1037 exc = OSError(exc.errno, msg) 

1038 my_exceptions.append(exc) 

1039 else: # all bind attempts failed 

1040 if my_exceptions: 

1041 raise my_exceptions.pop() 

1042 else: 

1043 raise OSError(f"no matching local address with {family=} found") 

1044 await self.sock_connect(sock, address) 

1045 return sock 

1046 except OSError as exc: 

1047 my_exceptions.append(exc) 

1048 raise 

1049 except: 

1050 if sock is not None: 

1051 try: 

1052 sock.close() 

1053 except OSError: 

1054 # An error when closing a newly created socket is 

1055 # not important, but it can overwrite more important 

1056 # non-OSError error. So ignore it. 

1057 pass 

1058 raise 

1059 finally: 

1060 exceptions = my_exceptions = None 

1061 

1062 async def create_connection( 

1063 self, protocol_factory, host=None, port=None, 

1064 *, ssl=None, family=0, 

1065 proto=0, flags=0, sock=None, 

1066 local_addr=None, server_hostname=None, 

1067 ssl_handshake_timeout=None, 

1068 ssl_shutdown_timeout=None, 

1069 happy_eyeballs_delay=None, interleave=None, 

1070 all_errors=False): 

1071 """Connect to a TCP server. 

1072 

1073 Create a streaming transport connection to a given internet host and 

1074 port: socket family AF_INET or socket.AF_INET6 depending on host (or 

1075 family if specified), socket type SOCK_STREAM. protocol_factory must be 

1076 a callable returning a protocol instance. 

1077 

1078 This method is a coroutine which will try to establish the connection 

1079 in the background. When successful, the coroutine returns a 

1080 (transport, protocol) pair. 

1081 """ 

1082 if server_hostname is not None and not ssl: 

1083 raise ValueError('server_hostname is only meaningful with ssl') 

1084 

1085 if server_hostname is None and ssl: 

1086 # Use host as default for server_hostname. It is an error 

1087 # if host is empty or not set, e.g. when an 

1088 # already-connected socket was passed or when only a port 

1089 # is given. To avoid this error, you can pass 

1090 # server_hostname='' -- this will bypass the hostname 

1091 # check. (This also means that if host is a numeric 

1092 # IP/IPv6 address, we will attempt to verify that exact 

1093 # address; this will probably fail, but it is possible to 

1094 # create a certificate for a specific IP address, so we 

1095 # don't judge it here.) 

1096 if not host: 

1097 raise ValueError('You must set server_hostname ' 

1098 'when using ssl without a host') 

1099 server_hostname = host 

1100 

1101 if ssl_handshake_timeout is not None and not ssl: 

1102 raise ValueError( 

1103 'ssl_handshake_timeout is only meaningful with ssl') 

1104 

1105 if ssl_shutdown_timeout is not None and not ssl: 1105 ↛ 1106line 1105 didn't jump to line 1106 because the condition on line 1105 was never true

1106 raise ValueError( 

1107 'ssl_shutdown_timeout is only meaningful with ssl') 

1108 

1109 if sock is not None: 

1110 _check_ssl_socket(sock) 

1111 

1112 if happy_eyeballs_delay is not None and interleave is None: 

1113 # If using happy eyeballs, default to interleave addresses by family 

1114 interleave = 1 

1115 

1116 if host is not None or port is not None: 

1117 if sock is not None: 

1118 raise ValueError( 

1119 'host/port and sock can not be specified at the same time') 

1120 

1121 infos = await self._ensure_resolved( 

1122 (host, port), family=family, 

1123 type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self) 

1124 if not infos: 

1125 raise OSError('getaddrinfo() returned empty list') 

1126 

1127 if local_addr is not None: 

1128 laddr_infos = await self._ensure_resolved( 

1129 local_addr, family=family, 

1130 type=socket.SOCK_STREAM, proto=proto, 

1131 flags=flags, loop=self) 

1132 if not laddr_infos: 

1133 raise OSError('getaddrinfo() returned empty list') 

1134 else: 

1135 laddr_infos = None 

1136 

1137 if interleave: 

1138 infos = _interleave_addrinfos(infos, interleave) 

1139 

1140 exceptions = [] 

1141 if happy_eyeballs_delay is None: 

1142 # not using happy eyeballs 

1143 for addrinfo in infos: 

1144 try: 

1145 sock = await self._connect_sock( 

1146 exceptions, addrinfo, laddr_infos) 

1147 break 

1148 except OSError: 

1149 continue 

1150 else: # using happy eyeballs 

1151 sock = (await staggered.staggered_race( 

1152 ( 

1153 # can't use functools.partial as it keeps a reference 

1154 # to exceptions 

1155 lambda addrinfo=addrinfo: self._connect_sock( 

1156 exceptions, addrinfo, laddr_infos 

1157 ) 

1158 for addrinfo in infos 

1159 ), 

1160 happy_eyeballs_delay, 

1161 loop=self, 

1162 ))[0] # can't use sock, _, _ as it keeks a reference to exceptions 

1163 

1164 if sock is None: 

1165 exceptions = [exc for sub in exceptions for exc in sub] 

1166 try: 

1167 if all_errors: 

1168 raise ExceptionGroup("create_connection failed", exceptions) 

1169 if len(exceptions) == 1: 

1170 raise exceptions[0] 

1171 elif exceptions: 

1172 # If they all have the same str(), raise one. 

1173 model = str(exceptions[0]) 

1174 if all(str(exc) == model for exc in exceptions): 

1175 raise exceptions[0] 

1176 # Raise a combined exception so the user can see all 

1177 # the various error messages. 

1178 raise OSError('Multiple exceptions: {}'.format( 

1179 ', '.join(str(exc) for exc in exceptions))) 

1180 else: 

1181 # No exceptions were collected, raise a timeout error 

1182 raise TimeoutError('create_connection failed') 

1183 finally: 

1184 exceptions = None 

1185 

1186 else: 

1187 if sock is None: 

1188 raise ValueError( 

1189 'host and port was not specified and no sock specified') 

1190 if sock.type != socket.SOCK_STREAM: 

1191 # We allow AF_INET, AF_INET6, AF_UNIX as long as they 

1192 # are SOCK_STREAM. 

1193 # We support passing AF_UNIX sockets even though we have 

1194 # a dedicated API for that: create_unix_connection. 

1195 # Disallowing AF_UNIX in this method, breaks backwards 

1196 # compatibility. 

1197 raise ValueError( 

1198 f'A Stream Socket was expected, got {sock!r}') 

1199 

1200 transport, protocol = await self._create_connection_transport( 

1201 sock, protocol_factory, ssl, server_hostname, 

1202 ssl_handshake_timeout=ssl_handshake_timeout, 

1203 ssl_shutdown_timeout=ssl_shutdown_timeout) 

1204 if self._debug: 

1205 # Get the socket from the transport because SSL transport closes 

1206 # the old socket and creates a new SSL socket 

1207 sock = transport.get_extra_info('socket') 

1208 logger.debug("%r connected to %s:%r: (%r, %r)", 

1209 sock, host, port, transport, protocol) 

1210 return transport, protocol 

1211 

1212 async def _create_connection_transport( 

1213 self, sock, protocol_factory, ssl, 

1214 server_hostname, server_side=False, 

1215 ssl_handshake_timeout=None, 

1216 ssl_shutdown_timeout=None): 

1217 

1218 sock.setblocking(False) 

1219 

1220 protocol = protocol_factory() 

1221 waiter = self.create_future() 

1222 if ssl: 

1223 sslcontext = None if isinstance(ssl, bool) else ssl 

1224 transport = self._make_ssl_transport( 

1225 sock, protocol, sslcontext, waiter, 

1226 server_side=server_side, server_hostname=server_hostname, 

1227 ssl_handshake_timeout=ssl_handshake_timeout, 

1228 ssl_shutdown_timeout=ssl_shutdown_timeout) 

1229 else: 

1230 transport = self._make_socket_transport(sock, protocol, waiter) 

1231 

1232 try: 

1233 await waiter 

1234 except: 

1235 transport.close() 

1236 raise 

1237 

1238 return transport, protocol 

1239 

1240 async def sendfile(self, transport, file, offset=0, count=None, 

1241 *, fallback=True): 

1242 """Send a file to transport. 

1243 

1244 Return the total number of bytes which were sent. 

1245 

1246 The method uses high-performance os.sendfile if available. 

1247 

1248 file must be a regular file object opened in binary mode. 

1249 

1250 offset tells from where to start reading the file. If specified, 

1251 count is the total number of bytes to transmit as opposed to 

1252 sending the file until EOF is reached. File position is updated on 

1253 return or also in case of error in which case file.tell() 

1254 can be used to figure out the number of bytes 

1255 which were sent. 

1256 

1257 fallback set to True makes asyncio to manually read and send 

1258 the file when the platform does not support the sendfile syscall 

1259 (e.g. Windows or SSL socket on Unix). 

1260 

1261 Raise SendfileNotAvailableError if the system does not support 

1262 sendfile syscall and fallback is False. 

1263 """ 

1264 if transport.is_closing(): 

1265 raise RuntimeError("Transport is closing") 

1266 mode = getattr(transport, '_sendfile_compatible', 

1267 constants._SendfileMode.UNSUPPORTED) 

1268 if mode is constants._SendfileMode.UNSUPPORTED: 

1269 raise RuntimeError( 

1270 f"sendfile is not supported for transport {transport!r}") 

1271 if mode is constants._SendfileMode.TRY_NATIVE: 

1272 try: 

1273 return await self._sendfile_native(transport, file, 

1274 offset, count) 

1275 except exceptions.SendfileNotAvailableError: 

1276 if not fallback: 

1277 raise 

1278 

1279 if not fallback: 

1280 raise RuntimeError( 

1281 f"fallback is disabled and native sendfile is not " 

1282 f"supported for transport {transport!r}") 

1283 

1284 return await self._sendfile_fallback(transport, file, 

1285 offset, count) 

1286 

1287 async def _sendfile_native(self, transp, file, offset, count): 

1288 raise exceptions.SendfileNotAvailableError( 

1289 "sendfile syscall is not supported") 

1290 

1291 async def _sendfile_fallback(self, transp, file, offset, count): 

1292 if offset: 

1293 file.seek(offset) 

1294 blocksize = min(count, 16384) if count else 16384 

1295 buf = bytearray(blocksize) 

1296 total_sent = 0 

1297 proto = _SendfileFallbackProtocol(transp) 

1298 try: 

1299 while True: 

1300 if count: 

1301 blocksize = min(count - total_sent, blocksize) 

1302 if blocksize <= 0: 

1303 return total_sent 

1304 view = memoryview(buf)[:blocksize] 

1305 read = await self.run_in_executor(None, file.readinto, view) 

1306 if not read: 

1307 return total_sent # EOF 

1308 transp.write(view[:read]) 

1309 await proto.drain() 

1310 total_sent += read 

1311 finally: 

1312 if total_sent > 0 and hasattr(file, 'seek'): 

1313 file.seek(offset + total_sent) 

1314 await proto.restore() 

1315 

1316 async def start_tls(self, transport, protocol, sslcontext, *, 

1317 server_side=False, 

1318 server_hostname=None, 

1319 ssl_handshake_timeout=None, 

1320 ssl_shutdown_timeout=None): 

1321 """Upgrade transport to TLS. 

1322 

1323 Return a new transport that *protocol* should start using 

1324 immediately. 

1325 """ 

1326 if ssl is None: 1326 ↛ 1327line 1326 didn't jump to line 1327 because the condition on line 1326 was never true

1327 raise RuntimeError('Python ssl module is not available') 

1328 

1329 if not isinstance(sslcontext, ssl.SSLContext): 

1330 raise TypeError( 

1331 f'sslcontext is expected to be an instance of ssl.SSLContext, ' 

1332 f'got {sslcontext!r}') 

1333 

1334 if not getattr(transport, '_start_tls_compatible', False): 

1335 raise TypeError( 

1336 f'transport {transport!r} is not supported by start_tls()') 

1337 

1338 waiter = self.create_future() 

1339 ssl_protocol = sslproto.SSLProtocol( 

1340 self, protocol, sslcontext, waiter, 

1341 server_side, server_hostname, 

1342 ssl_handshake_timeout=ssl_handshake_timeout, 

1343 ssl_shutdown_timeout=ssl_shutdown_timeout, 

1344 call_connection_made=False) 

1345 

1346 # Pause early so that "ssl_protocol.data_received()" doesn't 

1347 # have a chance to get called before "ssl_protocol.connection_made()". 

1348 transport.pause_reading() 

1349 

1350 # gh-142352: move buffered StreamReader data to SSLProtocol 

1351 if server_side: 

1352 from .streams import StreamReaderProtocol 

1353 if isinstance(protocol, StreamReaderProtocol): 

1354 stream_reader = getattr(protocol, '_stream_reader', None) 

1355 if stream_reader is not None: 1355 ↛ 1361line 1355 didn't jump to line 1361 because the condition on line 1355 was always true

1356 buffer = stream_reader._buffer 

1357 if buffer: 

1358 ssl_protocol._incoming.write(buffer) 

1359 buffer.clear() 

1360 

1361 transport.set_protocol(ssl_protocol) 

1362 conmade_cb = self.call_soon(ssl_protocol.connection_made, transport) 

1363 resume_cb = self.call_soon(transport.resume_reading) 

1364 

1365 try: 

1366 await waiter 

1367 except BaseException: 

1368 transport.close() 

1369 conmade_cb.cancel() 

1370 resume_cb.cancel() 

1371 raise 

1372 

1373 return ssl_protocol._app_transport 

1374 

1375 async def create_datagram_endpoint(self, protocol_factory, 

1376 local_addr=None, remote_addr=None, *, 

1377 family=0, proto=0, flags=0, 

1378 reuse_port=None, 

1379 allow_broadcast=None, sock=None): 

1380 """Create datagram connection.""" 

1381 if sock is not None: 

1382 if sock.type == socket.SOCK_STREAM: 

1383 raise ValueError( 

1384 f'A datagram socket was expected, got {sock!r}') 

1385 if (local_addr or remote_addr or 

1386 family or proto or flags or 

1387 reuse_port or allow_broadcast): 

1388 # show the problematic kwargs in exception msg 

1389 opts = dict(local_addr=local_addr, remote_addr=remote_addr, 

1390 family=family, proto=proto, flags=flags, 

1391 reuse_port=reuse_port, 

1392 allow_broadcast=allow_broadcast) 

1393 problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v) 

1394 raise ValueError( 

1395 f'socket modifier keyword arguments can not be used ' 

1396 f'when sock is specified. ({problems})') 

1397 sock.setblocking(False) 

1398 r_addr = None 

1399 else: 

1400 if not (local_addr or remote_addr): 

1401 if family == 0: 

1402 raise ValueError('unexpected address family') 

1403 addr_pairs_info = (((family, proto), (None, None)),) 

1404 elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX: 

1405 for addr in (local_addr, remote_addr): 

1406 if addr is not None and not isinstance(addr, str): 1406 ↛ 1407line 1406 didn't jump to line 1407 because the condition on line 1406 was never true

1407 raise TypeError('string is expected') 

1408 

1409 if local_addr and local_addr[0] not in (0, '\x00'): 1409 ↛ 1421line 1409 didn't jump to line 1421 because the condition on line 1409 was always true

1410 try: 

1411 if stat.S_ISSOCK(os.stat(local_addr).st_mode): 1411 ↛ 1421line 1411 didn't jump to line 1421 because the condition on line 1411 was always true

1412 os.remove(local_addr) 

1413 except FileNotFoundError: 

1414 pass 

1415 except OSError as err: 

1416 # Directory may have permissions only to create socket. 

1417 logger.error('Unable to check or remove stale UNIX ' 

1418 'socket %r: %r', 

1419 local_addr, err) 

1420 

1421 addr_pairs_info = (((family, proto), 

1422 (local_addr, remote_addr)), ) 

1423 else: 

1424 # join address by (family, protocol) 

1425 addr_infos = {} # Using order preserving dict 

1426 for idx, addr in ((0, local_addr), (1, remote_addr)): 

1427 if addr is not None: 

1428 if not (isinstance(addr, tuple) and len(addr) == 2): 

1429 raise TypeError('2-tuple is expected') 

1430 

1431 infos = await self._ensure_resolved( 

1432 addr, family=family, type=socket.SOCK_DGRAM, 

1433 proto=proto, flags=flags, loop=self) 

1434 if not infos: 

1435 raise OSError('getaddrinfo() returned empty list') 

1436 

1437 for fam, _, pro, _, address in infos: 

1438 key = (fam, pro) 

1439 if key not in addr_infos: 1439 ↛ 1441line 1439 didn't jump to line 1441 because the condition on line 1439 was always true

1440 addr_infos[key] = [None, None] 

1441 addr_infos[key][idx] = address 

1442 

1443 # each addr has to have info for each (family, proto) pair 

1444 addr_pairs_info = [ 

1445 (key, addr_pair) for key, addr_pair in addr_infos.items() 

1446 if not ((local_addr and addr_pair[0] is None) or 

1447 (remote_addr and addr_pair[1] is None))] 

1448 

1449 if not addr_pairs_info: 

1450 raise ValueError('can not get address information') 

1451 

1452 exceptions = [] 

1453 

1454 for ((family, proto), 

1455 (local_address, remote_address)) in addr_pairs_info: 

1456 sock = None 

1457 r_addr = None 

1458 try: 

1459 sock = socket.socket( 

1460 family=family, type=socket.SOCK_DGRAM, proto=proto) 

1461 if reuse_port: 

1462 _set_reuseport(sock) 

1463 if allow_broadcast: 

1464 sock.setsockopt( 

1465 socket.SOL_SOCKET, socket.SO_BROADCAST, 1) 

1466 sock.setblocking(False) 

1467 

1468 if local_addr: 

1469 sock.bind(local_address) 

1470 if remote_addr: 

1471 if not allow_broadcast: 

1472 await self.sock_connect(sock, remote_address) 

1473 r_addr = remote_address 

1474 except OSError as exc: 

1475 if sock is not None: 

1476 sock.close() 

1477 exceptions.append(exc) 

1478 except: 

1479 if sock is not None: 1479 ↛ 1481line 1479 didn't jump to line 1481 because the condition on line 1479 was always true

1480 sock.close() 

1481 raise 

1482 else: 

1483 break 

1484 else: 

1485 raise exceptions[0] 

1486 

1487 protocol = protocol_factory() 

1488 waiter = self.create_future() 

1489 transport = self._make_datagram_transport( 

1490 sock, protocol, r_addr, waiter) 

1491 if self._debug: 1491 ↛ 1492line 1491 didn't jump to line 1492 because the condition on line 1491 was never true

1492 if local_addr: 

1493 logger.info("Datagram endpoint local_addr=%r remote_addr=%r " 

1494 "created: (%r, %r)", 

1495 local_addr, remote_addr, transport, protocol) 

1496 else: 

1497 logger.debug("Datagram endpoint remote_addr=%r created: " 

1498 "(%r, %r)", 

1499 remote_addr, transport, protocol) 

1500 

1501 try: 

1502 await waiter 

1503 except: 

1504 transport.close() 

1505 raise 

1506 

1507 return transport, protocol 

1508 

1509 async def _ensure_resolved(self, address, *, 

1510 family=0, type=socket.SOCK_STREAM, 

1511 proto=0, flags=0, loop): 

1512 host, port = address[:2] 

1513 info = _ipaddr_info(host, port, family, type, proto, *address[2:]) 

1514 if info is not None: 

1515 # "host" is already a resolved IP. 

1516 return [info] 

1517 else: 

1518 return await loop.getaddrinfo(host, port, family=family, type=type, 

1519 proto=proto, flags=flags) 

1520 

1521 async def _create_server_getaddrinfo(self, host, port, family, flags): 

1522 infos = await self._ensure_resolved((host, port), family=family, 

1523 type=socket.SOCK_STREAM, 

1524 flags=flags, loop=self) 

1525 if not infos: 

1526 raise OSError(f'getaddrinfo({host!r}) returned empty list') 

1527 return infos 

1528 

1529 async def create_server( 

1530 self, protocol_factory, host=None, port=None, 

1531 *, 

1532 family=socket.AF_UNSPEC, 

1533 flags=socket.AI_PASSIVE, 

1534 sock=None, 

1535 backlog=100, 

1536 ssl=None, 

1537 reuse_address=None, 

1538 reuse_port=None, 

1539 keep_alive=None, 

1540 ssl_handshake_timeout=None, 

1541 ssl_shutdown_timeout=None, 

1542 start_serving=True): 

1543 """Create a TCP server. 

1544 

1545 The host parameter can be a string, in that case the TCP server is 

1546 bound to host and port. 

1547 

1548 The host parameter can also be a sequence of strings and in that case 

1549 the TCP server is bound to all hosts of the sequence. If a host 

1550 appears multiple times (possibly indirectly e.g. when hostnames 

1551 resolve to the same IP address), the server is only bound once to that 

1552 host. 

1553 

1554 Return a Server object which can be used to stop the service. 

1555 

1556 This method is a coroutine. 

1557 """ 

1558 if isinstance(ssl, bool): 1558 ↛ 1559line 1558 didn't jump to line 1559 because the condition on line 1558 was never true

1559 raise TypeError('ssl argument must be an SSLContext or None') 

1560 

1561 if ssl_handshake_timeout is not None and ssl is None: 

1562 raise ValueError( 

1563 'ssl_handshake_timeout is only meaningful with ssl') 

1564 

1565 if ssl_shutdown_timeout is not None and ssl is None: 1565 ↛ 1566line 1565 didn't jump to line 1566 because the condition on line 1565 was never true

1566 raise ValueError( 

1567 'ssl_shutdown_timeout is only meaningful with ssl') 

1568 

1569 if sock is not None: 

1570 _check_ssl_socket(sock) 

1571 

1572 if host is not None or port is not None: 

1573 if sock is not None: 

1574 raise ValueError( 

1575 'host/port and sock can not be specified at the same time') 

1576 

1577 if reuse_address is None: 1577 ↛ 1579line 1577 didn't jump to line 1579 because the condition on line 1577 was always true

1578 reuse_address = os.name == "posix" and sys.platform != "cygwin" 

1579 sockets = [] 

1580 if host == '': 

1581 hosts = [None] 

1582 elif (isinstance(host, str) or 

1583 not isinstance(host, collections.abc.Iterable)): 

1584 hosts = [host] 

1585 else: 

1586 hosts = host 

1587 

1588 fs = [self._create_server_getaddrinfo(host, port, family=family, 

1589 flags=flags) 

1590 for host in hosts] 

1591 infos = await tasks.gather(*fs) 

1592 infos = set(itertools.chain.from_iterable(infos)) 

1593 

1594 completed = False 

1595 try: 

1596 for res in infos: 

1597 af, socktype, proto, canonname, sa = res 

1598 try: 

1599 sock = socket.socket(af, socktype, proto) 

1600 except socket.error: 

1601 # Assume it's a bad family/type/protocol combination. 

1602 if self._debug: 

1603 logger.warning('create_server() failed to create ' 

1604 'socket.socket(%r, %r, %r)', 

1605 af, socktype, proto, exc_info=True) 

1606 continue 

1607 sockets.append(sock) 

1608 if reuse_address: 1608 ↛ 1613line 1608 didn't jump to line 1613 because the condition on line 1608 was always true

1609 sock.setsockopt( 

1610 socket.SOL_SOCKET, socket.SO_REUSEADDR, True) 

1611 # Since Linux 6.12.9, SO_REUSEPORT is not allowed 

1612 # on other address families than AF_INET/AF_INET6. 

1613 if reuse_port and af in (socket.AF_INET, socket.AF_INET6): 

1614 _set_reuseport(sock) 

1615 if keep_alive: 1615 ↛ 1616line 1615 didn't jump to line 1616 because the condition on line 1615 was never true

1616 sock.setsockopt( 

1617 socket.SOL_SOCKET, socket.SO_KEEPALIVE, True) 

1618 # Disable IPv4/IPv6 dual stack support (enabled by 

1619 # default on Linux) which makes a single socket 

1620 # listen on both address families. 

1621 if (_HAS_IPv6 and 

1622 af == socket.AF_INET6 and 

1623 hasattr(socket, 'IPPROTO_IPV6')): 

1624 sock.setsockopt(socket.IPPROTO_IPV6, 

1625 socket.IPV6_V6ONLY, 

1626 True) 

1627 try: 

1628 sock.bind(sa) 

1629 except OSError as err: 

1630 msg = ('error while attempting ' 

1631 'to bind on address %r: %s' 

1632 % (sa, str(err).lower())) 

1633 if err.errno == errno.EADDRNOTAVAIL: 1633 ↛ 1635line 1633 didn't jump to line 1635 because the condition on line 1633 was never true

1634 # Assume the family is not enabled (bpo-30945) 

1635 sockets.pop() 

1636 sock.close() 

1637 if self._debug: 

1638 logger.warning(msg) 

1639 continue 

1640 raise OSError(err.errno, msg) from None 

1641 

1642 if not sockets: 1642 ↛ 1643line 1642 didn't jump to line 1643 because the condition on line 1642 was never true

1643 raise OSError('could not bind on any address out of %r' 

1644 % ([info[4] for info in infos],)) 

1645 

1646 completed = True 

1647 finally: 

1648 if not completed: 

1649 for sock in sockets: 1649 ↛ 1658line 1649 didn't jump to line 1658 because the loop on line 1649 didn't complete

1650 sock.close() 

1651 else: 

1652 if sock is None: 

1653 raise ValueError('Neither host/port nor sock were specified') 

1654 if sock.type != socket.SOCK_STREAM: 

1655 raise ValueError(f'A Stream Socket was expected, got {sock!r}') 

1656 sockets = [sock] 

1657 

1658 for sock in sockets: 

1659 sock.setblocking(False) 

1660 

1661 server = Server(self, sockets, protocol_factory, 

1662 ssl, backlog, ssl_handshake_timeout, 

1663 ssl_shutdown_timeout) 

1664 if start_serving: 

1665 server._start_serving() 

1666 # Skip one loop iteration so that all 'loop.add_reader' 

1667 # go through. 

1668 await tasks.sleep(0) 

1669 

1670 if self._debug: 

1671 logger.info("%r is serving", server) 

1672 return server 

1673 

1674 async def connect_accepted_socket( 

1675 self, protocol_factory, sock, 

1676 *, ssl=None, 

1677 ssl_handshake_timeout=None, 

1678 ssl_shutdown_timeout=None): 

1679 if sock.type != socket.SOCK_STREAM: 1679 ↛ 1680line 1679 didn't jump to line 1680 because the condition on line 1679 was never true

1680 raise ValueError(f'A Stream Socket was expected, got {sock!r}') 

1681 

1682 if ssl_handshake_timeout is not None and not ssl: 

1683 raise ValueError( 

1684 'ssl_handshake_timeout is only meaningful with ssl') 

1685 

1686 if ssl_shutdown_timeout is not None and not ssl: 1686 ↛ 1687line 1686 didn't jump to line 1687 because the condition on line 1686 was never true

1687 raise ValueError( 

1688 'ssl_shutdown_timeout is only meaningful with ssl') 

1689 

1690 _check_ssl_socket(sock) 

1691 

1692 transport, protocol = await self._create_connection_transport( 

1693 sock, protocol_factory, ssl, '', server_side=True, 

1694 ssl_handshake_timeout=ssl_handshake_timeout, 

1695 ssl_shutdown_timeout=ssl_shutdown_timeout) 

1696 if self._debug: 1696 ↛ 1699line 1696 didn't jump to line 1699 because the condition on line 1696 was never true

1697 # Get the socket from the transport because SSL transport closes 

1698 # the old socket and creates a new SSL socket 

1699 sock = transport.get_extra_info('socket') 

1700 logger.debug("%r handled: (%r, %r)", sock, transport, protocol) 

1701 return transport, protocol 

1702 

1703 async def connect_read_pipe(self, protocol_factory, pipe): 

1704 protocol = protocol_factory() 

1705 waiter = self.create_future() 

1706 transport = self._make_read_pipe_transport(pipe, protocol, waiter) 

1707 

1708 try: 

1709 await waiter 

1710 except: 

1711 transport.close() 

1712 raise 

1713 

1714 if self._debug: 1714 ↛ 1715line 1714 didn't jump to line 1715 because the condition on line 1714 was never true

1715 logger.debug('Read pipe %r connected: (%r, %r)', 

1716 pipe.fileno(), transport, protocol) 

1717 return transport, protocol 

1718 

1719 async def connect_write_pipe(self, protocol_factory, pipe): 

1720 protocol = protocol_factory() 

1721 waiter = self.create_future() 

1722 transport = self._make_write_pipe_transport(pipe, protocol, waiter) 

1723 

1724 try: 

1725 await waiter 

1726 except: 

1727 transport.close() 

1728 raise 

1729 

1730 if self._debug: 1730 ↛ 1731line 1730 didn't jump to line 1731 because the condition on line 1730 was never true

1731 logger.debug('Write pipe %r connected: (%r, %r)', 

1732 pipe.fileno(), transport, protocol) 

1733 return transport, protocol 

1734 

1735 def _log_subprocess(self, msg, stdin, stdout, stderr): 

1736 info = [msg] 

1737 if stdin is not None: 

1738 info.append(f'stdin={_format_pipe(stdin)}') 

1739 if stdout is not None and stderr == subprocess.STDOUT: 

1740 info.append(f'stdout=stderr={_format_pipe(stdout)}') 

1741 else: 

1742 if stdout is not None: 

1743 info.append(f'stdout={_format_pipe(stdout)}') 

1744 if stderr is not None: 

1745 info.append(f'stderr={_format_pipe(stderr)}') 

1746 logger.debug(' '.join(info)) 

1747 

1748 async def subprocess_shell(self, protocol_factory, cmd, *, 

1749 stdin=subprocess.PIPE, 

1750 stdout=subprocess.PIPE, 

1751 stderr=subprocess.PIPE, 

1752 universal_newlines=False, 

1753 shell=True, bufsize=0, 

1754 encoding=None, errors=None, text=None, 

1755 **kwargs): 

1756 if not isinstance(cmd, (bytes, str)): 

1757 raise ValueError("cmd must be a string") 

1758 if universal_newlines: 

1759 raise ValueError("universal_newlines must be False") 

1760 if not shell: 

1761 raise ValueError("shell must be True") 

1762 if bufsize != 0: 

1763 raise ValueError("bufsize must be 0") 

1764 if text: 

1765 raise ValueError("text must be False") 

1766 if encoding is not None: 

1767 raise ValueError("encoding must be None") 

1768 if errors is not None: 

1769 raise ValueError("errors must be None") 

1770 

1771 protocol = protocol_factory() 

1772 debug_log = None 

1773 if self._debug: 1773 ↛ 1776line 1773 didn't jump to line 1776 because the condition on line 1773 was never true

1774 # don't log parameters: they may contain sensitive information 

1775 # (password) and may be too long 

1776 debug_log = 'run shell command %r' % cmd 

1777 self._log_subprocess(debug_log, stdin, stdout, stderr) 

1778 transport = await self._make_subprocess_transport( 

1779 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs) 

1780 if self._debug and debug_log is not None: 1780 ↛ 1781line 1780 didn't jump to line 1781 because the condition on line 1780 was never true

1781 logger.info('%s: %r', debug_log, transport) 

1782 return transport, protocol 

1783 

1784 async def subprocess_exec(self, protocol_factory, program, *args, 

1785 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 

1786 stderr=subprocess.PIPE, universal_newlines=False, 

1787 shell=False, bufsize=0, 

1788 encoding=None, errors=None, text=None, 

1789 **kwargs): 

1790 if universal_newlines: 

1791 raise ValueError("universal_newlines must be False") 

1792 if shell: 

1793 raise ValueError("shell must be False") 

1794 if bufsize != 0: 

1795 raise ValueError("bufsize must be 0") 

1796 if text: 

1797 raise ValueError("text must be False") 

1798 if encoding is not None: 

1799 raise ValueError("encoding must be None") 

1800 if errors is not None: 

1801 raise ValueError("errors must be None") 

1802 

1803 popen_args = (program,) + args 

1804 protocol = protocol_factory() 

1805 debug_log = None 

1806 if self._debug: 1806 ↛ 1809line 1806 didn't jump to line 1809 because the condition on line 1806 was never true

1807 # don't log parameters: they may contain sensitive information 

1808 # (password) and may be too long 

1809 debug_log = f'execute program {program!r}' 

1810 self._log_subprocess(debug_log, stdin, stdout, stderr) 

1811 transport = await self._make_subprocess_transport( 

1812 protocol, popen_args, False, stdin, stdout, stderr, 

1813 bufsize, **kwargs) 

1814 if self._debug and debug_log is not None: 1814 ↛ 1815line 1814 didn't jump to line 1815 because the condition on line 1814 was never true

1815 logger.info('%s: %r', debug_log, transport) 

1816 return transport, protocol 

1817 

1818 def get_exception_handler(self): 

1819 """Return an exception handler, or None if the default one is in use. 

1820 """ 

1821 return self._exception_handler 

1822 

1823 def set_exception_handler(self, handler): 

1824 """Set handler as the new event loop exception handler. 

1825 

1826 If handler is None, the default exception handler will 

1827 be set. 

1828 

1829 If handler is a callable object, it should have a 

1830 signature matching '(loop, context)', where 'loop' 

1831 will be a reference to the active event loop, 'context' 

1832 will be a dict object (see `call_exception_handler()` 

1833 documentation for details about context). 

1834 """ 

1835 if handler is not None and not callable(handler): 

1836 raise TypeError(f'A callable object or None is expected, ' 

1837 f'got {handler!r}') 

1838 self._exception_handler = handler 

1839 

1840 def default_exception_handler(self, context): 

1841 """Default exception handler. 

1842 

1843 This is called when an exception occurs and no exception 

1844 handler is set, and can be called by a custom exception 

1845 handler that wants to defer to the default behavior. 

1846 

1847 This default handler logs the error message and other 

1848 context-dependent information. In debug mode, a truncated 

1849 stack trace is also appended showing where the given object 

1850 (e.g. a handle or future or task) was created, if any. 

1851 

1852 The context parameter has the same meaning as in 

1853 `call_exception_handler()`. 

1854 """ 

1855 message = context.get('message') 

1856 if not message: 1856 ↛ 1857line 1856 didn't jump to line 1857 because the condition on line 1856 was never true

1857 message = 'Unhandled exception in event loop' 

1858 

1859 exception = context.get('exception') 

1860 if exception is not None: 

1861 exc_info = (type(exception), exception, exception.__traceback__) 

1862 else: 

1863 exc_info = False 

1864 

1865 if ('source_traceback' not in context and 1865 ↛ 1868line 1865 didn't jump to line 1868 because the condition on line 1865 was never true

1866 self._current_handle is not None and 

1867 self._current_handle._source_traceback): 

1868 context['handle_traceback'] = \ 

1869 self._current_handle._source_traceback 

1870 

1871 log_lines = [message] 

1872 for key in sorted(context): 

1873 if key in {'message', 'exception'}: 

1874 continue 

1875 value = context[key] 

1876 if key == 'source_traceback': 

1877 tb = ''.join(traceback.format_list(value)) 

1878 value = 'Object created at (most recent call last):\n' 

1879 value += tb.rstrip() 

1880 elif key == 'handle_traceback': 1880 ↛ 1881line 1880 didn't jump to line 1881 because the condition on line 1880 was never true

1881 tb = ''.join(traceback.format_list(value)) 

1882 value = 'Handle created at (most recent call last):\n' 

1883 value += tb.rstrip() 

1884 else: 

1885 value = repr(value) 

1886 log_lines.append(f'{key}: {value}') 

1887 

1888 logger.error('\n'.join(log_lines), exc_info=exc_info) 

1889 

1890 def call_exception_handler(self, context): 

1891 """Call the current event loop's exception handler. 

1892 

1893 The context argument is a dict containing the following keys: 

1894 

1895 - 'message': Error message; 

1896 - 'exception' (optional): Exception object; 

1897 - 'future' (optional): Future instance; 

1898 - 'task' (optional): Task instance; 

1899 - 'handle' (optional): Handle instance; 

1900 - 'protocol' (optional): Protocol instance; 

1901 - 'transport' (optional): Transport instance; 

1902 - 'socket' (optional): Socket instance; 

1903 - 'source_traceback' (optional): Traceback of the source; 

1904 - 'handle_traceback' (optional): Traceback of the handle; 

1905 - 'asyncgen' (optional): Asynchronous generator that caused 

1906 the exception. 

1907 

1908 New keys maybe introduced in the future. 

1909 

1910 Note: do not overload this method in an event loop subclass. 

1911 For custom exception handling, use the 

1912 `set_exception_handler()` method. 

1913 """ 

1914 if self._exception_handler is None: 

1915 try: 

1916 self.default_exception_handler(context) 

1917 except (SystemExit, KeyboardInterrupt): 

1918 raise 

1919 except BaseException: 

1920 # Second protection layer for unexpected errors 

1921 # in the default implementation, as well as for subclassed 

1922 # event loops with overloaded "default_exception_handler". 

1923 logger.error('Exception in default exception handler', 

1924 exc_info=True) 

1925 else: 

1926 try: 

1927 ctx = None 

1928 thing = context.get("task") 

1929 if thing is None: 

1930 # Even though Futures don't have a context, 

1931 # Task is a subclass of Future, 

1932 # and sometimes the 'future' key holds a Task. 

1933 thing = context.get("future") 

1934 if thing is None: 

1935 # Handles also have a context. 

1936 thing = context.get("handle") 

1937 if thing is not None and hasattr(thing, "get_context"): 

1938 ctx = thing.get_context() 

1939 if ctx is not None and hasattr(ctx, "run"): 

1940 ctx.run(self._exception_handler, self, context) 

1941 else: 

1942 self._exception_handler(self, context) 

1943 except (SystemExit, KeyboardInterrupt): 

1944 raise 

1945 except BaseException as exc: 

1946 # Exception in the user set custom exception handler. 

1947 try: 

1948 # Let's try default handler. 

1949 self.default_exception_handler({ 

1950 'message': 'Unhandled error in exception handler', 

1951 'exception': exc, 

1952 'context': context, 

1953 }) 

1954 except (SystemExit, KeyboardInterrupt): 

1955 raise 

1956 except BaseException: 

1957 # Guard 'default_exception_handler' in case it is 

1958 # overloaded. 

1959 logger.error('Exception in default exception handler ' 

1960 'while handling an unexpected error ' 

1961 'in custom exception handler', 

1962 exc_info=True) 

1963 

1964 def _add_callback(self, handle): 

1965 """Add a Handle to _ready.""" 

1966 if not handle._cancelled: 

1967 self._ready.append(handle) 

1968 

1969 def _add_callback_signalsafe(self, handle): 

1970 """Like _add_callback() but called from a signal handler.""" 

1971 self._add_callback(handle) 

1972 self._write_to_self() 

1973 

1974 def _timer_handle_cancelled(self, handle): 

1975 """Notification that a TimerHandle has been cancelled.""" 

1976 if handle._scheduled: 

1977 self._timer_cancelled_count += 1 

1978 

1979 def _run_once(self): 

1980 """Run one full iteration of the event loop. 

1981 

1982 This calls all currently ready callbacks, polls for I/O, 

1983 schedules the resulting callbacks, and finally schedules 

1984 'call_later' callbacks. 

1985 """ 

1986 

1987 sched_count = len(self._scheduled) 

1988 if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and 

1989 self._timer_cancelled_count / sched_count > 

1990 _MIN_CANCELLED_TIMER_HANDLES_FRACTION): 

1991 # Remove delayed calls that were cancelled if their number 

1992 # is too high 

1993 new_scheduled = [] 

1994 for handle in self._scheduled: 

1995 if handle._cancelled: 

1996 handle._scheduled = False 

1997 else: 

1998 new_scheduled.append(handle) 

1999 

2000 heapq.heapify(new_scheduled) 

2001 self._scheduled = new_scheduled 

2002 self._timer_cancelled_count = 0 

2003 else: 

2004 # Remove delayed calls that were cancelled from head of queue. 

2005 while self._scheduled and self._scheduled[0]._cancelled: 

2006 self._timer_cancelled_count -= 1 

2007 handle = heapq.heappop(self._scheduled) 

2008 handle._scheduled = False 

2009 

2010 timeout = None 

2011 if self._ready or self._stopping: 

2012 timeout = 0 

2013 elif self._scheduled: 

2014 # Compute the desired timeout. 

2015 timeout = self._scheduled[0]._when - self.time() 

2016 if timeout > MAXIMUM_SELECT_TIMEOUT: 2016 ↛ 2017line 2016 didn't jump to line 2017 because the condition on line 2016 was never true

2017 timeout = MAXIMUM_SELECT_TIMEOUT 

2018 elif timeout < 0: 

2019 timeout = 0 

2020 

2021 event_list = self._selector.select(timeout) 

2022 self._process_events(event_list) 

2023 # Needed to break cycles when an exception occurs. 

2024 event_list = None 

2025 

2026 # Handle 'later' callbacks that are ready. 

2027 now = self.time() 

2028 # Ensure that `end_time` is strictly increasing 

2029 # when the clock resolution is too small. 

2030 end_time = now + max(self._clock_resolution, math.ulp(now)) 

2031 while self._scheduled: 

2032 handle = self._scheduled[0] 

2033 if handle._when >= end_time: 

2034 break 

2035 handle = heapq.heappop(self._scheduled) 

2036 handle._scheduled = False 

2037 self._ready.append(handle) 

2038 

2039 # This is the only place where callbacks are actually *called*. 

2040 # All other places just add them to ready. 

2041 # Note: We run all currently scheduled callbacks, but not any 

2042 # callbacks scheduled by callbacks run this time around -- 

2043 # they will be run the next time (after another I/O poll). 

2044 # Use an idiom that is thread-safe without using locks. 

2045 ntodo = len(self._ready) 

2046 for i in range(ntodo): 

2047 handle = self._ready.popleft() 

2048 if handle._cancelled: 

2049 continue 

2050 if self._debug: 

2051 try: 

2052 self._current_handle = handle 

2053 t0 = self.time() 

2054 handle._run() 

2055 dt = self.time() - t0 

2056 if dt >= self.slow_callback_duration: 

2057 logger.warning('Executing %s took %.3f seconds', 

2058 _format_handle(handle), dt) 

2059 finally: 

2060 self._current_handle = None 

2061 else: 

2062 handle._run() 

2063 handle = None # Needed to break cycles when an exception occurs. 

2064 

2065 def _set_coroutine_origin_tracking(self, enabled): 

2066 if bool(enabled) == bool(self._coroutine_origin_tracking_enabled): 

2067 return 

2068 

2069 if enabled: 

2070 self._coroutine_origin_tracking_saved_depth = ( 

2071 sys.get_coroutine_origin_tracking_depth()) 

2072 sys.set_coroutine_origin_tracking_depth( 

2073 constants.DEBUG_STACK_DEPTH) 

2074 else: 

2075 sys.set_coroutine_origin_tracking_depth( 

2076 self._coroutine_origin_tracking_saved_depth) 

2077 

2078 self._coroutine_origin_tracking_enabled = enabled 

2079 

2080 def get_debug(self): 

2081 return self._debug 

2082 

2083 def set_debug(self, enabled): 

2084 self._debug = enabled 

2085 

2086 if self.is_running(): 

2087 self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)