Coverage for Lib/asyncio/events.py: 92%

336 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-15 02:02 +0000

1"""Event loop and event loop policy.""" 

2 

3# Contains code from https://github.com/MagicStack/uvloop/tree/v0.16.0 

4# SPDX-License-Identifier: PSF-2.0 AND (MIT OR Apache-2.0) 

5# SPDX-FileCopyrightText: Copyright (c) 2015-2021 MagicStack Inc. http://magic.io 

6 

7__all__ = ( 

8 "_AbstractEventLoopPolicy", 

9 "AbstractEventLoop", 

10 "AbstractServer", 

11 "Handle", 

12 "TimerHandle", 

13 "_get_event_loop_policy", 

14 "get_event_loop_policy", 

15 "_set_event_loop_policy", 

16 "set_event_loop_policy", 

17 "get_event_loop", 

18 "set_event_loop", 

19 "new_event_loop", 

20 "_set_running_loop", 

21 "get_running_loop", 

22 "_get_running_loop", 

23) 

24 

25import contextvars 

26import os 

27import signal 

28import socket 

29import subprocess 

30import sys 

31import threading 

32import warnings 

33 

34from . import format_helpers 

35 

36 

37class Handle: 

38 """Object returned by callback registration methods.""" 

39 

40 __slots__ = ('_callback', '_args', '_cancelled', '_loop', 

41 '_source_traceback', '_repr', '__weakref__', 

42 '_context') 

43 

44 def __init__(self, callback, args, loop, context=None): 

45 if context is None: 

46 context = contextvars.copy_context() 

47 self._context = context 

48 self._loop = loop 

49 self._callback = callback 

50 self._args = args 

51 self._cancelled = False 

52 self._repr = None 

53 if self._loop.get_debug(): 

54 self._source_traceback = format_helpers.extract_stack( 

55 sys._getframe(1)) 

56 else: 

57 self._source_traceback = None 

58 

59 def _repr_info(self): 

60 info = [self.__class__.__name__] 

61 if self._cancelled: 

62 info.append('cancelled') 

63 if self._callback is not None: 

64 info.append(format_helpers._format_callback_source( 

65 self._callback, self._args, 

66 debug=self._loop.get_debug())) 

67 if self._source_traceback: 

68 frame = self._source_traceback[-1] 

69 info.append(f'created at {frame[0]}:{frame[1]}') 

70 return info 

71 

72 def __repr__(self): 

73 if self._repr is not None: 

74 return self._repr 

75 info = self._repr_info() 

76 return '<{}>'.format(' '.join(info)) 

77 

78 def get_context(self): 

79 return self._context 

80 

81 def cancel(self): 

82 if not self._cancelled: 

83 self._cancelled = True 

84 if self._loop.get_debug(): 

85 # Keep a representation in debug mode to keep callback and 

86 # parameters. For example, to log the warning 

87 # "Executing <Handle...> took 2.5 second" 

88 self._repr = repr(self) 

89 self._callback = None 

90 self._args = None 

91 

92 def cancelled(self): 

93 return self._cancelled 

94 

95 def _run(self): 

96 try: 

97 self._context.run(self._callback, *self._args) 

98 except (SystemExit, KeyboardInterrupt): 

99 raise 

100 except BaseException as exc: 

101 cb = format_helpers._format_callback_source( 

102 self._callback, self._args, 

103 debug=self._loop.get_debug()) 

104 msg = f'Exception in callback {cb}' 

105 context = { 

106 'message': msg, 

107 'exception': exc, 

108 'handle': self, 

109 } 

110 if self._source_traceback: 

111 context['source_traceback'] = self._source_traceback 

112 self._loop.call_exception_handler(context) 

113 self = None # Needed to break cycles when an exception occurs. 

114 

115# _ThreadSafeHandle is used for callbacks scheduled with call_soon_threadsafe 

116# and is thread safe unlike Handle which is not thread safe. 

117class _ThreadSafeHandle(Handle): 

118 

119 __slots__ = ('_lock',) 

120 

121 def __init__(self, callback, args, loop, context=None): 

122 super().__init__(callback, args, loop, context) 

123 self._lock = threading.RLock() 

124 

125 def cancel(self): 

126 with self._lock: 

127 return super().cancel() 

128 

129 def cancelled(self): 

130 with self._lock: 

131 return super().cancelled() 

132 

133 def _run(self): 

134 # The event loop checks for cancellation without holding the lock 

135 # It is possible that the handle is cancelled after the check 

136 # but before the callback is called so check it again after acquiring 

137 # the lock and return without calling the callback if it is cancelled. 

138 with self._lock: 

139 if self._cancelled: 139 ↛ 140line 139 didn't jump to line 140 because the condition on line 139 was never true

140 return 

141 return super()._run() 

142 

143 

144class TimerHandle(Handle): 

145 """Object returned by timed callback registration methods.""" 

146 

147 __slots__ = ['_scheduled', '_when'] 

148 

149 def __init__(self, when, callback, args, loop, context=None): 

150 super().__init__(callback, args, loop, context) 

151 if self._source_traceback: 

152 del self._source_traceback[-1] 

153 self._when = when 

154 self._scheduled = False 

155 

156 def _repr_info(self): 

157 info = super()._repr_info() 

158 pos = 2 if self._cancelled else 1 

159 info.insert(pos, f'when={self._when}') 

160 return info 

161 

162 def __hash__(self): 

163 return hash(self._when) 

164 

165 def __lt__(self, other): 

166 if isinstance(other, TimerHandle): 

167 return self._when < other._when 

168 return NotImplemented 

169 

170 def __le__(self, other): 

171 if isinstance(other, TimerHandle): 

172 return self._when < other._when or self.__eq__(other) 

173 return NotImplemented 

174 

175 def __gt__(self, other): 

176 if isinstance(other, TimerHandle): 

177 return self._when > other._when 

178 return NotImplemented 

179 

180 def __ge__(self, other): 

181 if isinstance(other, TimerHandle): 

182 return self._when > other._when or self.__eq__(other) 

183 return NotImplemented 

184 

185 def __eq__(self, other): 

186 if isinstance(other, TimerHandle): 

187 return (self._when == other._when and 

188 self._callback == other._callback and 

189 self._args == other._args and 

190 self._cancelled == other._cancelled) 

191 return NotImplemented 

192 

193 def cancel(self): 

194 if not self._cancelled: 194 ↛ 196line 194 didn't jump to line 196 because the condition on line 194 was always true

195 self._loop._timer_handle_cancelled(self) 

196 super().cancel() 

197 

198 def when(self): 

199 """Return a scheduled callback time. 

200 

201 The time is an absolute timestamp, using the same time 

202 reference as loop.time(). 

203 """ 

204 return self._when 

205 

206 

207class AbstractServer: 

208 """Abstract server returned by create_server().""" 

209 

210 def close(self): 

211 """Stop serving. This leaves existing connections open.""" 

212 raise NotImplementedError 

213 

214 def close_clients(self): 

215 """Close all active connections.""" 

216 raise NotImplementedError 

217 

218 def abort_clients(self): 

219 """Close all active connections immediately.""" 

220 raise NotImplementedError 

221 

222 def get_loop(self): 

223 """Get the event loop the Server object is attached to.""" 

224 raise NotImplementedError 

225 

226 def is_serving(self): 

227 """Return True if the server is accepting connections.""" 

228 raise NotImplementedError 

229 

230 async def start_serving(self): 

231 """Start accepting connections. 

232 

233 This method is idempotent, so it can be called when 

234 the server is already being serving. 

235 """ 

236 raise NotImplementedError 

237 

238 async def serve_forever(self): 

239 """Start accepting connections until the coroutine is cancelled. 

240 

241 The server is closed when the coroutine is cancelled. 

242 """ 

243 raise NotImplementedError 

244 

245 async def wait_closed(self): 

246 """Coroutine to wait until service is closed.""" 

247 raise NotImplementedError 

248 

249 async def __aenter__(self): 

250 return self 

251 

252 async def __aexit__(self, *exc): 

253 self.close() 

254 await self.wait_closed() 

255 

256 

257class AbstractEventLoop: 

258 """Abstract event loop.""" 

259 

260 # Running and stopping the event loop. 

261 

262 def run_forever(self): 

263 """Run the event loop until stop() is called.""" 

264 raise NotImplementedError 

265 

266 def run_until_complete(self, future): 

267 """Run the event loop until a Future is done. 

268 

269 Return the Future's result, or raise its exception. 

270 """ 

271 raise NotImplementedError 

272 

273 def stop(self): 

274 """Stop the event loop as soon as reasonable. 

275 

276 Exactly how soon that is may depend on the implementation, but 

277 no more I/O callbacks should be scheduled. 

278 """ 

279 raise NotImplementedError 

280 

281 def is_running(self): 

282 """Return whether the event loop is currently running.""" 

283 raise NotImplementedError 

284 

285 def is_closed(self): 

286 """Returns True if the event loop was closed.""" 

287 raise NotImplementedError 

288 

289 def close(self): 

290 """Close the loop. 

291 

292 The loop should not be running. 

293 

294 This is idempotent and irreversible. 

295 

296 No other methods should be called after this one. 

297 """ 

298 raise NotImplementedError 

299 

300 async def shutdown_asyncgens(self): 

301 """Shutdown all active asynchronous generators.""" 

302 raise NotImplementedError 

303 

304 async def shutdown_default_executor(self): 

305 """Schedule the shutdown of the default executor.""" 

306 raise NotImplementedError 

307 

308 # Methods scheduling callbacks. All these return Handles. 

309 

310 def _timer_handle_cancelled(self, handle): 

311 """Notification that a TimerHandle has been cancelled.""" 

312 raise NotImplementedError 

313 

314 def call_soon(self, callback, *args, context=None): 

315 return self.call_later(0, callback, *args, context=context) 

316 

317 def call_later(self, delay, callback, *args, context=None): 

318 raise NotImplementedError 

319 

320 def call_at(self, when, callback, *args, context=None): 

321 raise NotImplementedError 

322 

323 def time(self): 

324 raise NotImplementedError 

325 

326 def create_future(self): 

327 raise NotImplementedError 

328 

329 # Method scheduling a coroutine object: create a task. 

330 

331 def create_task(self, coro, **kwargs): 

332 raise NotImplementedError 

333 

334 # Methods for interacting with threads. 

335 

336 def call_soon_threadsafe(self, callback, *args, context=None): 

337 raise NotImplementedError 

338 

339 def run_in_executor(self, executor, func, *args): 

340 raise NotImplementedError 

341 

342 def set_default_executor(self, executor): 

343 raise NotImplementedError 

344 

345 # Network I/O methods returning Futures. 

346 

347 async def getaddrinfo(self, host, port, *, 

348 family=0, type=0, proto=0, flags=0): 

349 raise NotImplementedError 

350 

351 async def getnameinfo(self, sockaddr, flags=0): 

352 raise NotImplementedError 

353 

354 async def create_connection( 

355 self, protocol_factory, host=None, port=None, 

356 *, ssl=None, family=0, proto=0, 

357 flags=0, sock=None, local_addr=None, 

358 server_hostname=None, 

359 ssl_handshake_timeout=None, 

360 ssl_shutdown_timeout=None, 

361 happy_eyeballs_delay=None, interleave=None): 

362 raise NotImplementedError 

363 

364 async def create_server( 

365 self, protocol_factory, host=None, port=None, 

366 *, family=socket.AF_UNSPEC, 

367 flags=socket.AI_PASSIVE, sock=None, backlog=100, 

368 ssl=None, reuse_address=None, reuse_port=None, 

369 keep_alive=None, 

370 ssl_handshake_timeout=None, 

371 ssl_shutdown_timeout=None, 

372 start_serving=True): 

373 """A coroutine which creates a TCP server bound to host and port. 

374 

375 The return value is a Server object which can be used to stop 

376 the service. 

377 

378 If host is an empty string or None all interfaces are assumed 

379 and a list of multiple sockets will be returned (most likely 

380 one for IPv4 and another one for IPv6). The host parameter can also be 

381 a sequence (e.g. list) of hosts to bind to. 

382 

383 family can be set to either AF_INET or AF_INET6 to force the 

384 socket to use IPv4 or IPv6. If not set it will be determined 

385 from host (defaults to AF_UNSPEC). 

386 

387 flags is a bitmask for getaddrinfo(). 

388 

389 sock can optionally be specified in order to use a preexisting 

390 socket object. 

391 

392 backlog is the maximum number of queued connections passed to 

393 listen() (defaults to 100). 

394 

395 ssl can be set to an SSLContext to enable SSL over the 

396 accepted connections. 

397 

398 reuse_address tells the kernel to reuse a local socket in 

399 TIME_WAIT state, without waiting for its natural timeout to 

400 expire. If not specified will automatically be set to True on 

401 UNIX. 

402 

403 reuse_port tells the kernel to allow this endpoint to be bound to 

404 the same port as other existing endpoints are bound to, so long as 

405 they all set this flag when being created. This option is not 

406 supported on Windows. 

407 

408 keep_alive set to True keeps connections active by enabling the 

409 periodic transmission of messages. 

410 

411 ssl_handshake_timeout is the time in seconds that an SSL server 

412 will wait for completion of the SSL handshake before aborting the 

413 connection. Default is 60s. 

414 

415 ssl_shutdown_timeout is the time in seconds that an SSL server 

416 will wait for completion of the SSL shutdown procedure 

417 before aborting the connection. Default is 30s. 

418 

419 start_serving set to True (default) causes the created server 

420 to start accepting connections immediately. When set to False, 

421 the user should await Server.start_serving() or Server.serve_forever() 

422 to make the server to start accepting connections. 

423 """ 

424 raise NotImplementedError 

425 

426 async def sendfile(self, transport, file, offset=0, count=None, 

427 *, fallback=True): 

428 """Send a file through a transport. 

429 

430 Return an amount of sent bytes. 

431 """ 

432 raise NotImplementedError 

433 

434 async def start_tls(self, transport, protocol, sslcontext, *, 

435 server_side=False, 

436 server_hostname=None, 

437 ssl_handshake_timeout=None, 

438 ssl_shutdown_timeout=None): 

439 """Upgrade a transport to TLS. 

440 

441 Return a new transport that *protocol* should start using 

442 immediately. 

443 """ 

444 raise NotImplementedError 

445 

446 async def create_unix_connection( 

447 self, protocol_factory, path=None, *, 

448 ssl=None, sock=None, 

449 server_hostname=None, 

450 ssl_handshake_timeout=None, 

451 ssl_shutdown_timeout=None): 

452 raise NotImplementedError 

453 

454 async def create_unix_server( 

455 self, protocol_factory, path=None, *, 

456 sock=None, backlog=100, ssl=None, 

457 ssl_handshake_timeout=None, 

458 ssl_shutdown_timeout=None, 

459 start_serving=True): 

460 """A coroutine which creates a UNIX Domain Socket server. 

461 

462 The return value is a Server object, which can be used to stop 

463 the service. 

464 

465 path is a str, representing a file system path to bind the 

466 server socket to. 

467 

468 sock can optionally be specified in order to use a preexisting 

469 socket object. 

470 

471 backlog is the maximum number of queued connections passed to 

472 listen() (defaults to 100). 

473 

474 ssl can be set to an SSLContext to enable SSL over the 

475 accepted connections. 

476 

477 ssl_handshake_timeout is the time in seconds that an SSL server 

478 will wait for the SSL handshake to complete (defaults to 60s). 

479 

480 ssl_shutdown_timeout is the time in seconds that an SSL server 

481 will wait for the SSL shutdown to finish (defaults to 30s). 

482 

483 start_serving set to True (default) causes the created server 

484 to start accepting connections immediately. When set to False, 

485 the user should await Server.start_serving() or Server.serve_forever() 

486 to make the server to start accepting connections. 

487 """ 

488 raise NotImplementedError 

489 

490 async def connect_accepted_socket( 

491 self, protocol_factory, sock, 

492 *, ssl=None, 

493 ssl_handshake_timeout=None, 

494 ssl_shutdown_timeout=None): 

495 """Handle an accepted connection. 

496 

497 This is used by servers that accept connections outside of 

498 asyncio, but use asyncio to handle connections. 

499 

500 This method is a coroutine. When completed, the coroutine 

501 returns a (transport, protocol) pair. 

502 """ 

503 raise NotImplementedError 

504 

505 async def create_datagram_endpoint(self, protocol_factory, 

506 local_addr=None, remote_addr=None, *, 

507 family=0, proto=0, flags=0, 

508 reuse_address=None, reuse_port=None, 

509 allow_broadcast=None, sock=None): 

510 """A coroutine which creates a datagram endpoint. 

511 

512 This method will try to establish the endpoint in the background. 

513 When successful, the coroutine returns a (transport, protocol) pair. 

514 

515 protocol_factory must be a callable returning a protocol instance. 

516 

517 socket family AF_INET, socket.AF_INET6 or socket.AF_UNIX depending on 

518 host (or family if specified), socket type SOCK_DGRAM. 

519 

520 reuse_address tells the kernel to reuse a local socket in 

521 TIME_WAIT state, without waiting for its natural timeout to 

522 expire. If not specified it will automatically be set to True on 

523 UNIX. 

524 

525 reuse_port tells the kernel to allow this endpoint to be bound to 

526 the same port as other existing endpoints are bound to, so long as 

527 they all set this flag when being created. This option is not 

528 supported on Windows and some UNIX's. If the 

529 :py:data:`~socket.SO_REUSEPORT` constant is not defined then this 

530 capability is unsupported. 

531 

532 allow_broadcast tells the kernel to allow this endpoint to send 

533 messages to the broadcast address. 

534 

535 sock can optionally be specified in order to use a preexisting 

536 socket object. 

537 """ 

538 raise NotImplementedError 

539 

540 # Pipes and subprocesses. 

541 

542 async def connect_read_pipe(self, protocol_factory, pipe): 

543 """Register read pipe in event loop. Set the pipe to non-blocking mode. 

544 

545 protocol_factory should instantiate object with Protocol interface. 

546 pipe is a file-like object. 

547 Return pair (transport, protocol), where transport supports the 

548 ReadTransport interface.""" 

549 # The reason to accept file-like object instead of just file descriptor 

550 # is: we need to own pipe and close it at transport finishing 

551 # Can got complicated errors if pass f.fileno(), 

552 # close fd in pipe transport then close f and vice versa. 

553 raise NotImplementedError 

554 

555 async def connect_write_pipe(self, protocol_factory, pipe): 

556 """Register write pipe in event loop. 

557 

558 protocol_factory should instantiate object with BaseProtocol interface. 

559 Pipe is file-like object already switched to nonblocking. 

560 Return pair (transport, protocol), where transport support 

561 WriteTransport interface.""" 

562 # The reason to accept file-like object instead of just file descriptor 

563 # is: we need to own pipe and close it at transport finishing 

564 # Can got complicated errors if pass f.fileno(), 

565 # close fd in pipe transport then close f and vice versa. 

566 raise NotImplementedError 

567 

568 async def subprocess_shell(self, protocol_factory, cmd, *, 

569 stdin=subprocess.PIPE, 

570 stdout=subprocess.PIPE, 

571 stderr=subprocess.PIPE, 

572 **kwargs): 

573 raise NotImplementedError 

574 

575 async def subprocess_exec(self, protocol_factory, *args, 

576 stdin=subprocess.PIPE, 

577 stdout=subprocess.PIPE, 

578 stderr=subprocess.PIPE, 

579 **kwargs): 

580 raise NotImplementedError 

581 

582 # Ready-based callback registration methods. 

583 # The add_*() methods return None. 

584 # The remove_*() methods return True if something was removed, 

585 # False if there was nothing to delete. 

586 

587 def add_reader(self, fd, callback, *args): 

588 raise NotImplementedError 

589 

590 def remove_reader(self, fd): 

591 raise NotImplementedError 

592 

593 def add_writer(self, fd, callback, *args): 

594 raise NotImplementedError 

595 

596 def remove_writer(self, fd): 

597 raise NotImplementedError 

598 

599 # Completion based I/O methods returning Futures. 

600 

601 async def sock_recv(self, sock, nbytes): 

602 raise NotImplementedError 

603 

604 async def sock_recv_into(self, sock, buf): 

605 raise NotImplementedError 

606 

607 async def sock_recvfrom(self, sock, bufsize): 

608 raise NotImplementedError 

609 

610 async def sock_recvfrom_into(self, sock, buf, nbytes=0): 

611 raise NotImplementedError 

612 

613 async def sock_sendall(self, sock, data): 

614 raise NotImplementedError 

615 

616 async def sock_sendto(self, sock, data, address): 

617 raise NotImplementedError 

618 

619 async def sock_connect(self, sock, address): 

620 raise NotImplementedError 

621 

622 async def sock_accept(self, sock): 

623 raise NotImplementedError 

624 

625 async def sock_sendfile(self, sock, file, offset=0, count=None, 

626 *, fallback=None): 

627 raise NotImplementedError 

628 

629 # Signal handling. 

630 

631 def add_signal_handler(self, sig, callback, *args): 

632 raise NotImplementedError 

633 

634 def remove_signal_handler(self, sig): 

635 raise NotImplementedError 

636 

637 # Task factory. 

638 

639 def set_task_factory(self, factory): 

640 raise NotImplementedError 

641 

642 def get_task_factory(self): 

643 raise NotImplementedError 

644 

645 # Error handlers. 

646 

647 def get_exception_handler(self): 

648 raise NotImplementedError 

649 

650 def set_exception_handler(self, handler): 

651 raise NotImplementedError 

652 

653 def default_exception_handler(self, context): 

654 raise NotImplementedError 

655 

656 def call_exception_handler(self, context): 

657 raise NotImplementedError 

658 

659 # Debug flag management. 

660 

661 def get_debug(self): 

662 raise NotImplementedError 

663 

664 def set_debug(self, enabled): 

665 raise NotImplementedError 

666 

667 

668class _AbstractEventLoopPolicy: 

669 """Abstract policy for accessing the event loop.""" 

670 

671 def get_event_loop(self): 

672 """Get the event loop for the current context. 

673 

674 Returns an event loop object implementing the AbstractEventLoop interface, 

675 or raises an exception in case no event loop has been set for the 

676 current context and the current policy does not specify to create one. 

677 

678 It should never return None.""" 

679 raise NotImplementedError 

680 

681 def set_event_loop(self, loop): 

682 """Set the event loop for the current context to loop.""" 

683 raise NotImplementedError 

684 

685 def new_event_loop(self): 

686 """Create and return a new event loop object according to this 

687 policy's rules. If there's need to set this loop as the event loop for 

688 the current context, set_event_loop must be called explicitly.""" 

689 raise NotImplementedError 

690 

691class _BaseDefaultEventLoopPolicy(_AbstractEventLoopPolicy): 

692 """Default policy implementation for accessing the event loop. 

693 

694 In this policy, each thread has its own event loop. However, we 

695 only automatically create an event loop by default for the main 

696 thread; other threads by default have no event loop. 

697 

698 Other policies may have different rules (e.g. a single global 

699 event loop, or automatically creating an event loop per thread, or 

700 using some other notion of context to which an event loop is 

701 associated). 

702 """ 

703 

704 _loop_factory = None 

705 

706 class _Local(threading.local): 

707 _loop = None 

708 

709 def __init__(self): 

710 self._local = self._Local() 

711 

712 def get_event_loop(self): 

713 """Get the event loop for the current context. 

714 

715 Returns an instance of EventLoop or raises an exception. 

716 """ 

717 if self._local._loop is None: 

718 raise RuntimeError('There is no current event loop in thread %r.' 

719 % threading.current_thread().name) 

720 

721 return self._local._loop 

722 

723 def set_event_loop(self, loop): 

724 """Set the event loop.""" 

725 if loop is not None and not isinstance(loop, AbstractEventLoop): 

726 raise TypeError(f"loop must be an instance of AbstractEventLoop or None, not '{type(loop).__name__}'") 

727 self._local._loop = loop 

728 

729 def new_event_loop(self): 

730 """Create a new event loop. 

731 

732 You must call set_event_loop() to make this the current event 

733 loop. 

734 """ 

735 return self._loop_factory() 

736 

737 

738# Event loop policy. The policy itself is always global, even if the 

739# policy's rules say that there is an event loop per thread (or other 

740# notion of context). The default policy is installed by the first 

741# call to get_event_loop_policy(). 

742_event_loop_policy = None 

743 

744# Lock for protecting the on-the-fly creation of the event loop policy. 

745_lock = threading.Lock() 

746 

747 

748# A TLS for the running event loop, used by _get_running_loop. 

749class _RunningLoop(threading.local): 

750 loop_pid = (None, None) 

751 

752 

753_running_loop = _RunningLoop() 

754 

755 

756def get_running_loop(): 

757 """Return the running event loop. Raise a RuntimeError if there is none. 

758 

759 This function is thread-specific. 

760 """ 

761 # NOTE: this function is implemented in C (see _asynciomodule.c) 

762 loop = _get_running_loop() 

763 if loop is None: 

764 raise RuntimeError('no running event loop') 

765 return loop 

766 

767 

768def _get_running_loop(): 

769 """Return the running event loop or None. 

770 

771 This is a low-level function intended to be used by event loops. 

772 This function is thread-specific. 

773 """ 

774 # NOTE: this function is implemented in C (see _asynciomodule.c) 

775 running_loop, pid = _running_loop.loop_pid 

776 if running_loop is not None and pid == os.getpid(): 

777 return running_loop 

778 

779 

780def _set_running_loop(loop): 

781 """Set the running event loop. 

782 

783 This is a low-level function intended to be used by event loops. 

784 This function is thread-specific. 

785 """ 

786 # NOTE: this function is implemented in C (see _asynciomodule.c) 

787 _running_loop.loop_pid = (loop, os.getpid()) 

788 

789 

790def _init_event_loop_policy(): 

791 global _event_loop_policy 

792 with _lock: 

793 if _event_loop_policy is None: # pragma: no branch 

794 from . import _DefaultEventLoopPolicy 

795 _event_loop_policy = _DefaultEventLoopPolicy() 

796 

797 

798def _get_event_loop_policy(): 

799 """Get the current event loop policy.""" 

800 if _event_loop_policy is None: 

801 _init_event_loop_policy() 

802 return _event_loop_policy 

803 

804def get_event_loop_policy(): 

805 warnings._deprecated('asyncio.get_event_loop_policy', remove=(3, 16)) 

806 return _get_event_loop_policy() 

807 

808def _set_event_loop_policy(policy): 

809 """Set the current event loop policy. 

810 

811 If policy is None, the default policy is restored.""" 

812 global _event_loop_policy 

813 if policy is not None and not isinstance(policy, _AbstractEventLoopPolicy): 

814 raise TypeError(f"policy must be an instance of AbstractEventLoopPolicy or None, not '{type(policy).__name__}'") 

815 _event_loop_policy = policy 

816 

817def set_event_loop_policy(policy): 

818 warnings._deprecated('asyncio.set_event_loop_policy', remove=(3,16)) 

819 _set_event_loop_policy(policy) 

820 

821def get_event_loop(): 

822 """Return an asyncio event loop. 

823 

824 When called from a coroutine or a callback (e.g. scheduled with call_soon 

825 or similar API), this function will always return the running event loop. 

826 

827 If there is no running event loop set, the function will return 

828 the result of `get_event_loop_policy().get_event_loop()` call. 

829 """ 

830 # NOTE: this function is implemented in C (see _asynciomodule.c) 

831 current_loop = _get_running_loop() 

832 if current_loop is not None: 

833 return current_loop 

834 return _get_event_loop_policy().get_event_loop() 

835 

836 

837def set_event_loop(loop): 

838 """Equivalent to calling get_event_loop_policy().set_event_loop(loop).""" 

839 _get_event_loop_policy().set_event_loop(loop) 

840 

841 

842def new_event_loop(): 

843 """Equivalent to calling get_event_loop_policy().new_event_loop().""" 

844 return _get_event_loop_policy().new_event_loop() 

845 

846 

847# Alias pure-Python implementations for testing purposes. 

848_py__get_running_loop = _get_running_loop 

849_py__set_running_loop = _set_running_loop 

850_py_get_running_loop = get_running_loop 

851_py_get_event_loop = get_event_loop 

852 

853 

854try: 

855 # get_event_loop() is one of the most frequently called 

856 # functions in asyncio. Pure Python implementation is 

857 # about 4 times slower than C-accelerated. 

858 from _asyncio import (_get_running_loop, _set_running_loop, 

859 get_running_loop, get_event_loop) 

860except ImportError: 

861 pass 

862else: 

863 # Alias C implementations for testing purposes. 

864 _c__get_running_loop = _get_running_loop 

865 _c__set_running_loop = _set_running_loop 

866 _c_get_running_loop = get_running_loop 

867 _c_get_event_loop = get_event_loop 

868 

869 

870if hasattr(os, 'fork'): 870 ↛ exitline 870 didn't exit the module because the condition on line 870 was always true

871 def on_fork(): 

872 # Reset the loop and wakeupfd in the forked child process. 

873 if _event_loop_policy is not None: 

874 _event_loop_policy._local = _BaseDefaultEventLoopPolicy._Local() 

875 _set_running_loop(None) 

876 signal.set_wakeup_fd(-1) 

877 

878 os.register_at_fork(after_in_child=on_fork)