Coverage for Lib/asyncio/base_events.py: 88%

1148 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-15 02:02 +0000

1"""Base implementation of event loop. 

2 

3The event loop can be broken up into a multiplexer (the part 

4responsible for notifying us of I/O events) and the event loop proper, 

5which wraps a multiplexer with functionality for scheduling callbacks, 

6immediately or at a given time in the future. 

7 

8Whenever a public API takes a callback, subsequent positional 

9arguments will be passed to the callback if/when it is called. This 

10avoids the proliferation of trivial lambdas implementing closures. 

11Keyword arguments for the callback are not supported; this is a 

12conscious design decision, leaving the door open for keyword arguments 

13to modify the meaning of the API call itself. 

14""" 

15 

16import collections 

17import collections.abc 

18import concurrent.futures 

19import errno 

20import heapq 

21import itertools 

22import os 

23import socket 

24import stat 

25import subprocess 

26import threading 

27import time 

28import traceback 

29import sys 

30import warnings 

31import weakref 

32 

33try: 

34 import ssl 

35except ImportError: # pragma: no cover 

36 ssl = None 

37 

38from . import constants 

39from . import coroutines 

40from . import events 

41from . import exceptions 

42from . import futures 

43from . import protocols 

44from . import sslproto 

45from . import staggered 

46from . import tasks 

47from . import timeouts 

48from . import transports 

49from . import trsock 

50from .log import logger 

51 

52 

__all__ = ('BaseEventLoop', 'Server')


# Cleanup of cancelled timer handles is only performed once at least
# this many handles are held in _scheduled.
_MIN_SCHEDULED_TIMER_HANDLES = 100

# Cleanup of cancelled timer handles is only performed once at least
# this fraction of the handles in _scheduled has been cancelled.
_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5


# True when the socket module exposes the IPv6 address family.
_HAS_IPv6 = hasattr(socket, 'AF_INET6')

# Upper bound (in seconds, i.e. one day) for the timeout handed to
# select, to stay clear of OS limitations.
MAXIMUM_SELECT_TIMEOUT = 24 * 3600

69 

70 

def _format_handle(handle):
    """Return a printable description of *handle*.

    If the handle's callback is bound to a Task (its ``__self__`` is a
    ``tasks.Task``), the repr of that task is returned; otherwise the
    result of ``str(handle)`` is used.
    """
    callback = handle._callback
    if not isinstance(getattr(callback, '__self__', None), tasks.Task):
        return str(handle)
    # The callback belongs to a task: describe the task instead.
    return repr(callback.__self__)

78 

79 

def _format_pipe(fd):
    """Return a short textual label for a subprocess stream argument.

    ``subprocess.PIPE`` maps to ``'<pipe>'``, ``subprocess.STDOUT`` maps
    to ``'<stdout>'`` and anything else is rendered with ``repr()``.
    """
    if fd == subprocess.PIPE:
        return '<pipe>'
    if fd == subprocess.STDOUT:
        return '<stdout>'
    return repr(fd)

87 

88 

def _set_reuseport(sock):
    """Enable ``SO_REUSEPORT`` on *sock*.

    Raises ValueError when the socket module does not define
    ``SO_REUSEPORT`` or when setting the option fails with OSError.
    """
    if not hasattr(socket, 'SO_REUSEPORT'):
        raise ValueError('reuse_port not supported by socket module')

    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    except OSError:
        # The constant exists but the option could not be set.
        raise ValueError('reuse_port not supported by socket module, '
                         'SO_REUSEPORT defined but not implemented.')

98 

99 

def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0):
    """Build an addrinfo tuple for *host* if it is already an IP address.

    This lets callers skip getaddrinfo when users have resolved names
    themselves and pass in literal IPs.

    Returns ``(family, type, proto, '', sockaddr)`` on success, where
    sockaddr is ``(host, port, flowinfo, scopeid)`` for IPv6 and
    ``(host, port)`` otherwise.  Returns None when the shortcut does not
    apply: no ``socket.inet_pton``, no host, unsupported proto or socket
    type, a non-numeric port, a zone index in the host, or a host that
    does not parse as an address of the candidate families.
    """
    if not hasattr(socket, 'inet_pton'):
        return

    if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
            host is None:
        return None

    # The socket type dictates the protocol that is reported back.
    if type == socket.SOCK_STREAM:
        proto = socket.IPPROTO_TCP
    elif type == socket.SOCK_DGRAM:
        proto = socket.IPPROTO_UDP
    else:
        return None

    if port is None or (isinstance(port, (bytes, str)) and not port):
        # A missing or empty port is treated as port 0.
        port = 0
    else:
        # A service name such as "http" still needs getaddrinfo.
        try:
            port = int(port)
        except (TypeError, ValueError):
            return None

    if family == socket.AF_UNSPEC:
        candidates = [socket.AF_INET]
        if _HAS_IPv6:
            candidates.append(socket.AF_INET6)
    else:
        candidates = [family]

    if isinstance(host, bytes):
        host = host.decode('idna')
    if '%' in host:
        # Linux's inet_pton rejects an IPv6 zone index after the host,
        # e.g. '::1%lo0'.
        return None

    for af in candidates:
        try:
            socket.inet_pton(af, host)
        except OSError:
            # Not a literal of this family; try the next one.
            continue
        # The host has already been resolved.
        if _HAS_IPv6 and af == socket.AF_INET6:
            return af, type, proto, '', (host, port, flowinfo, scopeid)
        return af, type, proto, '', (host, port)

    # "host" is not an IP address.
    return None

157 

158 

def _interleave_addrinfos(addrinfos, first_address_family_count=1):
    """Interleave list of addrinfo tuples by family.

    *addrinfos* is an iterable of tuples whose first item is the address
    family.  Entries are grouped by family (families keep the order in
    which they first appear) and then emitted round-robin, one entry per
    family per round.

    When *first_address_family_count* is greater than 1, the first
    ``first_address_family_count - 1`` entries of the first family are
    emitted up front, before the round-robin part starts.

    Returns a new list; an empty input yields an empty list regardless
    of *first_address_family_count*.
    """
    # Group addresses by family; dicts preserve insertion order.
    by_family = {}
    for addr in addrinfos:
        by_family.setdefault(addr[0], []).append(addr)
    family_lists = list(by_family.values())

    if not family_lists:
        # Nothing to interleave; also avoids indexing an empty list below.
        return []

    reordered = []
    if first_address_family_count > 1:
        head = first_address_family_count - 1
        reordered.extend(family_lists[0][:head])
        del family_lists[0][:head]
    # zip_longest pads exhausted families with None; drop the padding.
    reordered.extend(
        a for a in itertools.chain.from_iterable(
            itertools.zip_longest(*family_lists)
        ) if a is not None)
    return reordered

179 

180 

def _run_until_complete_cb(fut):
    """Done-callback that stops the loop owning *fut*.

    The loop is left alone when the future finished with SystemExit or
    KeyboardInterrupt: per issue #22429, run_forever() has already
    finished in that case, so there is nothing to stop.
    """
    if not fut.cancelled() and isinstance(
            fut.exception(), (SystemExit, KeyboardInterrupt)):
        return
    futures._get_loop(fut).stop()

189 

190 

if hasattr(socket, 'TCP_NODELAY'):
    def _set_nodelay(sock):
        """Set TCP_NODELAY on *sock* if it is an IPv4/IPv6 TCP stream socket.

        Sockets of any other family, type or protocol are left untouched.
        """
        if sock.family not in {socket.AF_INET, socket.AF_INET6}:
            return
        if sock.type != socket.SOCK_STREAM:
            return
        if sock.proto != socket.IPPROTO_TCP:
            return
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
else:
    def _set_nodelay(sock):
        """No-op: the socket module does not define TCP_NODELAY."""
        pass

200 

201 

def _check_ssl_socket(sock):
    """Raise TypeError if *sock* is an ``ssl.SSLSocket``.

    Nothing is checked when the ssl module is unavailable (``ssl`` is
    None in that case).
    """
    if ssl is None:
        return
    if isinstance(sock, ssl.SSLSocket):
        raise TypeError("Socket cannot be of type SSLSocket")

205 

206 

class _SendfileFallbackProtocol(protocols.Protocol):
    """Temporary protocol installed on a transport by the sendfile fallback.

    On construction it remembers the transport's current protocol and
    its reading / write-paused state, pauses reading and installs itself
    as the transport's protocol.  ``drain()`` waits while writing is
    paused; ``restore()`` puts the original protocol back.
    """

    def __init__(self, transp):
        if not isinstance(transp, transports._FlowControlMixin):
            raise TypeError("transport should be _FlowControlMixin instance")
        self._transport = transp
        self._proto = transp.get_protocol()
        # Snapshot of the transport state, consulted again in restore().
        self._should_resume_reading = transp.is_reading()
        self._should_resume_writing = transp._protocol_paused
        transp.pause_reading()
        transp.set_protocol(self)
        # A pending future means "writing is currently paused".
        self._write_ready_fut = (
            self._transport._loop.create_future()
            if self._should_resume_writing else None)

    async def drain(self):
        """Wait until writing is resumed; raise if the transport is closing."""
        if self._transport.is_closing():
            raise ConnectionError("Connection closed by peer")
        pending = self._write_ready_fut
        if pending is not None:
            await pending

    def connection_made(self, transport):
        raise RuntimeError("Invalid state: "
                           "connection should have been established already.")

    def connection_lost(self, exc):
        pending = self._write_ready_fut
        if pending is not None:
            # Never happens if peer disconnects after sending the whole
            # content, so a disconnection here is always reported as an
            # exception from the user's perspective.
            if exc is None:
                error = ConnectionError("Connection is closed by peer")
            else:
                error = exc
            pending.set_exception(error)
        self._proto.connection_lost(exc)

    def pause_writing(self):
        if self._write_ready_fut is None:
            self._write_ready_fut = self._transport._loop.create_future()

    def resume_writing(self):
        pending = self._write_ready_fut
        if pending is not None:
            pending.set_result(False)
            self._write_ready_fut = None

    def data_received(self, data):
        raise RuntimeError("Invalid state: reading should be paused")

    def eof_received(self):
        raise RuntimeError("Invalid state: reading should be paused")

    async def restore(self):
        """Reinstall the original protocol and its reading/writing state."""
        self._transport.set_protocol(self._proto)
        if self._should_resume_reading:
            self._transport.resume_reading()
        pending = self._write_ready_fut
        if pending is not None:
            # Cancelling has basically no effect: the protocol has been
            # switched back, so no code should be waiting on it anymore.
            pending.cancel()
        if self._should_resume_writing:
            self._proto.resume_writing()

273 

274 

class Server(events.AbstractServer):
    """Server object tying listening sockets to an event loop.

    ``_sockets`` is the list of listening sockets, or None once the
    server has been closed.  ``_waiters`` is the list of futures created
    by ``wait_closed()``, or None once the waiters have been woken up.
    """

    def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
                 ssl_handshake_timeout, ssl_shutdown_timeout=None):
        self._loop = loop
        self._sockets = sockets
        # Weak references so we don't break Transport's ability to
        # detect abandoned transports
        self._clients = weakref.WeakSet()
        self._waiters = []
        self._protocol_factory = protocol_factory
        self._backlog = backlog
        self._ssl_context = ssl_context
        self._ssl_handshake_timeout = ssl_handshake_timeout
        self._ssl_shutdown_timeout = ssl_shutdown_timeout
        self._serving = False
        self._serving_forever_fut = None

    def __repr__(self):
        return f'<{self.__class__.__name__} sockets={self.sockets!r}>'

    def _attach(self, transport):
        """Register *transport* as a client; the server must be open."""
        assert self._sockets is not None
        self._clients.add(transport)

    def _detach(self, transport):
        """Forget *transport*; wake waiters if the server is closed and idle."""
        self._clients.discard(transport)
        if len(self._clients) == 0 and self._sockets is None:
            self._wakeup()

    def _wakeup(self):
        """Resolve every pending ``wait_closed()`` future.

        ``_waiters`` is set to None to record that the wake-up happened.
        A repeated call is a no-op instead of failing while iterating
        over None.
        """
        waiters = self._waiters
        if waiters is None:
            # Already woken up.
            return
        self._waiters = None
        for waiter in waiters:
            if not waiter.done():
                waiter.set_result(None)

    def _start_serving(self):
        """Start listening on all sockets; does nothing if already serving."""
        if self._serving:
            return
        self._serving = True
        for sock in self._sockets:
            sock.listen(self._backlog)
            self._loop._start_serving(
                self._protocol_factory, sock, self._ssl_context,
                self, self._backlog, self._ssl_handshake_timeout,
                self._ssl_shutdown_timeout)

    def get_loop(self):
        return self._loop

    def is_serving(self):
        return self._serving

    @property
    def sockets(self):
        """Tuple of TransportSocket wrappers; empty once the server is closed."""
        if self._sockets is None:
            return ()
        return tuple(trsock.TransportSocket(s) for s in self._sockets)

    def close(self):
        """Stop serving on all sockets; closing twice is a no-op.

        Client transports are not touched.  A pending ``serve_forever()``
        future is cancelled, and waiters are woken up right away when no
        client is attached.
        """
        sockets = self._sockets
        if sockets is None:
            return
        self._sockets = None

        for sock in sockets:
            self._loop._stop_serving(sock)

        self._serving = False

        if (self._serving_forever_fut is not None and
                not self._serving_forever_fut.done()):
            self._serving_forever_fut.cancel()
            self._serving_forever_fut = None

        if len(self._clients) == 0:
            self._wakeup()

    def close_clients(self):
        """Call ``close()`` on every attached client transport."""
        # Iterate over a copy of the client set.
        for transport in self._clients.copy():
            transport.close()

    def abort_clients(self):
        """Call ``abort()`` on every attached client transport."""
        for transport in self._clients.copy():
            transport.abort()

    async def start_serving(self):
        self._start_serving()
        # Skip one loop iteration so that all 'loop.add_reader'
        # go through.
        await tasks.sleep(0)

    async def serve_forever(self):
        """Start serving and block until the awaiting task is cancelled.

        Raises RuntimeError if the server is already being awaited on
        serve_forever() or if it is closed.  On cancellation the server
        is closed and waited for before the CancelledError is re-raised.
        """
        if self._serving_forever_fut is not None:
            raise RuntimeError(
                f'server {self!r} is already being awaited on serve_forever()')
        if self._sockets is None:
            raise RuntimeError(f'server {self!r} is closed')

        self._start_serving()
        self._serving_forever_fut = self._loop.create_future()

        try:
            await self._serving_forever_fut
        except exceptions.CancelledError:
            try:
                self.close()
                await self.wait_closed()
            finally:
                raise
        finally:
            self._serving_forever_fut = None

    async def wait_closed(self):
        """Wait until server is closed and all connections are dropped.

        - If the server is not closed, wait.
        - If it is closed, but there are still active connections, wait.

        Anyone waiting here will be unblocked once both conditions
        (server is closed and all connections have been dropped)
        have become true, in either order.

        Historical note: In 3.11 and before, this was broken, returning
        immediately if the server was already closed, even if there
        were still active connections.  An attempted fix in 3.12.0 was
        still broken, returning immediately if the server was still
        open and there were no active connections.  Hopefully in 3.12.1
        we have it right.
        """
        # Waiters are unblocked by self._wakeup(), which is called
        # from two places: self.close() and self._detach(), but only
        # when both conditions have become true.  To signal that this
        # has happened, self._wakeup() sets self._waiters to None.
        if self._waiters is None:
            return
        waiter = self._loop.create_future()
        self._waiters.append(waiter)
        await waiter

415 

416 

417class BaseEventLoop(events.AbstractEventLoop): 

418 

def __init__(self):
    """Set up the loop's initial bookkeeping state."""
    self._timer_cancelled_count = 0
    self._closed = self._stopping = False
    self._ready = collections.deque()
    self._scheduled = []
    self._default_executor = None
    self._internal_fds = 0
    # Identifier of the thread running the event loop; None while the
    # loop is not running.
    self._thread_id = None
    self._clock_resolution = time.get_clock_info('monotonic').resolution
    self._exception_handler = None
    self.set_debug(coroutines._is_debug_mode())
    # Saved async generator hooks, restored after the loop has run.
    self._old_agen_hooks = None
    # Debug mode only: a callback or task step running longer than this
    # many seconds is logged as slow.
    self.slow_callback_duration = 0.1
    self._current_handle = None
    self._task_factory = None
    self._coroutine_origin_tracking_enabled = False
    self._coroutine_origin_tracking_saved_depth = None

    # Weak set of every asynchronous generator currently being
    # iterated by the loop.
    self._asyncgens = weakref.WeakSet()
    # Flipped to True by `loop.shutdown_asyncgens` and
    # `loop.shutdown_default_executor` respectively.
    self._asyncgens_shutdown_called = False
    self._executor_shutdown_called = False

450 

def __repr__(self):
    """Return ``<ClassName running=… closed=… debug=…>``."""
    cls_name = self.__class__.__name__
    return (f'<{cls_name} running={self.is_running()} '
            f'closed={self.is_closed()} debug={self.get_debug()}>')

456 

def create_future(self):
    """Return a new Future bound to this loop."""
    new_future = futures.Future(loop=self)
    return new_future

460 

def create_task(self, coro, **kwargs):
    """Wrap coroutine object *coro* in a task and return that task.

    Raises RuntimeError (through _check_closed) if the loop is closed.
    When a task factory is installed, its result for
    ``(loop, coro, **kwargs)`` is returned; otherwise a ``tasks.Task``
    is created with the keyword arguments passed through.
    """
    self._check_closed()
    factory = self._task_factory
    if factory is not None:
        return factory(self, coro, **kwargs)

    task = tasks.Task(coro, loop=self, **kwargs)
    if task._source_traceback:
        # Drop the last entry of the recorded creation traceback.
        del task._source_traceback[-1]
    try:
        return task
    finally:
        # gh-128552: prevent a refcycle of
        # task.exception().__traceback__->BaseEventLoop.create_task->task
        del task

479 

def set_task_factory(self, factory):
    """Install the factory used by loop.create_task().

    Passing None selects the default task factory.

    A callable factory is invoked as ``factory(loop, coro, **kwargs)``:
    'loop' is a reference to the active event loop, 'coro' is a
    coroutine object, and **kwargs are arbitrary keyword arguments that
    should be passed on to Task.  The callable must return a Task.

    Raises TypeError if *factory* is neither None nor callable.
    """
    if factory is None or callable(factory):
        self._task_factory = factory
        return
    raise TypeError('task factory must be a callable or None')

494 

def get_task_factory(self):
    """Return the installed task factory; None means the default is used."""
    factory = self._task_factory
    return factory

498 

def _make_socket_transport(self, sock, protocol, waiter=None, *,
                           extra=None, server=None):
    """Create a socket transport (not implemented in this base class)."""
    raise NotImplementedError

503 

def _make_ssl_transport(
        self, rawsock, protocol, sslcontext, waiter=None,
        *, server_side=False, server_hostname=None,
        extra=None, server=None,
        ssl_handshake_timeout=None,
        ssl_shutdown_timeout=None,
        call_connection_made=True):
    """Create an SSL transport (not implemented in this base class)."""
    raise NotImplementedError

513 

def _make_datagram_transport(self, sock, protocol,
                             address=None, waiter=None, extra=None):
    """Create a datagram transport (not implemented in this base class)."""
    raise NotImplementedError

518 

def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                              extra=None):
    """Create a read pipe transport (not implemented in this base class)."""
    raise NotImplementedError

523 

def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                               extra=None):
    """Create a write pipe transport (not implemented in this base class)."""
    raise NotImplementedError

528 

async def _make_subprocess_transport(self, protocol, args, shell,
                                     stdin, stdout, stderr, bufsize,
                                     extra=None, **kwargs):
    """Create a subprocess transport (not implemented in this base class)."""
    raise NotImplementedError

534 

def _write_to_self(self):
    """Wake up the event loop by writing a byte to the self-pipe.

    May be called from a thread other than the loop's own.

    Implementing the self-pipe is the responsibility of the subclass;
    this base version always raises NotImplementedError.
    """
    raise NotImplementedError

543 

def _process_events(self, event_list):
    """Process selector events (not implemented in this base class)."""
    raise NotImplementedError

547 

def _check_closed(self):
    """Raise RuntimeError if the loop has been closed."""
    if not self._closed:
        return
    raise RuntimeError('Event loop is closed')

551 

def _check_default_executor(self):
    """Raise RuntimeError once executor shutdown has been requested."""
    if not self._executor_shutdown_called:
        return
    raise RuntimeError('Executor shutdown has been called')

555 

def _asyncgen_finalizer_hook(self, agen):
    """Finalizer hook for asynchronous generators.

    Stops tracking *agen* and, unless the loop is closed, schedules a
    task running ``agen.aclose()`` through call_soon_threadsafe().
    """
    self._asyncgens.discard(agen)
    if self.is_closed():
        return
    self.call_soon_threadsafe(self.create_task, agen.aclose())

560 

def _asyncgen_firstiter_hook(self, agen):
    """First-iteration hook: start tracking asynchronous generator *agen*.

    A ResourceWarning is emitted first when shutdown_asyncgens() has
    already been called on this loop; the generator is tracked either way.
    """
    late = self._asyncgens_shutdown_called
    if late:
        warnings.warn(
            f"asynchronous generator {agen!r} was scheduled after "
            f"loop.shutdown_asyncgens() call",
            ResourceWarning, source=self)

    self._asyncgens.add(agen)

569 

async def shutdown_asyncgens(self):
    """Close every asynchronous generator currently tracked by the loop.

    Marks the loop as having had shutdown_asyncgens() called, closes all
    tracked generators concurrently and reports each failure through
    call_exception_handler().
    """
    self._asyncgens_shutdown_called = True

    if not len(self._asyncgens):
        # No asynchronous generators are being tracked.
        return

    closing_agens = list(self._asyncgens)
    self._asyncgens.clear()

    # Exceptions are collected as results rather than propagated.
    results = await tasks.gather(
        *[ag.aclose() for ag in closing_agens],
        return_exceptions=True)

    for result, agen in zip(results, closing_agens):
        if not isinstance(result, Exception):
            continue
        self.call_exception_handler({
            'message': f'an error occurred during closing of '
                       f'asynchronous generator {agen!r}',
            'exception': result,
            'asyncgen': agen
        })

594 

async def shutdown_default_executor(self, timeout=None):
    """Schedule the shutdown of the default executor.

    *timeout* is the amount of time the executor is given to finish
    joining; the default of None means an unlimited amount of time.

    If the timeout expires, a RuntimeWarning is issued and the executor
    is shut down without waiting.
    """
    self._executor_shutdown_called = True
    if self._default_executor is None:
        # No default executor is set; nothing to shut down.
        return

    future = self.create_future()
    # The blocking shutdown runs in a helper thread, which reports
    # completion through `future`.
    worker = threading.Thread(target=self._do_shutdown, args=(future,))
    worker.start()
    try:
        async with timeouts.timeout(timeout):
            await future
    except TimeoutError:
        warnings.warn("The executor did not finishing joining "
                      f"its threads within {timeout} seconds.",
                      RuntimeWarning, stacklevel=2)
        self._default_executor.shutdown(wait=False)
    else:
        worker.join()

618 

def _do_shutdown(self, future):
    """Shut the default executor down (blocking) and report via *future*.

    The outcome is delivered with call_soon_threadsafe(): a None result
    on success, or the raised exception on failure.  Nothing is
    delivered when the loop is closed, and a failure is not delivered
    to a future that was cancelled.
    """
    try:
        self._default_executor.shutdown(wait=True)
        if not self.is_closed():
            self.call_soon_threadsafe(futures._set_result_unless_cancelled,
                                      future, None)
    except Exception as ex:
        if not (self.is_closed() or future.cancelled()):
            self.call_soon_threadsafe(future.set_exception, ex)

628 

def _check_running(self):
    """Raise RuntimeError if this loop, or any other loop, is running."""
    if self.is_running():
        raise RuntimeError('This event loop is already running')
    other_loop = events._get_running_loop()
    if other_loop is not None:
        raise RuntimeError(
            'Cannot run the event loop while another loop is running')

635 

def _run_forever_setup(self):
    """Prepare the run loop to process events.

    Kept as a separate method so that custom event loop subclasses
    (e.g., event loops that integrate a GUI event loop with Python's
    event loop) have access to all the loop setup logic.
    """
    self._check_closed()
    self._check_running()
    self._set_coroutine_origin_tracking(self._debug)

    # Remember the current async generator hooks before replacing them.
    self._old_agen_hooks = sys.get_asyncgen_hooks()
    self._thread_id = threading.get_ident()
    sys.set_asyncgen_hooks(
        firstiter=self._asyncgen_firstiter_hook,
        finalizer=self._asyncgen_finalizer_hook,
    )

    events._set_running_loop(self)

655 

def _run_forever_cleanup(self):
    """Clean up after an event loop finishes looping over events.

    Kept as a separate method so that custom event loop subclasses
    (e.g., event loops that integrate a GUI event loop with Python's
    event loop) have access to all the loop cleanup logic.
    """
    self._stopping = False
    self._thread_id = None
    events._set_running_loop(None)
    self._set_coroutine_origin_tracking(False)
    # Put back the async generator hooks saved earlier, if any.
    saved_hooks = self._old_agen_hooks
    if saved_hooks is not None:
        sys.set_asyncgen_hooks(*saved_hooks)
        self._old_agen_hooks = None

671 

def run_forever(self):
    """Run loop iterations until stop() is called.

    At least one iteration is always executed; cleanup runs even when
    an iteration raises.
    """
    self._run_forever_setup()
    try:
        self._run_once()
        while not self._stopping:
            self._run_once()
    finally:
        self._run_forever_cleanup()

682 

def run_until_complete(self, future):
    """Run the loop until *future* is done.

    A coroutine argument is wrapped in a Task.

    WARNING: calling run_until_complete() twice with the same coroutine
    would be disastrous -- it would be wrapped in two different Tasks
    and that can't be good.

    Returns the Future's result or raises its exception.  Raises
    RuntimeError if the loop stopped before the Future completed.
    """
    self._check_closed()
    self._check_running()

    created_here = not futures.isfuture(future)
    future = tasks.ensure_future(future, loop=self)
    if created_here:
        # An exception is raised if the future didn't complete, so there
        # is no need to log the "destroy pending task" message
        future._log_destroy_pending = False

    future.add_done_callback(_run_until_complete_cb)
    try:
        self.run_forever()
    except BaseException:
        finished = created_here and future.done() and not future.cancelled()
        if finished:
            # The coroutine raised a BaseException.  Consume the
            # exception to not log a warning, since the caller doesn't
            # have access to the local task.
            future.exception()
        raise
    finally:
        future.remove_done_callback(_run_until_complete_cb)

    if future.done():
        return future.result()
    raise RuntimeError('Event loop stopped before Future completed.')

720 

def stop(self):
    """Ask the running event loop to stop.

    Callbacks that are already scheduled still run: this only tells
    run_forever to stop looping after a complete iteration.
    """
    self._stopping = True

728 

def close(self):
    """Close the event loop.

    The queues are cleared and the executor is shut down, without
    waiting for the executor to finish.  Closing an already closed loop
    does nothing.

    Raises RuntimeError if the loop is running.
    """
    if self.is_running():
        raise RuntimeError("Cannot close a running event loop")
    if self._closed:
        return
    if self._debug:
        logger.debug("Close %r", self)

    self._closed = True
    self._ready.clear()
    self._scheduled.clear()
    self._executor_shutdown_called = True

    executor = self._default_executor
    if executor is None:
        return
    self._default_executor = None
    executor.shutdown(wait=False)

751 

def is_closed(self):
    """Return True if the event loop was closed."""
    closed = self._closed
    return closed

755 

def __del__(self, _warn=warnings.warn):
    """Warn about a loop that was never closed, and close it if idle.

    *_warn* is bound to ``warnings.warn`` as a default argument.
    """
    if self.is_closed():
        return
    _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self)
    if not self.is_running():
        self.close()

761 

def is_running(self):
    """Return True if the event loop is running."""
    # A thread id is recorded only while the loop is running.
    return self._thread_id is not None

765 

def time(self):
    """Return the current time according to the event loop's clock.

    The value is a float in seconds since an epoch; the epoch,
    precision, accuracy and drift are unspecified and may differ per
    event loop.  This implementation reads ``time.monotonic()``.
    """
    now = time.monotonic()
    return now

774 

def call_later(self, delay, callback, *args, context=None):
    """Schedule *callback* to run *delay* seconds from now.

    Returns a Handle: an opaque object whose cancel() method can be
    used to cancel the call.

    The delay can be an int or float, expressed in seconds, and is
    always relative to the current time.

    Each callback will be called exactly once.  If two callbacks are
    scheduled for exactly the same time, it is undefined which will be
    called first.

    Positional arguments after the callback are passed to the callback
    when it is called.

    Raises TypeError if *delay* is None.
    """
    if delay is None:
        raise TypeError('delay must not be None')
    deadline = self.time() + delay
    timer = self.call_at(deadline, callback, *args, context=context)
    if timer._source_traceback:
        # Drop the last entry of the recorded traceback.
        del timer._source_traceback[-1]
    return timer

798 

def call_at(self, when, callback, *args, context=None):
    """Like call_later(), but *when* is an absolute time.

    Absolute time corresponds to the event loop's time() method.

    Raises TypeError if *when* is None.
    """
    if when is None:
        raise TypeError("when cannot be None")
    self._check_closed()
    if self._debug:
        # The extra validation only runs in debug mode.
        self._check_thread()
        self._check_callback(callback, 'call_at')
    handle = events.TimerHandle(when, callback, args, self, context)
    if handle._source_traceback:
        del handle._source_traceback[-1]
    heapq.heappush(self._scheduled, handle)
    handle._scheduled = True
    return handle

816 

def call_soon(self, callback, *args, context=None):
    """Schedule *callback* to be called as soon as possible.

    Callbacks form a FIFO queue: they are called in the order in which
    they are registered, and each one is called exactly once.

    Positional arguments after the callback are passed to the callback
    when it is called.
    """
    self._check_closed()
    if self._debug:
        # The extra validation only runs in debug mode.
        self._check_thread()
        self._check_callback(callback, 'call_soon')
    scheduled = self._call_soon(callback, args, context)
    if scheduled._source_traceback:
        del scheduled._source_traceback[-1]
    return scheduled

835 

def _check_callback(self, callback, method):
    """Validate *callback* on behalf of the loop method named *method*.

    Raises TypeError if *callback* is a coroutine or coroutine function,
    or if it is not callable.
    """
    is_coro = (coroutines.iscoroutine(callback) or
               coroutines._iscoroutinefunction(callback))
    if is_coro:
        raise TypeError(
            f"coroutines cannot be used with {method}()")
    if callable(callback):
        return
    raise TypeError(
        f'a callable object was expected by {method}(), '
        f'got {callback!r}')

845 

def _call_soon(self, callback, args, context):
    """Create a Handle for *callback*, append it to _ready and return it."""
    new_handle = events.Handle(callback, args, self, context)
    if new_handle._source_traceback:
        # Drop the last entry of the recorded traceback.
        del new_handle._source_traceback[-1]
    self._ready.append(new_handle)
    return new_handle

852 

def _check_thread(self):
    """Verify that the caller runs in the thread running the event loop.

    Non-thread-safe methods of this class rely on that assumption and
    will likely misbehave when it does not hold.

    Meant to be called only when (self._debug == True); for performance
    reasons, checking that condition is left to the caller.

    Raises RuntimeError when called from a different thread than the
    one recorded for the loop; does nothing when no thread is recorded.
    """
    loop_thread = self._thread_id
    if loop_thread is None or threading.get_ident() == loop_thread:
        return
    raise RuntimeError(
        "Non-thread-safe operation invoked on an event loop other "
        "than the current one")

869 

def call_soon_threadsafe(self, callback, *args, context=None):
    """Like call_soon(), but thread-safe."""
    self._check_closed()
    if self._debug:
        self._check_callback(callback, 'call_soon_threadsafe')
    ts_handle = events._ThreadSafeHandle(callback, args, self, context)
    self._ready.append(ts_handle)
    # Up to two trailing entries are removed from the recorded traceback.
    # NOTE(review): presumably one for the handle constructor and one
    # for this method -- confirm against events._ThreadSafeHandle.
    for _ in range(2):
        if ts_handle._source_traceback:
            del ts_handle._source_traceback[-1]
    # Wake the loop up so that it notices the new handle.
    self._write_to_self()
    return ts_handle

883 

def run_in_executor(self, executor, func, *args):
    """Submit func(*args) to an executor and return an awaitable future.

    When *executor* is None the loop's default executor is used; it is
    created lazily as a ThreadPoolExecutor on first use.
    """
    self._check_closed()
    if self._debug:
        self._check_callback(func, 'run_in_executor')
    pool = executor
    if pool is None:
        pool = self._default_executor
        # Only check when the default executor is being used
        self._check_default_executor()
        if pool is None:
            pool = concurrent.futures.ThreadPoolExecutor(
                thread_name_prefix='asyncio'
            )
            self._default_executor = pool
    job = pool.submit(func, *args)
    return futures.wrap_future(job, loop=self)

899 

def set_default_executor(self, executor):
    """Install *executor* as the loop's default executor.

    Raises TypeError unless *executor* is a ThreadPoolExecutor instance.
    """
    if isinstance(executor, concurrent.futures.ThreadPoolExecutor):
        self._default_executor = executor
        return
    raise TypeError('executor must be ThreadPoolExecutor instance')

904 

def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
    """Call socket.getaddrinfo() and log the request and its duration.

    The result is logged at INFO level when the lookup took at least
    self.slow_callback_duration seconds, otherwise at DEBUG level.
    Returns whatever socket.getaddrinfo() returned.
    """
    parts = [f"{host}:{port!r}"]
    optional = (('family', family), ('type', type),
                ('proto', proto), ('flags', flags))
    for label, value in optional:
        # Only mention arguments that carry a non-default (truthy) value.
        if value:
            parts.append(f'{label}={value!r}')
    summary = ', '.join(parts)
    logger.debug('Get address info %s', summary)

    started = self.time()
    addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
    elapsed = self.time() - started

    report = (f'Getting address info {summary} took '
              f'{elapsed * 1e3:.3f}ms: {addrinfo!r}')
    emit = (logger.info if elapsed >= self.slow_callback_duration
            else logger.debug)
    emit(report)
    return addrinfo

928 

async def getaddrinfo(self, host, port, *,
                      family=0, type=0, proto=0, flags=0):
    """Resolve *host*/*port* in the default executor.

    In debug mode the lookup goes through self._getaddrinfo_debug,
    otherwise socket.getaddrinfo is called directly.
    """
    resolver = (self._getaddrinfo_debug if self._debug
                else socket.getaddrinfo)
    return await self.run_in_executor(
        None, resolver, host, port, family, type, proto, flags)

938 

async def getnameinfo(self, sockaddr, flags=0):
    """Run socket.getnameinfo(sockaddr, flags) in the default executor."""
    lookup = self.run_in_executor(
        None, socket.getnameinfo, sockaddr, flags)
    return await lookup

942 

async def sock_sendfile(self, sock, file, offset=0, count=None,
                        *, fallback=True):
    """Send *file* over *sock*, preferring the native sendfile path.

    The arguments are validated first.  If the native implementation
    raises SendfileNotAvailableError, the manual read/send fallback is
    used when *fallback* is true; otherwise the error propagates.
    """
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")
    _check_ssl_socket(sock)
    self._check_sendfile_params(sock, file, offset, count)
    try:
        return await self._sock_sendfile_native(sock, file,
                                                offset, count)
    except exceptions.SendfileNotAvailableError:
        if fallback:
            return await self._sock_sendfile_fallback(sock, file,
                                                      offset, count)
        raise

957 

958 async def _sock_sendfile_native(self, sock, file, offset, count): 

959 # NB: sendfile syscall is not supported for SSL sockets and 

960 # non-mmap files even if sendfile is supported by OS 

961 raise exceptions.SendfileNotAvailableError( 

962 f"syscall sendfile is not available for socket {sock!r} " 

963 f"and file {file!r} combination") 

964 

965 async def _sock_sendfile_fallback(self, sock, file, offset, count): 

966 if offset: 

967 file.seek(offset) 

968 blocksize = ( 

969 min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE) 

970 if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE 

971 ) 

972 buf = bytearray(blocksize) 

973 total_sent = 0 

974 try: 

975 while True: 

976 if count: 

977 blocksize = min(count - total_sent, blocksize) 

978 if blocksize <= 0: 

979 break 

980 view = memoryview(buf)[:blocksize] 

981 read = await self.run_in_executor(None, file.readinto, view) 

982 if not read: 

983 break # EOF 

984 await self.sock_sendall(sock, view[:read]) 

985 total_sent += read 

986 return total_sent 

987 finally: 

988 if total_sent > 0 and hasattr(file, 'seek'): 

989 file.seek(offset + total_sent) 

990 

991 def _check_sendfile_params(self, sock, file, offset, count): 

992 if 'b' not in getattr(file, 'mode', 'b'): 

993 raise ValueError("file should be opened in binary mode") 

994 if not sock.type == socket.SOCK_STREAM: 

995 raise ValueError("only SOCK_STREAM type sockets are supported") 

996 if count is not None: 

997 if not isinstance(count, int): 

998 raise TypeError( 

999 "count must be a positive integer (got {!r})".format(count)) 

1000 if count <= 0: 

1001 raise ValueError( 

1002 "count must be a positive integer (got {!r})".format(count)) 

1003 if not isinstance(offset, int): 

1004 raise TypeError( 

1005 "offset must be a non-negative integer (got {!r})".format( 

1006 offset)) 

1007 if offset < 0: 

1008 raise ValueError( 

1009 "offset must be a non-negative integer (got {!r})".format( 

1010 offset)) 

1011 

async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None):
    """Create, bind and connect one socket.

    *addr_info* is a getaddrinfo()-style 5-tuple for the remote end.
    When *local_addr_infos* is given, the socket is bound to the first
    entry of the same family whose bind() succeeds.

    Errors are collected in a fresh list that is appended to
    *exceptions*, so the caller can inspect them after a failure.
    Returns the connected socket; on any exception the socket is closed
    and the exception is re-raised.
    """
    my_exceptions = []
    exceptions.append(my_exceptions)
    family, type_, proto, _, address = addr_info
    sock = None
    try:
        sock = socket.socket(family=family, type=type_, proto=proto)
        sock.setblocking(False)
        if local_addr_infos is not None:
            for lfamily, _, _, _, laddr in local_addr_infos:
                # skip local addresses of different family
                if lfamily != family:
                    continue
                try:
                    sock.bind(laddr)
                    break
                except OSError as exc:
                    # Record a more descriptive error and try the next
                    # candidate local address.
                    msg = (
                        f'error while attempting to bind on '
                        f'address {laddr!r}: {str(exc).lower()}'
                    )
                    exc = OSError(exc.errno, msg)
                    my_exceptions.append(exc)
            else:  # all bind attempts failed
                if my_exceptions:
                    raise my_exceptions.pop()
                else:
                    raise OSError(f"no matching local address with {family=} found")
        await self.sock_connect(sock, address)
        return sock
    except OSError as exc:
        my_exceptions.append(exc)
        if sock is not None:
            sock.close()
        raise
    except:
        # Any other exception: still close the socket before propagating.
        if sock is not None:
            sock.close()
        raise
    finally:
        # Drop the local references to the exception lists
        # (presumably to avoid reference cycles -- confirm).
        exceptions = my_exceptions = None

1054 

async def create_connection(
        self, protocol_factory, host=None, port=None,
        *, ssl=None, family=0,
        proto=0, flags=0, sock=None,
        local_addr=None, server_hostname=None,
        ssl_handshake_timeout=None,
        ssl_shutdown_timeout=None,
        happy_eyeballs_delay=None, interleave=None,
        all_errors=False):
    """Connect to a TCP server.

    Create a streaming transport connection to a given internet host and
    port: socket family AF_INET or socket.AF_INET6 depending on host (or
    family if specified), socket type SOCK_STREAM.  protocol_factory must be
    a callable returning a protocol instance.

    This method is a coroutine which will try to establish the connection
    in the background.  When successful, the coroutine returns a
    (transport, protocol) pair.

    Either host/port or an existing *sock* must be given, not both.
    When every connection attempt fails, the collected errors are raised
    as an ExceptionGroup if *all_errors* is true; otherwise a single
    OSError is raised (one of the originals, or a combined one).
    """
    if server_hostname is not None and not ssl:
        raise ValueError('server_hostname is only meaningful with ssl')

    if server_hostname is None and ssl:
        # Use host as default for server_hostname.  It is an error
        # if host is empty or not set, e.g. when an
        # already-connected socket was passed or when only a port
        # is given.  To avoid this error, you can pass
        # server_hostname='' -- this will bypass the hostname
        # check.  (This also means that if host is a numeric
        # IP/IPv6 address, we will attempt to verify that exact
        # address; this will probably fail, but it is possible to
        # create a certificate for a specific IP address, so we
        # don't judge it here.)
        if not host:
            raise ValueError('You must set server_hostname '
                             'when using ssl without a host')
        server_hostname = host

    if ssl_handshake_timeout is not None and not ssl:
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    if ssl_shutdown_timeout is not None and not ssl:
        raise ValueError(
            'ssl_shutdown_timeout is only meaningful with ssl')

    if sock is not None:
        _check_ssl_socket(sock)

    if happy_eyeballs_delay is not None and interleave is None:
        # If using happy eyeballs, default to interleave addresses by family
        interleave = 1

    if host is not None or port is not None:
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        infos = await self._ensure_resolved(
            (host, port), family=family,
            type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
        if not infos:
            raise OSError('getaddrinfo() returned empty list')

        if local_addr is not None:
            laddr_infos = await self._ensure_resolved(
                local_addr, family=family,
                type=socket.SOCK_STREAM, proto=proto,
                flags=flags, loop=self)
            if not laddr_infos:
                raise OSError('getaddrinfo() returned empty list')
        else:
            laddr_infos = None

        if interleave:
            infos = _interleave_addrinfos(infos, interleave)

        # List of per-attempt exception lists, filled by _connect_sock().
        exceptions = []
        if happy_eyeballs_delay is None:
            # not using happy eyeballs
            for addrinfo in infos:
                try:
                    sock = await self._connect_sock(
                        exceptions, addrinfo, laddr_infos)
                    break
                except OSError:
                    continue
        else:  # using happy eyeballs
            sock = (await staggered.staggered_race(
                (
                    # can't use functools.partial as it keeps a reference
                    # to exceptions
                    lambda addrinfo=addrinfo: self._connect_sock(
                        exceptions, addrinfo, laddr_infos
                    )
                    for addrinfo in infos
                ),
                happy_eyeballs_delay,
                loop=self,
            ))[0]  # can't use sock, _, _ as it keeps a reference to exceptions

        if sock is None:
            # Flatten the per-attempt lists into one list of exceptions.
            exceptions = [exc for sub in exceptions for exc in sub]
            try:
                if all_errors:
                    raise ExceptionGroup("create_connection failed", exceptions)
                if len(exceptions) == 1:
                    raise exceptions[0]
                else:
                    # If they all have the same str(), raise one.
                    model = str(exceptions[0])
                    if all(str(exc) == model for exc in exceptions):
                        raise exceptions[0]
                    # Raise a combined exception so the user can see all
                    # the various error messages.
                    raise OSError('Multiple exceptions: {}'.format(
                        ', '.join(str(exc) for exc in exceptions)))
            finally:
                exceptions = None

    else:
        if sock is None:
            raise ValueError(
                'host and port was not specified and no sock specified')
        if sock.type != socket.SOCK_STREAM:
            # We allow AF_INET, AF_INET6, AF_UNIX as long as they
            # are SOCK_STREAM.
            # We support passing AF_UNIX sockets even though we have
            # a dedicated API for that: create_unix_connection.
            # Disallowing AF_UNIX in this method, breaks backwards
            # compatibility.
            raise ValueError(
                f'A Stream Socket was expected, got {sock!r}')

    transport, protocol = await self._create_connection_transport(
        sock, protocol_factory, ssl, server_hostname,
        ssl_handshake_timeout=ssl_handshake_timeout,
        ssl_shutdown_timeout=ssl_shutdown_timeout)
    if self._debug:
        # Get the socket from the transport because SSL transport closes
        # the old socket and creates a new SSL socket
        sock = transport.get_extra_info('socket')
        logger.debug("%r connected to %s:%r: (%r, %r)",
                     sock, host, port, transport, protocol)
    return transport, protocol

1201 

1202 async def _create_connection_transport( 

1203 self, sock, protocol_factory, ssl, 

1204 server_hostname, server_side=False, 

1205 ssl_handshake_timeout=None, 

1206 ssl_shutdown_timeout=None): 

1207 

1208 sock.setblocking(False) 

1209 

1210 protocol = protocol_factory() 

1211 waiter = self.create_future() 

1212 if ssl: 

1213 sslcontext = None if isinstance(ssl, bool) else ssl 

1214 transport = self._make_ssl_transport( 

1215 sock, protocol, sslcontext, waiter, 

1216 server_side=server_side, server_hostname=server_hostname, 

1217 ssl_handshake_timeout=ssl_handshake_timeout, 

1218 ssl_shutdown_timeout=ssl_shutdown_timeout) 

1219 else: 

1220 transport = self._make_socket_transport(sock, protocol, waiter) 

1221 

1222 try: 

1223 await waiter 

1224 except: 

1225 transport.close() 

1226 raise 

1227 

1228 return transport, protocol 

1229 

async def sendfile(self, transport, file, offset=0, count=None,
                   *, fallback=True):
    """Send a file to transport.

    Return the total number of bytes which were sent.

    The method uses high-performance os.sendfile if available.

    file must be a regular file object opened in binary mode.

    offset tells from where to start reading the file.  If specified,
    count is the total number of bytes to transmit as opposed to
    sending the file until EOF is reached.  File position is updated on
    return or also in case of error in which case file.tell()
    can be used to figure out the number of bytes
    which were sent.

    fallback set to True makes asyncio manually read and send
    the file when the platform does not support the sendfile syscall
    (e.g. Windows or SSL socket on Unix).

    Raise SendfileNotAvailableError if the system does not support
    sendfile syscall and fallback is False.
    """
    if transport.is_closing():
        raise RuntimeError("Transport is closing")

    modes = constants._SendfileMode
    # Transports without the marker attribute are treated as unsupported.
    mode = getattr(transport, '_sendfile_compatible', modes.UNSUPPORTED)
    if mode is modes.UNSUPPORTED:
        raise RuntimeError(
            f"sendfile is not supported for transport {transport!r}")

    if mode is modes.TRY_NATIVE:
        try:
            return await self._sendfile_native(transport, file,
                                               offset, count)
        except exceptions.SendfileNotAvailableError:
            if not fallback:
                raise

    if not fallback:
        raise RuntimeError(
            f"fallback is disabled and native sendfile is not "
            f"supported for transport {transport!r}")

    return await self._sendfile_fallback(transport, file,
                                         offset, count)

1276 

1277 async def _sendfile_native(self, transp, file, offset, count): 

1278 raise exceptions.SendfileNotAvailableError( 

1279 "sendfile syscall is not supported") 

1280 

async def _sendfile_fallback(self, transp, file, offset, count):
    """Send *file* through *transp* by reading chunks and writing them.

    Chunks are read in the default executor via file.readinto() and
    written with transp.write(), awaiting the helper protocol's drain()
    after each write.  Returns the number of bytes sent.  On exit the
    file position is updated (when anything was sent and the file has a
    seek() method) and the helper protocol's restore() is awaited.
    """
    if offset:
        file.seek(offset)
    chunk_limit = 16384
    blocksize = min(count, chunk_limit) if count else chunk_limit
    scratch = bytearray(blocksize)
    total_sent = 0
    flow = _SendfileFallbackProtocol(transp)
    try:
        while True:
            if count:
                # Never read past the requested number of bytes.
                blocksize = min(count - total_sent, blocksize)
                if blocksize <= 0:
                    return total_sent
            window = memoryview(scratch)[:blocksize]
            nread = await self.run_in_executor(None, file.readinto, window)
            if not nread:
                return total_sent  # EOF
            transp.write(window[:nread])
            await flow.drain()
            total_sent += nread
    finally:
        if total_sent > 0 and hasattr(file, 'seek'):
            file.seek(offset + total_sent)
        await flow.restore()

1305 

async def start_tls(self, transport, protocol, sslcontext, *,
                    server_side=False,
                    server_hostname=None,
                    ssl_handshake_timeout=None,
                    ssl_shutdown_timeout=None):
    """Upgrade transport to TLS.

    Return a new transport that *protocol* should start using
    immediately.

    Raises RuntimeError when the ssl module is unavailable, and
    TypeError when *sslcontext* is not an ssl.SSLContext or *transport*
    does not declare itself compatible with start_tls().
    """
    # "ssl" is the module here; it is None when the import failed.
    if ssl is None:
        raise RuntimeError('Python ssl module is not available')

    if not isinstance(sslcontext, ssl.SSLContext):
        raise TypeError(
            f'sslcontext is expected to be an instance of ssl.SSLContext, '
            f'got {sslcontext!r}')

    if not getattr(transport, '_start_tls_compatible', False):
        raise TypeError(
            f'transport {transport!r} is not supported by start_tls()')

    waiter = self.create_future()
    ssl_protocol = sslproto.SSLProtocol(
        self, protocol, sslcontext, waiter,
        server_side, server_hostname,
        ssl_handshake_timeout=ssl_handshake_timeout,
        ssl_shutdown_timeout=ssl_shutdown_timeout,
        call_connection_made=False)

    # Pause early so that "ssl_protocol.data_received()" doesn't
    # have a chance to get called before "ssl_protocol.connection_made()".
    transport.pause_reading()

    transport.set_protocol(ssl_protocol)
    conmade_cb = self.call_soon(ssl_protocol.connection_made, transport)
    resume_cb = self.call_soon(transport.resume_reading)

    try:
        await waiter
    except BaseException:
        # Close the transport and cancel both scheduled callbacks
        # before propagating the failure.
        transport.close()
        conmade_cb.cancel()
        resume_cb.cancel()
        raise

    return ssl_protocol._app_transport

1353 

async def create_datagram_endpoint(self, protocol_factory,
                                   local_addr=None, remote_addr=None, *,
                                   family=0, proto=0, flags=0,
                                   reuse_port=None,
                                   allow_broadcast=None, sock=None):
    """Create datagram connection.

    Either pass an existing non-stream *sock* (in which case none of the
    other socket-modifying keyword arguments may be used), or let the
    method create a SOCK_DGRAM socket from *local_addr* / *remote_addr*
    / *family*.  Returns a (transport, protocol) pair.
    """
    if sock is not None:
        if sock.type == socket.SOCK_STREAM:
            raise ValueError(
                f'A datagram socket was expected, got {sock!r}')
        if (local_addr or remote_addr or
                family or proto or flags or
                reuse_port or allow_broadcast):
            # show the problematic kwargs in exception msg
            opts = dict(local_addr=local_addr, remote_addr=remote_addr,
                        family=family, proto=proto, flags=flags,
                        reuse_port=reuse_port,
                        allow_broadcast=allow_broadcast)
            problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
            raise ValueError(
                f'socket modifier keyword arguments can not be used '
                f'when sock is specified. ({problems})')
        sock.setblocking(False)
        r_addr = None
    else:
        # Build addr_pairs_info: an iterable of
        # ((family, proto), (local_address, remote_address)) candidates.
        if not (local_addr or remote_addr):
            if family == 0:
                raise ValueError('unexpected address family')
            addr_pairs_info = (((family, proto), (None, None)),)
        elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
            for addr in (local_addr, remote_addr):
                if addr is not None and not isinstance(addr, str):
                    raise TypeError('string is expected')

            if local_addr and local_addr[0] not in (0, '\x00'):
                # Remove an existing socket file at the local path.
                try:
                    if stat.S_ISSOCK(os.stat(local_addr).st_mode):
                        os.remove(local_addr)
                except FileNotFoundError:
                    pass
                except OSError as err:
                    # Directory may have permissions only to create socket.
                    logger.error('Unable to check or remove stale UNIX '
                                 'socket %r: %r',
                                 local_addr, err)

            addr_pairs_info = (((family, proto),
                                (local_addr, remote_addr)), )
        else:
            # join address by (family, protocol)
            addr_infos = {}  # Using order preserving dict
            for idx, addr in ((0, local_addr), (1, remote_addr)):
                if addr is not None:
                    if not (isinstance(addr, tuple) and len(addr) == 2):
                        raise TypeError('2-tuple is expected')

                    infos = await self._ensure_resolved(
                        addr, family=family, type=socket.SOCK_DGRAM,
                        proto=proto, flags=flags, loop=self)
                    if not infos:
                        raise OSError('getaddrinfo() returned empty list')

                    for fam, _, pro, _, address in infos:
                        key = (fam, pro)
                        if key not in addr_infos:
                            addr_infos[key] = [None, None]
                        addr_infos[key][idx] = address

            # each addr has to have info for each (family, proto) pair
            addr_pairs_info = [
                (key, addr_pair) for key, addr_pair in addr_infos.items()
                if not ((local_addr and addr_pair[0] is None) or
                        (remote_addr and addr_pair[1] is None))]

            if not addr_pairs_info:
                raise ValueError('can not get address information')

        exceptions = []

        # Try each candidate in turn; the first one that can be set up
        # without an OSError wins.
        for ((family, proto),
             (local_address, remote_address)) in addr_pairs_info:
            sock = None
            r_addr = None
            try:
                sock = socket.socket(
                    family=family, type=socket.SOCK_DGRAM, proto=proto)
                if reuse_port:
                    _set_reuseport(sock)
                if allow_broadcast:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
                sock.setblocking(False)

                if local_addr:
                    sock.bind(local_address)
                if remote_addr:
                    if not allow_broadcast:
                        await self.sock_connect(sock, remote_address)
                    r_addr = remote_address
            except OSError as exc:
                if sock is not None:
                    sock.close()
                exceptions.append(exc)
            except:
                if sock is not None:
                    sock.close()
                raise
            else:
                break
        else:
            # Every candidate failed: report the first error.
            raise exceptions[0]

    protocol = protocol_factory()
    waiter = self.create_future()
    transport = self._make_datagram_transport(
        sock, protocol, r_addr, waiter)
    if self._debug:
        if local_addr:
            logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
                        "created: (%r, %r)",
                        local_addr, remote_addr, transport, protocol)
        else:
            logger.debug("Datagram endpoint remote_addr=%r created: "
                         "(%r, %r)",
                         remote_addr, transport, protocol)

    try:
        await waiter
    except:
        transport.close()
        raise

    return transport, protocol

1487 

async def _ensure_resolved(self, address, *,
                           family=0, type=socket.SOCK_STREAM,
                           proto=0, flags=0, loop):
    """Return a list of address infos for *address*.

    When _ipaddr_info() recognises the host as an already resolved IP,
    its single result is returned in a list; otherwise the lookup is
    delegated to loop.getaddrinfo().
    """
    host, port = address[:2]
    resolved = _ipaddr_info(host, port, family, type, proto, *address[2:])
    if resolved is None:
        return await loop.getaddrinfo(host, port, family=family, type=type,
                                      proto=proto, flags=flags)
    # "host" is already a resolved IP.
    return [resolved]

1499 

async def _create_server_getaddrinfo(self, host, port, family, flags):
    """Resolve (host, port) for a stream server socket.

    Raises OSError when the resolution yields no results.
    """
    infos = await self._ensure_resolved((host, port), family=family,
                                        type=socket.SOCK_STREAM,
                                        flags=flags, loop=self)
    if infos:
        return infos
    raise OSError(f'getaddrinfo({host!r}) returned empty list')

1507 

async def create_server(
        self, protocol_factory, host=None, port=None,
        *,
        family=socket.AF_UNSPEC,
        flags=socket.AI_PASSIVE,
        sock=None,
        backlog=100,
        ssl=None,
        reuse_address=None,
        reuse_port=None,
        keep_alive=None,
        ssl_handshake_timeout=None,
        ssl_shutdown_timeout=None,
        start_serving=True):
    """Create a TCP server.

    The host parameter can be a string, in that case the TCP server is
    bound to host and port.

    The host parameter can also be a sequence of strings and in that case
    the TCP server is bound to all hosts of the sequence. If a host
    appears multiple times (possibly indirectly e.g. when hostnames
    resolve to the same IP address), the server is only bound once to that
    host.

    Return a Server object which can be used to stop the service.

    This method is a coroutine.
    """
    if isinstance(ssl, bool):
        raise TypeError('ssl argument must be an SSLContext or None')

    if ssl_handshake_timeout is not None and ssl is None:
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    if ssl_shutdown_timeout is not None and ssl is None:
        raise ValueError(
            'ssl_shutdown_timeout is only meaningful with ssl')

    if sock is not None:
        _check_ssl_socket(sock)

    if host is not None or port is not None:
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        if reuse_address is None:
            reuse_address = os.name == "posix" and sys.platform != "cygwin"
        sockets = []
        # Normalise *host* into a list of hosts to resolve.
        if host == '':
            hosts = [None]
        elif (isinstance(host, str) or
              not isinstance(host, collections.abc.Iterable)):
            hosts = [host]
        else:
            hosts = host

        fs = [self._create_server_getaddrinfo(host, port, family=family,
                                              flags=flags)
              for host in hosts]
        infos = await tasks.gather(*fs)
        # A set removes duplicate address infos across hosts.
        infos = set(itertools.chain.from_iterable(infos))

        completed = False
        try:
            for res in infos:
                af, socktype, proto, canonname, sa = res
                try:
                    sock = socket.socket(af, socktype, proto)
                except socket.error:
                    # Assume it's a bad family/type/protocol combination.
                    if self._debug:
                        logger.warning('create_server() failed to create '
                                       'socket.socket(%r, %r, %r)',
                                       af, socktype, proto, exc_info=True)
                    continue
                sockets.append(sock)
                if reuse_address:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
                # Since Linux 6.12.9, SO_REUSEPORT is not allowed
                # on other address families than AF_INET/AF_INET6.
                if reuse_port and af in (socket.AF_INET, socket.AF_INET6):
                    _set_reuseport(sock)
                if keep_alive:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_KEEPALIVE, True)
                # Disable IPv4/IPv6 dual stack support (enabled by
                # default on Linux) which makes a single socket
                # listen on both address families.
                if (_HAS_IPv6 and
                        af == socket.AF_INET6 and
                        hasattr(socket, 'IPPROTO_IPV6')):
                    sock.setsockopt(socket.IPPROTO_IPV6,
                                    socket.IPV6_V6ONLY,
                                    True)
                try:
                    sock.bind(sa)
                except OSError as err:
                    msg = ('error while attempting '
                           'to bind on address %r: %s'
                           % (sa, str(err).lower()))
                    if err.errno == errno.EADDRNOTAVAIL:
                        # Assume the family is not enabled (bpo-30945)
                        sockets.pop()
                        sock.close()
                        if self._debug:
                            logger.warning(msg)
                        continue
                    raise OSError(err.errno, msg) from None

            if not sockets:
                raise OSError('could not bind on any address out of %r'
                              % ([info[4] for info in infos],))

            completed = True
        finally:
            # On any failure, close every socket created so far.
            if not completed:
                for sock in sockets:
                    sock.close()
    else:
        if sock is None:
            raise ValueError('Neither host/port nor sock were specified')
        if sock.type != socket.SOCK_STREAM:
            raise ValueError(f'A Stream Socket was expected, got {sock!r}')
        sockets = [sock]

    for sock in sockets:
        sock.setblocking(False)

    server = Server(self, sockets, protocol_factory,
                    ssl, backlog, ssl_handshake_timeout,
                    ssl_shutdown_timeout)
    if start_serving:
        server._start_serving()
        # Skip one loop iteration so that all 'loop.add_reader'
        # go through.
        await tasks.sleep(0)

    if self._debug:
        logger.info("%r is serving", server)
    return server

1652 

async def connect_accepted_socket(
        self, protocol_factory, sock,
        *, ssl=None,
        ssl_handshake_timeout=None,
        ssl_shutdown_timeout=None):
    """Wrap an already accepted stream socket in a transport/protocol pair.

    The transport is created in server-side mode.  Raises ValueError if
    *sock* is not a SOCK_STREAM socket or if an SSL timeout is given
    without *ssl*.  Returns (transport, protocol).
    """
    if sock.type != socket.SOCK_STREAM:
        raise ValueError(f'A Stream Socket was expected, got {sock!r}')

    ssl_only_options = (
        ('ssl_handshake_timeout', ssl_handshake_timeout),
        ('ssl_shutdown_timeout', ssl_shutdown_timeout),
    )
    for option, value in ssl_only_options:
        if value is not None and not ssl:
            raise ValueError(f'{option} is only meaningful with ssl')

    _check_ssl_socket(sock)

    pair = await self._create_connection_transport(
        sock, protocol_factory, ssl, '', server_side=True,
        ssl_handshake_timeout=ssl_handshake_timeout,
        ssl_shutdown_timeout=ssl_shutdown_timeout)
    transport, protocol = pair
    if self._debug:
        # Get the socket from the transport because SSL transport closes
        # the old socket and creates a new SSL socket
        sock = transport.get_extra_info('socket')
        logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
    return transport, protocol

1681 

async def connect_read_pipe(self, protocol_factory, pipe):
    """Attach the read end *pipe* to a new protocol instance.

    Waits until the transport's waiter future completes; if that wait
    raises, the transport is closed and the exception propagates.
    Returns (transport, protocol).
    """
    protocol = protocol_factory()
    ready = self.create_future()
    transport = self._make_read_pipe_transport(pipe, protocol, ready)

    try:
        await ready
    except BaseException:
        transport.close()
        raise

    if self._debug:
        logger.debug('Read pipe %r connected: (%r, %r)',
                     pipe.fileno(), transport, protocol)
    return transport, protocol

1697 

async def connect_write_pipe(self, protocol_factory, pipe):
    """Attach the write end *pipe* to a new protocol instance.

    Waits until the transport's waiter future completes; if that wait
    raises, the transport is closed and the exception propagates.
    Returns (transport, protocol).
    """
    protocol = protocol_factory()
    ready = self.create_future()
    transport = self._make_write_pipe_transport(pipe, protocol, ready)

    try:
        await ready
    except BaseException:
        transport.close()
        raise

    if self._debug:
        logger.debug('Write pipe %r connected: (%r, %r)',
                     pipe.fileno(), transport, protocol)
    return transport, protocol

1713 

def _log_subprocess(self, msg, stdin, stdout, stderr):
    """Log *msg* at DEBUG level together with the configured std streams.

    Streams that are None are omitted.  When stdout is set and stderr is
    subprocess.STDOUT, both are reported as one "stdout=stderr=" entry.
    """
    parts = [msg]
    if stdin is not None:
        parts.append(f'stdin={_format_pipe(stdin)}')

    merged = stdout is not None and stderr == subprocess.STDOUT
    if merged:
        parts.append(f'stdout=stderr={_format_pipe(stdout)}')
    if not merged and stdout is not None:
        parts.append(f'stdout={_format_pipe(stdout)}')
    if not merged and stderr is not None:
        parts.append(f'stderr={_format_pipe(stderr)}')

    logger.debug(' '.join(parts))

1726 

async def subprocess_shell(self, protocol_factory, cmd, *,
                           stdin=subprocess.PIPE,
                           stdout=subprocess.PIPE,
                           stderr=subprocess.PIPE,
                           universal_newlines=False,
                           shell=True, bufsize=0,
                           encoding=None, errors=None, text=None,
                           **kwargs):
    """Run the shell command *cmd* in a subprocess.

    *cmd* must be a ``str`` or ``bytes`` object.  The arguments
    universal_newlines, shell, bufsize, text, encoding and errors are
    accepted only with their default meaning (False, True, 0, falsy,
    None, None respectively); any other value raises ValueError.  The
    checks run in that order, after the check on *cmd*.

    The protocol is created by calling *protocol_factory*; remaining
    keyword arguments are forwarded to
    ``self._make_subprocess_transport()``.

    Return a ``(transport, protocol)`` pair.
    """
    # Ordered, short-circuiting validation: the first failing check wins.
    if not isinstance(cmd, (bytes, str)):
        problem = "cmd must be a string"
    elif universal_newlines:
        problem = "universal_newlines must be False"
    elif not shell:
        problem = "shell must be True"
    elif bufsize != 0:
        problem = "bufsize must be 0"
    elif text:
        problem = "text must be False"
    elif encoding is not None:
        problem = "encoding must be None"
    elif errors is not None:
        problem = "errors must be None"
    else:
        problem = None
    if problem is not None:
        raise ValueError(problem)

    proto = protocol_factory()
    description = None
    if self._debug:
        # don't log parameters: they may contain sensitive information
        # (password) and may be too long
        description = 'run shell command %r' % cmd
        self._log_subprocess(description, stdin, stdout, stderr)

    spawned = await self._make_subprocess_transport(
        proto, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)

    # The debug flag is re-read here: it may have changed during the await.
    if self._debug and description is not None:
        logger.info('%s: %r', description, spawned)
    return spawned, proto

1762 

async def subprocess_exec(self, protocol_factory, program, *args,
                          stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE, universal_newlines=False,
                          shell=False, bufsize=0,
                          encoding=None, errors=None, text=None,
                          **kwargs):
    """Run *program* with *args* in a subprocess (no shell involved).

    The arguments universal_newlines, shell, bufsize, text, encoding
    and errors are accepted only with their default meaning (False,
    False, 0, falsy, None, None respectively); any other value raises
    ValueError.  The checks run in that order.

    The protocol is created by calling *protocol_factory*; the command
    is passed to ``self._make_subprocess_transport()`` as the tuple
    ``(program, *args)`` and remaining keyword arguments are forwarded
    unchanged.

    Return a ``(transport, protocol)`` pair.
    """
    # Ordered, short-circuiting validation: the first failing check wins.
    if universal_newlines:
        problem = "universal_newlines must be False"
    elif shell:
        problem = "shell must be False"
    elif bufsize != 0:
        problem = "bufsize must be 0"
    elif text:
        problem = "text must be False"
    elif encoding is not None:
        problem = "encoding must be None"
    elif errors is not None:
        problem = "errors must be None"
    else:
        problem = None
    if problem is not None:
        raise ValueError(problem)

    command = (program, *args)
    proto = protocol_factory()
    description = None
    if self._debug:
        # don't log parameters: they may contain sensitive information
        # (password) and may be too long
        description = f'execute program {program!r}'
        self._log_subprocess(description, stdin, stdout, stderr)

    spawned = await self._make_subprocess_transport(
        proto, command, False, stdin, stdout, stderr,
        bufsize, **kwargs)

    # The debug flag is re-read here: it may have changed during the await.
    if self._debug and description is not None:
        logger.info('%s: %r', description, spawned)
    return spawned, proto

1796 

def get_exception_handler(self):
    """Return the custom exception handler installed on this loop.

    The result is None when no custom handler has been set, i.e. the
    default exception handler is in use.
    """
    installed = self._exception_handler
    return installed

1801 

def set_exception_handler(self, handler):
    """Install *handler* as this event loop's exception handler.

    Passing None selects the default exception handler.

    Otherwise *handler* must be callable; it should accept
    ``(loop, context)``, where ``loop`` is a reference to the active
    event loop and ``context`` is a dict (see the
    `call_exception_handler()` documentation for its contents).

    Raise TypeError, leaving the current handler untouched, if
    *handler* is neither None nor callable.
    """
    if handler is None or callable(handler):
        self._exception_handler = handler
        return
    raise TypeError('A callable object or None is expected, '
                    f'got {handler!r}')

1818 

def default_exception_handler(self, context):
    """Log the error described by *context* through the module logger.

    This is the handler used when no custom exception handler is set;
    a custom handler may also call it to defer to the default
    behaviour.  *context* has the same meaning as in
    `call_exception_handler()`.

    The log record starts with ``context['message']`` (or a generic
    message when that is missing or empty), followed by one
    ``key: value`` line per remaining context entry in sorted key
    order.  'source_traceback' and 'handle_traceback' entries are
    rendered as formatted stack listings; every other value is shown
    via ``repr()``.  If ``context['exception']`` is set, it is passed
    to the logger as ``exc_info``.

    Note: when *context* has no 'source_traceback' and the handle
    currently being run has a non-empty source traceback, that
    traceback is stored into *context* under 'handle_traceback'.
    """
    message = context.get('message') or 'Unhandled exception in event loop'

    exception = context.get('exception')
    if exception is None:
        exc_info = False
    else:
        exc_info = (type(exception), exception, exception.__traceback__)

    if 'source_traceback' not in context:
        running = self._current_handle
        if running is not None and running._source_traceback:
            context['handle_traceback'] = running._source_traceback

    # Keys whose value is a stack summary, mapped to the heading that
    # introduces the formatted listing.
    traceback_headings = {
        'source_traceback': 'Object created at (most recent call last):\n',
        'handle_traceback': 'Handle created at (most recent call last):\n',
    }

    log_lines = [message]
    for key in sorted(context):
        if key in ('message', 'exception'):
            # Already reported via the first line / exc_info.
            continue
        heading = traceback_headings.get(key)
        if heading is None:
            rendered = repr(context[key])
        else:
            listing = ''.join(traceback.format_list(context[key]))
            rendered = heading + listing.rstrip()
        log_lines.append(f'{key}: {rendered}')

    logger.error('\n'.join(log_lines), exc_info=exc_info)

1868 

def call_exception_handler(self, context):
    """Call the current event loop's exception handler.

    The context argument is a dict containing the following keys:

    - 'message': Error message;
    - 'exception' (optional): Exception object;
    - 'future' (optional): Future instance;
    - 'task' (optional): Task instance;
    - 'handle' (optional): Handle instance;
    - 'protocol' (optional): Protocol instance;
    - 'transport' (optional): Transport instance;
    - 'socket' (optional): Socket instance;
    - 'source_traceback' (optional): Traceback of the source;
    - 'handle_traceback' (optional): Traceback of the handle;
    - 'asyncgen' (optional): Asynchronous generator that caused
                             the exception.

    New keys may be introduced in the future.

    When a custom handler is set, it is run inside the context
    returned by ``get_context()`` of the first non-None object found
    under 'task', 'future' or 'handle' (in that order), provided such
    a context exists; otherwise it is called directly.  Any exception
    it raises, other than SystemExit and KeyboardInterrupt, is reported
    through `default_exception_handler()`.

    Note: do not overload this method in an event loop subclass.
    For custom exception handling, use the
    `set_exception_handler()` method.
    """
    if self._exception_handler is None:
        try:
            self.default_exception_handler(context)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException:
            # Second protection layer for unexpected errors
            # in the default implementation, as well as for subclassed
            # event loops with overloaded "default_exception_handler".
            logger.error('Exception in default exception handler',
                         exc_info=True)
        return

    try:
        # Find the object the error is about.  Even though Futures
        # don't have a context, Task is a subclass of Future and
        # sometimes the 'future' key holds a Task; Handles also have
        # a context.
        owner = None
        for key in ("task", "future", "handle"):
            owner = context.get(key)
            if owner is not None:
                break

        run_ctx = None
        if owner is not None and hasattr(owner, "get_context"):
            run_ctx = owner.get_context()

        if run_ctx is None or not hasattr(run_ctx, "run"):
            self._exception_handler(self, context)
        else:
            run_ctx.run(self._exception_handler, self, context)
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as exc:
        # Exception in the user set custom exception handler.
        try:
            # Let's try default handler.
            self.default_exception_handler({
                'message': 'Unhandled error in exception handler',
                'exception': exc,
                'context': context,
            })
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException:
            # Guard 'default_exception_handler' in case it is
            # overloaded.
            logger.error('Exception in default exception handler '
                         'while handling an unexpected error '
                         'in custom exception handler',
                         exc_info=True)

1942 

1943 def _add_callback(self, handle): 

1944 """Add a Handle to _ready.""" 

1945 if not handle._cancelled: 

1946 self._ready.append(handle) 

1947 

1948 def _add_callback_signalsafe(self, handle): 

1949 """Like _add_callback() but called from a signal handler.""" 

1950 self._add_callback(handle) 

1951 self._write_to_self() 

1952 

1953 def _timer_handle_cancelled(self, handle): 

1954 """Notification that a TimerHandle has been cancelled.""" 

1955 if handle._scheduled: 

1956 self._timer_cancelled_count += 1 

1957 

def _run_once(self):
    """Run one full iteration of the event loop.

    This calls all currently ready callbacks, polls for I/O,
    schedules the resulting callbacks, and finally schedules
    'call_later' callbacks.

    The steps, in order:

    1. Drop cancelled timer handles from ``self._scheduled``.
    2. Compute the select timeout and poll the selector.
    3. Move timers that are due from ``self._scheduled`` to
       ``self._ready``.
    4. Run the handles that were in ``self._ready`` at that point.
    """

    # Step 1: purge cancelled timers.  A full rebuild of the heap is
    # done only when there are more than _MIN_SCHEDULED_TIMER_HANDLES
    # timers and the cancelled share exceeds
    # _MIN_CANCELLED_TIMER_HANDLES_FRACTION; otherwise only cancelled
    # timers sitting at the head of the heap are popped.
    sched_count = len(self._scheduled)
    if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
        self._timer_cancelled_count / sched_count >
            _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
        # Remove delayed calls that were cancelled if their number
        # is too high
        new_scheduled = []
        for handle in self._scheduled:
            if handle._cancelled:
                handle._scheduled = False
            else:
                new_scheduled.append(handle)

        heapq.heapify(new_scheduled)
        self._scheduled = new_scheduled
        # Every cancelled timer was just removed, so restart the count.
        self._timer_cancelled_count = 0
    else:
        # Remove delayed calls that were cancelled from head of queue.
        while self._scheduled and self._scheduled[0]._cancelled:
            self._timer_cancelled_count -= 1
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False

    # Step 2: choose the select timeout.  It stays None when nothing
    # is ready, the loop is not stopping and no timer is scheduled
    # (presumably an indefinite wait -- selector semantics are not
    # visible here).
    timeout = None
    if self._ready or self._stopping:
        # There is work to do (or we are stopping): just poll.
        timeout = 0
    elif self._scheduled:
        # Compute the desired timeout.
        timeout = self._scheduled[0]._when - self.time()
        # Clamp the wait to the range [0, MAXIMUM_SELECT_TIMEOUT].
        if timeout > MAXIMUM_SELECT_TIMEOUT:
            timeout = MAXIMUM_SELECT_TIMEOUT
        elif timeout < 0:
            timeout = 0

    event_list = self._selector.select(timeout)
    self._process_events(event_list)
    # Needed to break cycles when an exception occurs.
    event_list = None

    # Step 3.
    # Handle 'later' callbacks that are ready.
    # A timer counts as due when its deadline is earlier than "now"
    # plus one clock resolution.
    end_time = self.time() + self._clock_resolution
    while self._scheduled:
        handle = self._scheduled[0]
        if handle._when >= end_time:
            break
        handle = heapq.heappop(self._scheduled)
        handle._scheduled = False
        self._ready.append(handle)

    # Step 4.
    # This is the only place where callbacks are actually *called*.
    # All other places just add them to ready.
    # Note: We run all currently scheduled callbacks, but not any
    # callbacks scheduled by callbacks run this time around --
    # they will be run the next time (after another I/O poll).
    # Use an idiom that is thread-safe without using locks.
    ntodo = len(self._ready)
    for i in range(ntodo):
        handle = self._ready.popleft()
        if handle._cancelled:
            continue
        if self._debug:
            # Debug mode: expose the running handle through
            # self._current_handle (default_exception_handler() reads
            # it) and warn when the callback takes at least
            # self.slow_callback_duration seconds.
            try:
                self._current_handle = handle
                t0 = self.time()
                handle._run()
                dt = self.time() - t0
                if dt >= self.slow_callback_duration:
                    logger.warning('Executing %s took %.3f seconds',
                                   _format_handle(handle), dt)
            finally:
                self._current_handle = None
        else:
            handle._run()
    handle = None  # Needed to break cycles when an exception occurs.

2040 

2041 def _set_coroutine_origin_tracking(self, enabled): 

2042 if bool(enabled) == bool(self._coroutine_origin_tracking_enabled): 

2043 return 

2044 

2045 if enabled: 

2046 self._coroutine_origin_tracking_saved_depth = ( 

2047 sys.get_coroutine_origin_tracking_depth()) 

2048 sys.set_coroutine_origin_tracking_depth( 

2049 constants.DEBUG_STACK_DEPTH) 

2050 else: 

2051 sys.set_coroutine_origin_tracking_depth( 

2052 self._coroutine_origin_tracking_saved_depth) 

2053 

2054 self._coroutine_origin_tracking_enabled = enabled 

2055 

def get_debug(self):
    """Return the loop's debug flag as stored in ``self._debug``."""
    debug_flag = self._debug
    return debug_flag

2058 

2059 def set_debug(self, enabled): 

2060 self._debug = enabled 

2061 

2062 if self.is_running(): 

2063 self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)