Coverage for Lib/asyncio/events.py: 92%

336 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-06-24 03:28 +0000

1"""Event loop and event loop policy.""" 

2 

3# Contains code from https://github.com/MagicStack/uvloop/tree/v0.16.0 

4# SPDX-License-Identifier: PSF-2.0 AND (MIT OR Apache-2.0) 

5# SPDX-FileCopyrightText: Copyright (c) 2015-2021 MagicStack Inc. http://magic.io 

6 

7__all__ = ( 

8 "AbstractEventLoop", 

9 "AbstractServer", 

10 "Handle", 

11 "TimerHandle", 

12 "get_event_loop_policy", 

13 "set_event_loop_policy", 

14 "get_event_loop", 

15 "set_event_loop", 

16 "new_event_loop", 

17 "_set_running_loop", 

18 "get_running_loop", 

19 "_get_running_loop", 

20) 

21 

22import contextvars 

23import os 

24import signal 

25import socket 

26import subprocess 

27import sys 

28import threading 

29import warnings 

30 

31from . import format_helpers 

32 

33 

34class Handle: 

35 """Object returned by callback registration methods.""" 

36 

37 __slots__ = ('_callback', '_args', '_cancelled', '_loop', 

38 '_source_traceback', '_repr', '__weakref__', 

39 '_context') 

40 

41 def __init__(self, callback, args, loop, context=None): 

42 if context is None: 

43 context = contextvars.copy_context() 

44 self._context = context 

45 self._loop = loop 

46 self._callback = callback 

47 self._args = args 

48 self._cancelled = False 

49 self._repr = None 

50 if self._loop.get_debug(): 

51 self._source_traceback = format_helpers.extract_stack( 

52 sys._getframe(1)) 

53 else: 

54 self._source_traceback = None 

55 

56 def _repr_info(self): 

57 info = [self.__class__.__name__] 

58 if self._cancelled: 

59 info.append('cancelled') 

60 if self._callback is not None: 

61 info.append(format_helpers._format_callback_source( 

62 self._callback, self._args, 

63 debug=self._loop.get_debug())) 

64 if self._source_traceback: 

65 frame = self._source_traceback[-1] 

66 info.append(f'created at {frame[0]}:{frame[1]}') 

67 return info 

68 

69 def __repr__(self): 

70 if self._repr is not None: 

71 return self._repr 

72 info = self._repr_info() 

73 return '<{}>'.format(' '.join(info)) 

74 

75 def get_context(self): 

76 return self._context 

77 

78 def cancel(self): 

79 if not self._cancelled: 

80 self._cancelled = True 

81 if self._loop.get_debug(): 

82 # Keep a representation in debug mode to keep callback and 

83 # parameters. For example, to log the warning 

84 # "Executing <Handle...> took 2.5 second" 

85 self._repr = repr(self) 

86 self._callback = None 

87 self._args = None 

88 

89 def cancelled(self): 

90 return self._cancelled 

91 

92 def _run(self): 

93 try: 

94 self._context.run(self._callback, *self._args) 

95 except (SystemExit, KeyboardInterrupt): 

96 raise 

97 except BaseException as exc: 

98 cb = format_helpers._format_callback_source( 

99 self._callback, self._args, 

100 debug=self._loop.get_debug()) 

101 msg = f'Exception in callback {cb}' 

102 context = { 

103 'message': msg, 

104 'exception': exc, 

105 'handle': self, 

106 } 

107 if self._source_traceback: 

108 context['source_traceback'] = self._source_traceback 

109 self._loop.call_exception_handler(context) 

110 self = None # Needed to break cycles when an exception occurs. 

111 

112# _ThreadSafeHandle is used for callbacks scheduled with call_soon_threadsafe 

113# and is thread safe unlike Handle which is not thread safe. 

114class _ThreadSafeHandle(Handle): 

115 

116 __slots__ = ('_lock',) 

117 

118 def __init__(self, callback, args, loop, context=None): 

119 super().__init__(callback, args, loop, context) 

120 self._lock = threading.RLock() 

121 

122 def cancel(self): 

123 with self._lock: 

124 return super().cancel() 

125 

126 def cancelled(self): 

127 with self._lock: 

128 return super().cancelled() 

129 

130 def _run(self): 

131 # The event loop checks for cancellation without holding the lock 

132 # It is possible that the handle is cancelled after the check 

133 # but before the callback is called so check it again after acquiring 

134 # the lock and return without calling the callback if it is cancelled. 

135 with self._lock: 

136 if self._cancelled: 136 ↛ 137line 136 didn't jump to line 137 because the condition on line 136 was never true

137 return 

138 return super()._run() 

139 

140 

141class TimerHandle(Handle): 

142 """Object returned by timed callback registration methods.""" 

143 

144 __slots__ = ['_scheduled', '_when'] 

145 

146 def __init__(self, when, callback, args, loop, context=None): 

147 super().__init__(callback, args, loop, context) 

148 if self._source_traceback: 

149 del self._source_traceback[-1] 

150 self._when = when 

151 self._scheduled = False 

152 

153 def _repr_info(self): 

154 info = super()._repr_info() 

155 pos = 2 if self._cancelled else 1 

156 info.insert(pos, f'when={self._when}') 

157 return info 

158 

159 def __hash__(self): 

160 return hash(self._when) 

161 

162 def __lt__(self, other): 

163 if isinstance(other, TimerHandle): 

164 return self._when < other._when 

165 return NotImplemented 

166 

167 def __le__(self, other): 

168 if isinstance(other, TimerHandle): 

169 return self._when < other._when or self.__eq__(other) 

170 return NotImplemented 

171 

172 def __gt__(self, other): 

173 if isinstance(other, TimerHandle): 

174 return self._when > other._when 

175 return NotImplemented 

176 

177 def __ge__(self, other): 

178 if isinstance(other, TimerHandle): 

179 return self._when > other._when or self.__eq__(other) 

180 return NotImplemented 

181 

182 def __eq__(self, other): 

183 if isinstance(other, TimerHandle): 

184 return (self._when == other._when and 

185 self._callback == other._callback and 

186 self._args == other._args and 

187 self._cancelled == other._cancelled) 

188 return NotImplemented 

189 

190 def cancel(self): 

191 if not self._cancelled: 191 ↛ 193line 191 didn't jump to line 193 because the condition on line 191 was always true

192 self._loop._timer_handle_cancelled(self) 

193 super().cancel() 

194 

195 def when(self): 

196 """Return a scheduled callback time. 

197 

198 The time is an absolute timestamp, using the same time 

199 reference as loop.time(). 

200 """ 

201 return self._when 

202 

203 

204class AbstractServer: 

205 """Abstract server returned by create_server().""" 

206 

207 def close(self): 

208 """Stop serving. This leaves existing connections open.""" 

209 raise NotImplementedError 

210 

211 def close_clients(self): 

212 """Close all active connections.""" 

213 raise NotImplementedError 

214 

215 def abort_clients(self): 

216 """Close all active connections immediately.""" 

217 raise NotImplementedError 

218 

219 def get_loop(self): 

220 """Get the event loop the Server object is attached to.""" 

221 raise NotImplementedError 

222 

223 def is_serving(self): 

224 """Return True if the server is accepting connections.""" 

225 raise NotImplementedError 

226 

227 async def start_serving(self): 

228 """Start accepting connections. 

229 

230 This method is idempotent, so it can be called when 

231 the server is already being serving. 

232 """ 

233 raise NotImplementedError 

234 

235 async def serve_forever(self): 

236 """Start accepting connections until the coroutine is cancelled. 

237 

238 The server is closed when the coroutine is cancelled. 

239 """ 

240 raise NotImplementedError 

241 

242 async def wait_closed(self): 

243 """Coroutine to wait until service is closed.""" 

244 raise NotImplementedError 

245 

246 async def __aenter__(self): 

247 return self 

248 

249 async def __aexit__(self, *exc): 

250 self.close() 

251 await self.wait_closed() 

252 

253 

254class AbstractEventLoop: 

255 """Abstract event loop.""" 

256 

257 # Running and stopping the event loop. 

258 

259 def run_forever(self): 

260 """Run the event loop until stop() is called.""" 

261 raise NotImplementedError 

262 

263 def run_until_complete(self, future): 

264 """Run the event loop until a Future is done. 

265 

266 Return the Future's result, or raise its exception. 

267 """ 

268 raise NotImplementedError 

269 

270 def stop(self): 

271 """Stop the event loop as soon as reasonable. 

272 

273 Exactly how soon that is may depend on the implementation, but 

274 no more I/O callbacks should be scheduled. 

275 """ 

276 raise NotImplementedError 

277 

278 def is_running(self): 

279 """Return whether the event loop is currently running.""" 

280 raise NotImplementedError 

281 

282 def is_closed(self): 

283 """Returns True if the event loop was closed.""" 

284 raise NotImplementedError 

285 

286 def close(self): 

287 """Close the loop. 

288 

289 The loop should not be running. 

290 

291 This is idempotent and irreversible. 

292 

293 No other methods should be called after this one. 

294 """ 

295 raise NotImplementedError 

296 

297 async def shutdown_asyncgens(self): 

298 """Shutdown all active asynchronous generators.""" 

299 raise NotImplementedError 

300 

301 async def shutdown_default_executor(self): 

302 """Schedule the shutdown of the default executor.""" 

303 raise NotImplementedError 

304 

305 # Methods scheduling callbacks. All these return Handles. 

306 

307 def _timer_handle_cancelled(self, handle): 

308 """Notification that a TimerHandle has been cancelled.""" 

309 raise NotImplementedError 

310 

311 def call_soon(self, callback, *args, context=None): 

312 return self.call_later(0, callback, *args, context=context) 

313 

314 def call_later(self, delay, callback, *args, context=None): 

315 raise NotImplementedError 

316 

317 def call_at(self, when, callback, *args, context=None): 

318 raise NotImplementedError 

319 

320 def time(self): 

321 raise NotImplementedError 

322 

323 def create_future(self): 

324 raise NotImplementedError 

325 

326 # Method scheduling a coroutine object: create a task. 

327 

328 def create_task(self, coro, **kwargs): 

329 raise NotImplementedError 

330 

331 # Methods for interacting with threads. 

332 

333 def call_soon_threadsafe(self, callback, *args, context=None): 

334 raise NotImplementedError 

335 

336 def run_in_executor(self, executor, func, *args): 

337 raise NotImplementedError 

338 

339 def set_default_executor(self, executor): 

340 raise NotImplementedError 

341 

342 # Network I/O methods returning Futures. 

343 

344 async def getaddrinfo(self, host, port, *, 

345 family=0, type=0, proto=0, flags=0): 

346 raise NotImplementedError 

347 

348 async def getnameinfo(self, sockaddr, flags=0): 

349 raise NotImplementedError 

350 

351 async def create_connection( 

352 self, protocol_factory, host=None, port=None, 

353 *, ssl=None, family=0, proto=0, 

354 flags=0, sock=None, local_addr=None, 

355 server_hostname=None, 

356 ssl_handshake_timeout=None, 

357 ssl_shutdown_timeout=None, 

358 happy_eyeballs_delay=None, interleave=None): 

359 raise NotImplementedError 

360 

361 async def create_server( 

362 self, protocol_factory, host=None, port=None, 

363 *, family=socket.AF_UNSPEC, 

364 flags=socket.AI_PASSIVE, sock=None, backlog=100, 

365 ssl=None, reuse_address=None, reuse_port=None, 

366 keep_alive=None, 

367 ssl_handshake_timeout=None, 

368 ssl_shutdown_timeout=None, 

369 start_serving=True): 

370 """A coroutine which creates a TCP server bound to host and port. 

371 

372 The return value is a Server object which can be used to stop 

373 the service. 

374 

375 If host is an empty string or None all interfaces are assumed 

376 and a list of multiple sockets will be returned (most likely 

377 one for IPv4 and another one for IPv6). The host parameter can also 

378 be a sequence (e.g. list) of hosts to bind to. 

379 

380 family can be set to either AF_INET or AF_INET6 to force the 

381 socket to use IPv4 or IPv6. If not set it will be determined 

382 from host (defaults to AF_UNSPEC). 

383 

384 flags is a bitmask for getaddrinfo(). 

385 

386 sock can optionally be specified in order to use a preexisting 

387 socket object. 

388 

389 backlog is the maximum number of queued connections passed to 

390 listen() (defaults to 100). 

391 

392 ssl can be set to an SSLContext to enable SSL over the 

393 accepted connections. 

394 

395 reuse_address tells the kernel to reuse a local socket in 

396 TIME_WAIT state, without waiting for its natural timeout to 

397 expire. If not specified will automatically be set to True on 

398 UNIX. 

399 

400 reuse_port tells the kernel to allow this endpoint to be bound to 

401 the same port as other existing endpoints are bound to, so long as 

402 they all set this flag when being created. This option is not 

403 supported on Windows. 

404 

405 keep_alive set to True keeps connections active by enabling the 

406 periodic transmission of messages. 

407 

408 ssl_handshake_timeout is the time in seconds that an SSL server 

409 will wait for completion of the SSL handshake before aborting the 

410 connection. Default is 60s. 

411 

412 ssl_shutdown_timeout is the time in seconds that an SSL server 

413 will wait for completion of the SSL shutdown procedure 

414 before aborting the connection. Default is 30s. 

415 

416 start_serving set to True (default) causes the created server 

417 to start accepting connections immediately. When set to False, 

418 the user should await Server.start_serving() or 

419 Server.serve_forever() to make the server to start accepting 

420 connections. 

421 """ 

422 raise NotImplementedError 

423 

424 async def sendfile(self, transport, file, offset=0, count=None, 

425 *, fallback=True): 

426 """Send a file through a transport. 

427 

428 Return an amount of sent bytes. 

429 """ 

430 raise NotImplementedError 

431 

432 async def start_tls(self, transport, protocol, sslcontext, *, 

433 server_side=False, 

434 server_hostname=None, 

435 ssl_handshake_timeout=None, 

436 ssl_shutdown_timeout=None): 

437 """Upgrade a transport to TLS. 

438 

439 Return a new transport that *protocol* should start using 

440 immediately. 

441 """ 

442 raise NotImplementedError 

443 

444 async def create_unix_connection( 

445 self, protocol_factory, path=None, *, 

446 ssl=None, sock=None, 

447 server_hostname=None, 

448 ssl_handshake_timeout=None, 

449 ssl_shutdown_timeout=None): 

450 raise NotImplementedError 

451 

452 async def create_unix_server( 

453 self, protocol_factory, path=None, *, 

454 sock=None, backlog=100, ssl=None, 

455 ssl_handshake_timeout=None, 

456 ssl_shutdown_timeout=None, 

457 start_serving=True): 

458 """A coroutine which creates a UNIX Domain Socket server. 

459 

460 The return value is a Server object, which can be used to stop 

461 the service. 

462 

463 path is a str, representing a file system path to bind the 

464 server socket to. 

465 

466 sock can optionally be specified in order to use a preexisting 

467 socket object. 

468 

469 backlog is the maximum number of queued connections passed to 

470 listen() (defaults to 100). 

471 

472 ssl can be set to an SSLContext to enable SSL over the 

473 accepted connections. 

474 

475 ssl_handshake_timeout is the time in seconds that an SSL server 

476 will wait for the SSL handshake to complete (defaults to 60s). 

477 

478 ssl_shutdown_timeout is the time in seconds that an SSL server 

479 will wait for the SSL shutdown to finish (defaults to 30s). 

480 

481 start_serving set to True (default) causes the created server 

482 to start accepting connections immediately. When set to False, 

483 the user should await Server.start_serving() or 

484 Server.serve_forever() to make the server to start accepting 

485 connections. 

486 """ 

487 raise NotImplementedError 

488 

489 async def connect_accepted_socket( 

490 self, protocol_factory, sock, 

491 *, ssl=None, 

492 ssl_handshake_timeout=None, 

493 ssl_shutdown_timeout=None): 

494 """Handle an accepted connection. 

495 

496 This is used by servers that accept connections outside of 

497 asyncio, but use asyncio to handle connections. 

498 

499 This method is a coroutine. When completed, the coroutine 

500 returns a (transport, protocol) pair. 

501 """ 

502 raise NotImplementedError 

503 

504 async def create_datagram_endpoint(self, protocol_factory, 

505 local_addr=None, remote_addr=None, *, 

506 family=0, proto=0, flags=0, 

507 reuse_address=None, reuse_port=None, 

508 allow_broadcast=None, sock=None): 

509 """A coroutine which creates a datagram endpoint. 

510 

511 This method will try to establish the endpoint in the background. 

512 When successful, the coroutine returns a (transport, protocol) pair. 

513 

514 protocol_factory must be a callable returning a protocol instance. 

515 

516 socket family AF_INET, socket.AF_INET6 or socket.AF_UNIX depending 

517 on host (or family if specified), socket type SOCK_DGRAM. 

518 

519 reuse_address tells the kernel to reuse a local socket in 

520 TIME_WAIT state, without waiting for its natural timeout to 

521 expire. If not specified it will automatically be set to True on 

522 UNIX. 

523 

524 reuse_port tells the kernel to allow this endpoint to be bound to 

525 the same port as other existing endpoints are bound to, so long as 

526 they all set this flag when being created. This option is not 

527 supported on Windows and some UNIX's. If the 

528 :py:data:`~socket.SO_REUSEPORT` constant is not defined then this 

529 capability is unsupported. 

530 

531 allow_broadcast tells the kernel to allow this endpoint to send 

532 messages to the broadcast address. 

533 

534 sock can optionally be specified in order to use a preexisting 

535 socket object. 

536 """ 

537 raise NotImplementedError 

538 

539 # Pipes and subprocesses. 

540 

541 async def connect_read_pipe(self, protocol_factory, pipe): 

542 """Register read pipe in event loop. Set the pipe to non-blocking mode. 

543 

544 protocol_factory should instantiate object with Protocol interface. 

545 pipe is a file-like object. 

546 Return pair (transport, protocol), where transport supports the 

547 ReadTransport interface.""" 

548 # The reason to accept file-like object instead of just file descriptor 

549 # is: we need to own pipe and close it at transport finishing 

550 # Can got complicated errors if pass f.fileno(), 

551 # close fd in pipe transport then close f and vice versa. 

552 raise NotImplementedError 

553 

554 async def connect_write_pipe(self, protocol_factory, pipe): 

555 """Register write pipe in event loop. 

556 

557 protocol_factory should instantiate object with BaseProtocol 

558 interface. 

559 Pipe is file-like object already switched to nonblocking. 

560 Return pair (transport, protocol), where transport support 

561 WriteTransport interface.""" 

562 # The reason to accept file-like object instead of just file descriptor 

563 # is: we need to own pipe and close it at transport finishing 

564 # Can got complicated errors if pass f.fileno(), 

565 # close fd in pipe transport then close f and vice versa. 

566 raise NotImplementedError 

567 

568 async def subprocess_shell(self, protocol_factory, cmd, *, 

569 stdin=subprocess.PIPE, 

570 stdout=subprocess.PIPE, 

571 stderr=subprocess.PIPE, 

572 **kwargs): 

573 raise NotImplementedError 

574 

575 async def subprocess_exec(self, protocol_factory, *args, 

576 stdin=subprocess.PIPE, 

577 stdout=subprocess.PIPE, 

578 stderr=subprocess.PIPE, 

579 **kwargs): 

580 raise NotImplementedError 

581 

582 # Ready-based callback registration methods. 

583 # The add_*() methods return None. 

584 # The remove_*() methods return True if something was removed, 

585 # False if there was nothing to delete. 

586 

587 def add_reader(self, fd, callback, *args): 

588 raise NotImplementedError 

589 

590 def remove_reader(self, fd): 

591 raise NotImplementedError 

592 

593 def add_writer(self, fd, callback, *args): 

594 raise NotImplementedError 

595 

596 def remove_writer(self, fd): 

597 raise NotImplementedError 

598 

599 # Completion based I/O methods returning Futures. 

600 

601 async def sock_recv(self, sock, nbytes): 

602 raise NotImplementedError 

603 

604 async def sock_recv_into(self, sock, buf): 

605 raise NotImplementedError 

606 

607 async def sock_recvfrom(self, sock, bufsize): 

608 raise NotImplementedError 

609 

610 async def sock_recvfrom_into(self, sock, buf, nbytes=0): 

611 raise NotImplementedError 

612 

613 async def sock_sendall(self, sock, data): 

614 raise NotImplementedError 

615 

616 async def sock_sendto(self, sock, data, address): 

617 raise NotImplementedError 

618 

619 async def sock_connect(self, sock, address): 

620 raise NotImplementedError 

621 

622 async def sock_accept(self, sock): 

623 raise NotImplementedError 

624 

625 async def sock_sendfile(self, sock, file, offset=0, count=None, 

626 *, fallback=None): 

627 raise NotImplementedError 

628 

629 # Signal handling. 

630 

631 def add_signal_handler(self, sig, callback, *args): 

632 raise NotImplementedError 

633 

634 def remove_signal_handler(self, sig): 

635 raise NotImplementedError 

636 

637 # Task factory. 

638 

639 def set_task_factory(self, factory): 

640 raise NotImplementedError 

641 

642 def get_task_factory(self): 

643 raise NotImplementedError 

644 

645 # Error handlers. 

646 

647 def get_exception_handler(self): 

648 raise NotImplementedError 

649 

650 def set_exception_handler(self, handler): 

651 raise NotImplementedError 

652 

653 def default_exception_handler(self, context): 

654 raise NotImplementedError 

655 

656 def call_exception_handler(self, context): 

657 raise NotImplementedError 

658 

659 # Debug flag management. 

660 

661 def get_debug(self): 

662 raise NotImplementedError 

663 

664 def set_debug(self, enabled): 

665 raise NotImplementedError 

666 

667 

668class _AbstractEventLoopPolicy: 

669 """Abstract policy for accessing the event loop.""" 

670 

671 def get_event_loop(self): 

672 """Get the event loop for the current context. 

673 

674 Returns an event loop object implementing the AbstractEventLoop interface, 

675 or raises an exception in case no event loop has been set for the 

676 current context and the current policy does not specify to create one. 

677 

678 It should never return None.""" 

679 raise NotImplementedError 

680 

681 def set_event_loop(self, loop): 

682 """Set the event loop for the current context to loop.""" 

683 raise NotImplementedError 

684 

685 def new_event_loop(self): 

686 """Create and return a new event loop object according to this 

687 policy's rules. If there's need to set this loop as the event loop for 

688 the current context, set_event_loop must be called explicitly.""" 

689 raise NotImplementedError 

690 

691class _BaseDefaultEventLoopPolicy(_AbstractEventLoopPolicy): 

692 """Default policy implementation for accessing the event loop. 

693 

694 In this policy, each thread has its own event loop. However, we 

695 only automatically create an event loop by default for the main 

696 thread; other threads by default have no event loop. 

697 

698 Other policies may have different rules (e.g. a single global 

699 event loop, or automatically creating an event loop per thread, or 

700 using some other notion of context to which an event loop is 

701 associated). 

702 """ 

703 

704 _loop_factory = None 

705 

706 class _Local(threading.local): 

707 _loop = None 

708 

709 def __init__(self): 

710 self._local = self._Local() 

711 

712 def get_event_loop(self): 

713 """Get the event loop for the current context. 

714 

715 Returns an instance of EventLoop or raises an exception. 

716 """ 

717 if self._local._loop is None: 

718 raise RuntimeError('There is no current event loop in thread %r.' 

719 % threading.current_thread().name) 

720 

721 return self._local._loop 

722 

723 def set_event_loop(self, loop): 

724 """Set the event loop.""" 

725 if loop is not None and not isinstance(loop, AbstractEventLoop): 

726 raise TypeError(f"loop must be an instance of AbstractEventLoop or None, not '{type(loop).__name__}'") 

727 self._local._loop = loop 

728 

729 def new_event_loop(self): 

730 """Create a new event loop. 

731 

732 You must call set_event_loop() to make this the current event 

733 loop. 

734 """ 

735 return self._loop_factory() 

736 

737 

738# Event loop policy. The policy itself is always global, even if the 

739# policy's rules say that there is an event loop per thread (or other 

740# notion of context). The default policy is installed by the first 

741# call to get_event_loop_policy(). 

742_event_loop_policy = None 

743 

744# Lock for protecting the on-the-fly creation of the event loop policy. 

745_lock = threading.Lock() 

746 

747 

748# A TLS for the running event loop, used by _get_running_loop. 

749class _RunningLoop(threading.local): 

750 loop_pid = (None, None) 

751 

752 

753_running_loop = _RunningLoop() 

754 

755 

756def get_running_loop(): 

757 """Return the running event loop. Raise a RuntimeError if there is none. 

758 

759 This function is thread-specific. 

760 """ 

761 # NOTE: this function is implemented in C (see _asynciomodule.c) 

762 loop = _get_running_loop() 

763 if loop is None: 

764 raise RuntimeError('no running event loop') 

765 return loop 

766 

767 

768def _get_running_loop(): 

769 """Return the running event loop or None. 

770 

771 This is a low-level function intended to be used by event loops. 

772 This function is thread-specific. 

773 """ 

774 # NOTE: this function is implemented in C (see _asynciomodule.c) 

775 running_loop, pid = _running_loop.loop_pid 

776 if running_loop is not None and pid == os.getpid(): 

777 return running_loop 

778 

779 

780def _set_running_loop(loop): 

781 """Set the running event loop. 

782 

783 This is a low-level function intended to be used by event loops. 

784 This function is thread-specific. 

785 """ 

786 # NOTE: this function is implemented in C (see _asynciomodule.c) 

787 _running_loop.loop_pid = (loop, os.getpid()) 

788 

789 

790def _init_event_loop_policy(): 

791 global _event_loop_policy 

792 with _lock: 

793 if _event_loop_policy is None: # pragma: no branch 

794 if sys.platform == 'win32': 

795 from .windows_events import _DefaultEventLoopPolicy 

796 else: 

797 from .unix_events import _DefaultEventLoopPolicy 

798 _event_loop_policy = _DefaultEventLoopPolicy() 

799 

800 

801def _get_event_loop_policy(): 

802 """Get the current event loop policy.""" 

803 if _event_loop_policy is None: 

804 _init_event_loop_policy() 

805 return _event_loop_policy 

806 

807def get_event_loop_policy(): 

808 warnings._deprecated('asyncio.get_event_loop_policy', remove=(3, 16)) 

809 return _get_event_loop_policy() 

810 

811def _set_event_loop_policy(policy): 

812 """Set the current event loop policy. 

813 

814 If policy is None, the default policy is restored.""" 

815 global _event_loop_policy 

816 if policy is not None and not isinstance(policy, _AbstractEventLoopPolicy): 

817 raise TypeError(f"policy must be an instance of AbstractEventLoopPolicy or None, not '{type(policy).__name__}'") 

818 _event_loop_policy = policy 

819 

820def set_event_loop_policy(policy): 

821 warnings._deprecated('asyncio.set_event_loop_policy', remove=(3,16)) 

822 _set_event_loop_policy(policy) 

823 

824def get_event_loop(): 

825 """Return an asyncio event loop. 

826 

827 When called from a coroutine or a callback (e.g. scheduled with call_soon 

828 or similar API), this function will always return the running event loop. 

829 

830 If there is no running event loop set, the function will return 

831 the result of `get_event_loop_policy().get_event_loop()` call. 

832 """ 

833 # NOTE: this function is implemented in C (see _asynciomodule.c) 

834 current_loop = _get_running_loop() 

835 if current_loop is not None: 

836 return current_loop 

837 return _get_event_loop_policy().get_event_loop() 

838 

839 

840def set_event_loop(loop): 

841 """Equivalent to calling get_event_loop_policy().set_event_loop(loop).""" 

842 _get_event_loop_policy().set_event_loop(loop) 

843 

844 

845def new_event_loop(): 

846 """Equivalent to calling get_event_loop_policy().new_event_loop().""" 

847 return _get_event_loop_policy().new_event_loop() 

848 

849 

850# Alias pure-Python implementations for testing purposes. 

851_py__get_running_loop = _get_running_loop 

852_py__set_running_loop = _set_running_loop 

853_py_get_running_loop = get_running_loop 

854_py_get_event_loop = get_event_loop 

855 

856 

857try: 

858 # get_event_loop() is one of the most frequently called 

859 # functions in asyncio. Pure Python implementation is 

860 # about 4 times slower than C-accelerated. 

861 from _asyncio import (_get_running_loop, _set_running_loop, 

862 get_running_loop, get_event_loop) 

863except ImportError: 

864 pass 

865else: 

866 # Alias C implementations for testing purposes. 

867 _c__get_running_loop = _get_running_loop 

868 _c__set_running_loop = _set_running_loop 

869 _c_get_running_loop = get_running_loop 

870 _c_get_event_loop = get_event_loop 

871 

872 

873if hasattr(os, 'fork'): 873 ↛ exitline 873 didn't exit the module because the condition on line 873 was always true

874 def on_fork(): 

875 # Reset the loop and wakeupfd in the forked child process. 

876 if _event_loop_policy is not None: 

877 _event_loop_policy._local = _BaseDefaultEventLoopPolicy._Local() 

878 _set_running_loop(None) 

879 signal.set_wakeup_fd(-1) 

880 

881 os.register_at_fork(after_in_child=on_fork)