Coverage for Lib/asyncio/events.py: 92%

336 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-23 01:21 +0000

1"""Event loop and event loop policy.""" 

2 

3# Contains code from https://github.com/MagicStack/uvloop/tree/v0.16.0 

4# SPDX-License-Identifier: PSF-2.0 AND (MIT OR Apache-2.0) 

5# SPDX-FileCopyrightText: Copyright (c) 2015-2021 MagicStack Inc. http://magic.io 

6 

# Names exported by ``from ... import *``.
__all__ = (
    "AbstractEventLoop",
    "AbstractServer",
    "Handle",
    "TimerHandle",
    "get_event_loop_policy",
    "set_event_loop_policy",
    "get_event_loop",
    "set_event_loop",
    "new_event_loop",
    "_set_running_loop",
    "get_running_loop",
    "_get_running_loop",
)

21 

22import contextvars 

23import os 

24import signal 

25import socket 

26import subprocess 

27import sys 

28import threading 

29import warnings 

30 

31from . import format_helpers 

32 

33 

class Handle:
    """Object returned by callback registration methods.

    A handle stores a callback, its positional arguments, the owning loop
    and the ``contextvars`` context in which the callback is executed.
    """

    __slots__ = ('_callback', '_args', '_cancelled', '_loop',
                 '_source_traceback', '_repr', '__weakref__',
                 '_context')

    def __init__(self, callback, args, loop, context=None):
        """Store *callback*/*args* for *loop*.

        When *context* is None a copy of the current context is captured.
        In debug mode (``loop.get_debug()`` is true) the creation stack of
        the caller is recorded in ``_source_traceback``.
        """
        if context is None:
            context = contextvars.copy_context()
        self._context = context
        self._loop = loop
        self._callback = callback
        self._args = args
        self._cancelled = False
        self._repr = None
        if self._loop.get_debug():
            # Frame 1 is the caller of __init__, i.e. where the handle
            # was created.
            self._source_traceback = format_helpers.extract_stack(
                sys._getframe(1))
        else:
            self._source_traceback = None

    def _repr_info(self):
        """Return the list of text fragments used to build ``repr(self)``."""
        info = [self.__class__.__name__]
        if self._cancelled:
            info.append('cancelled')
        if self._callback is not None:
            info.append(format_helpers._format_callback_source(
                self._callback, self._args,
                debug=self._loop.get_debug()))
        if self._source_traceback:
            frame = self._source_traceback[-1]
            info.append(f'created at {frame[0]}:{frame[1]}')
        return info

    def __repr__(self):
        # A cached representation (set by cancel() in debug mode) wins.
        if self._repr is not None:
            return self._repr
        info = self._repr_info()
        return '<{}>'.format(' '.join(info))

    def get_context(self):
        """Return the context object the callback runs in."""
        return self._context

    def cancel(self):
        """Mark the handle as cancelled and drop the callback and args.

        Calling this on an already cancelled handle does nothing.
        """
        if not self._cancelled:
            self._cancelled = True
            if self._loop.get_debug():
                # Keep a representation in debug mode to keep callback and
                # parameters. For example, to log the warning
                # "Executing <Handle...> took 2.5 second"
                self._repr = repr(self)
            self._callback = None
            self._args = None

    def cancelled(self):
        """Return True if cancel() has been called on this handle."""
        return self._cancelled

    def _run(self):
        """Invoke the callback inside the stored context.

        SystemExit and KeyboardInterrupt propagate to the caller; any other
        exception is reported through ``loop.call_exception_handler()``.
        """
        try:
            self._context.run(self._callback, *self._args)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            cb = format_helpers._format_callback_source(
                self._callback, self._args,
                debug=self._loop.get_debug())
            msg = f'Exception in callback {cb}'
            context = {
                'message': msg,
                'exception': exc,
                'handle': self,
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        self = None  # Needed to break cycles when an exception occurs.

111 

# _ThreadSafeHandle is used for callbacks scheduled with call_soon_threadsafe
# and is thread safe unlike Handle which is not thread safe.
class _ThreadSafeHandle(Handle):
    """Handle whose cancel/cancelled/_run are serialized by a lock."""

    __slots__ = ('_lock',)

    def __init__(self, callback, args, loop, context=None):
        super().__init__(callback, args, loop, context)
        # Re-entrant lock: _run() holds it while the callback executes, and
        # the same thread may call cancel()/cancelled() from the callback.
        self._lock = threading.RLock()

    def cancel(self):
        """Cancel the handle while holding the lock."""
        with self._lock:
            return super().cancel()

    def cancelled(self):
        """Return the cancelled flag while holding the lock."""
        with self._lock:
            return super().cancelled()

    def _run(self):
        # The event loop checks for cancellation without holding the lock
        # It is possible that the handle is cancelled after the check
        # but before the callback is called so check it again after acquiring
        # the lock and return without calling the callback if it is cancelled.
        with self._lock:
            if self._cancelled:
                return
            return super()._run()

139 

140 

class TimerHandle(Handle):
    """Object returned by timed callback registration methods."""

    __slots__ = ['_scheduled', '_when']

    def __init__(self, when, callback, args, loop, context=None):
        """Create a handle for *callback* to be run at time *when*."""
        super().__init__(callback, args, loop, context)
        if self._source_traceback:
            # Drop the innermost recorded frame of the creation stack.
            del self._source_traceback[-1]
        self._when = when
        self._scheduled = False

    def _repr_info(self):
        info = super()._repr_info()
        # Insert after the class name, and after 'cancelled' when present.
        pos = 2 if self._cancelled else 1
        info.insert(pos, f'when={self._when}')
        return info

    def __hash__(self):
        return hash(self._when)

    # Ordering compares the scheduled time only; equality additionally
    # compares callback, args and the cancelled flag.

    def __lt__(self, other):
        if isinstance(other, TimerHandle):
            return self._when < other._when
        return NotImplemented

    def __le__(self, other):
        if isinstance(other, TimerHandle):
            return self._when < other._when or self.__eq__(other)
        return NotImplemented

    def __gt__(self, other):
        if isinstance(other, TimerHandle):
            return self._when > other._when
        return NotImplemented

    def __ge__(self, other):
        if isinstance(other, TimerHandle):
            return self._when > other._when or self.__eq__(other)
        return NotImplemented

    def __eq__(self, other):
        if isinstance(other, TimerHandle):
            return (self._when == other._when and
                    self._callback == other._callback and
                    self._args == other._args and
                    self._cancelled == other._cancelled)
        return NotImplemented

    def cancel(self):
        """Cancel the handle, notifying the loop on the first cancellation."""
        if not self._cancelled:
            self._loop._timer_handle_cancelled(self)
        super().cancel()

    def when(self):
        """Return a scheduled callback time.

        The time is an absolute timestamp, using the same time
        reference as loop.time().
        """
        return self._when

202 

203 

class AbstractServer:
    """Abstract server returned by create_server().

    Every method except the async context-manager hooks raises
    NotImplementedError and is meant to be overridden.
    """

    def close(self):
        """Stop serving.  This leaves existing connections open."""
        raise NotImplementedError

    def close_clients(self):
        """Close all active connections."""
        raise NotImplementedError

    def abort_clients(self):
        """Close all active connections immediately."""
        raise NotImplementedError

    def get_loop(self):
        """Get the event loop the Server object is attached to."""
        raise NotImplementedError

    def is_serving(self):
        """Return True if the server is accepting connections."""
        raise NotImplementedError

    async def start_serving(self):
        """Start accepting connections.

        This method is idempotent, so it can be called when
        the server is already serving.
        """
        raise NotImplementedError

    async def serve_forever(self):
        """Start accepting connections until the coroutine is cancelled.

        The server is closed when the coroutine is cancelled.
        """
        raise NotImplementedError

    async def wait_closed(self):
        """Coroutine to wait until service is closed."""
        raise NotImplementedError

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc):
        # Leaving the ``async with`` block closes the server and waits
        # for the close to complete.
        self.close()
        await self.wait_closed()

252 

253 

class AbstractEventLoop:
    """Abstract event loop.

    All methods raise NotImplementedError, except call_soon(), which
    delegates to call_later() with a delay of 0.
    """

    # Running and stopping the event loop.

    def run_forever(self):
        """Run the event loop until stop() is called."""
        raise NotImplementedError

    def run_until_complete(self, future):
        """Run the event loop until a Future is done.

        Return the Future's result, or raise its exception.
        """
        raise NotImplementedError

    def stop(self):
        """Stop the event loop as soon as reasonable.

        Exactly how soon that is may depend on the implementation, but
        no more I/O callbacks should be scheduled.
        """
        raise NotImplementedError

    def is_running(self):
        """Return whether the event loop is currently running."""
        raise NotImplementedError

    def is_closed(self):
        """Returns True if the event loop was closed."""
        raise NotImplementedError

    def close(self):
        """Close the loop.

        The loop should not be running.

        This is idempotent and irreversible.

        No other methods should be called after this one.
        """
        raise NotImplementedError

    async def shutdown_asyncgens(self):
        """Shutdown all active asynchronous generators."""
        raise NotImplementedError

    async def shutdown_default_executor(self):
        """Schedule the shutdown of the default executor."""
        raise NotImplementedError

    # Methods scheduling callbacks.  All these return Handles.

    def _timer_handle_cancelled(self, handle):
        """Notification that a TimerHandle has been cancelled."""
        raise NotImplementedError

    def call_soon(self, callback, *args, context=None):
        return self.call_later(0, callback, *args, context=context)

    def call_later(self, delay, callback, *args, context=None):
        raise NotImplementedError

    def call_at(self, when, callback, *args, context=None):
        raise NotImplementedError

    def time(self):
        raise NotImplementedError

    def create_future(self):
        raise NotImplementedError

    # Method scheduling a coroutine object: create a task.

    def create_task(self, coro, **kwargs):
        raise NotImplementedError

    # Methods for interacting with threads.

    def call_soon_threadsafe(self, callback, *args, context=None):
        raise NotImplementedError

    def run_in_executor(self, executor, func, *args):
        raise NotImplementedError

    def set_default_executor(self, executor):
        raise NotImplementedError

    # Network I/O methods returning Futures.

    async def getaddrinfo(self, host, port, *,
                          family=0, type=0, proto=0, flags=0):
        raise NotImplementedError

    async def getnameinfo(self, sockaddr, flags=0):
        raise NotImplementedError

    async def create_connection(
            self, protocol_factory, host=None, port=None,
            *, ssl=None, family=0, proto=0,
            flags=0, sock=None, local_addr=None,
            server_hostname=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            happy_eyeballs_delay=None, interleave=None):
        raise NotImplementedError

    async def create_server(
            self, protocol_factory, host=None, port=None,
            *, family=socket.AF_UNSPEC,
            flags=socket.AI_PASSIVE, sock=None, backlog=100,
            ssl=None, reuse_address=None, reuse_port=None,
            keep_alive=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            start_serving=True):
        """A coroutine which creates a TCP server bound to host and port.

        The return value is a Server object which can be used to stop
        the service.

        If host is an empty string or None all interfaces are assumed
        and a list of multiple sockets will be returned (most likely
        one for IPv4 and another one for IPv6). The host parameter can also be
        a sequence (e.g. list) of hosts to bind to.

        family can be set to either AF_INET or AF_INET6 to force the
        socket to use IPv4 or IPv6. If not set it will be determined
        from host (defaults to AF_UNSPEC).

        flags is a bitmask for getaddrinfo().

        sock can optionally be specified in order to use a preexisting
        socket object.

        backlog is the maximum number of queued connections passed to
        listen() (defaults to 100).

        ssl can be set to an SSLContext to enable SSL over the
        accepted connections.

        reuse_address tells the kernel to reuse a local socket in
        TIME_WAIT state, without waiting for its natural timeout to
        expire. If not specified will automatically be set to True on
        UNIX.

        reuse_port tells the kernel to allow this endpoint to be bound to
        the same port as other existing endpoints are bound to, so long as
        they all set this flag when being created. This option is not
        supported on Windows.

        keep_alive set to True keeps connections active by enabling the
        periodic transmission of messages.

        ssl_handshake_timeout is the time in seconds that an SSL server
        will wait for completion of the SSL handshake before aborting the
        connection. Default is 60s.

        ssl_shutdown_timeout is the time in seconds that an SSL server
        will wait for completion of the SSL shutdown procedure
        before aborting the connection. Default is 30s.

        start_serving set to True (default) causes the created server
        to start accepting connections immediately.  When set to False,
        the user should await Server.start_serving() or Server.serve_forever()
        to make the server start accepting connections.
        """
        raise NotImplementedError

    async def sendfile(self, transport, file, offset=0, count=None,
                       *, fallback=True):
        """Send a file through a transport.

        Return an amount of sent bytes.
        """
        raise NotImplementedError

    async def start_tls(self, transport, protocol, sslcontext, *,
                        server_side=False,
                        server_hostname=None,
                        ssl_handshake_timeout=None,
                        ssl_shutdown_timeout=None):
        """Upgrade a transport to TLS.

        Return a new transport that *protocol* should start using
        immediately.
        """
        raise NotImplementedError

    async def create_unix_connection(
            self, protocol_factory, path=None, *,
            ssl=None, sock=None,
            server_hostname=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None):
        raise NotImplementedError

    async def create_unix_server(
            self, protocol_factory, path=None, *,
            sock=None, backlog=100, ssl=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            start_serving=True):
        """A coroutine which creates a UNIX Domain Socket server.

        The return value is a Server object, which can be used to stop
        the service.

        path is a str, representing a file system path to bind the
        server socket to.

        sock can optionally be specified in order to use a preexisting
        socket object.

        backlog is the maximum number of queued connections passed to
        listen() (defaults to 100).

        ssl can be set to an SSLContext to enable SSL over the
        accepted connections.

        ssl_handshake_timeout is the time in seconds that an SSL server
        will wait for the SSL handshake to complete (defaults to 60s).

        ssl_shutdown_timeout is the time in seconds that an SSL server
        will wait for the SSL shutdown to finish (defaults to 30s).

        start_serving set to True (default) causes the created server
        to start accepting connections immediately.  When set to False,
        the user should await Server.start_serving() or Server.serve_forever()
        to make the server start accepting connections.
        """
        raise NotImplementedError

    async def connect_accepted_socket(
            self, protocol_factory, sock,
            *, ssl=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None):
        """Handle an accepted connection.

        This is used by servers that accept connections outside of
        asyncio, but use asyncio to handle connections.

        This method is a coroutine.  When completed, the coroutine
        returns a (transport, protocol) pair.
        """
        raise NotImplementedError

    async def create_datagram_endpoint(self, protocol_factory,
                                       local_addr=None, remote_addr=None, *,
                                       family=0, proto=0, flags=0,
                                       reuse_address=None, reuse_port=None,
                                       allow_broadcast=None, sock=None):
        """A coroutine which creates a datagram endpoint.

        This method will try to establish the endpoint in the background.
        When successful, the coroutine returns a (transport, protocol) pair.

        protocol_factory must be a callable returning a protocol instance.

        socket family AF_INET, socket.AF_INET6 or socket.AF_UNIX depending on
        host (or family if specified), socket type SOCK_DGRAM.

        reuse_address tells the kernel to reuse a local socket in
        TIME_WAIT state, without waiting for its natural timeout to
        expire. If not specified it will automatically be set to True on
        UNIX.

        reuse_port tells the kernel to allow this endpoint to be bound to
        the same port as other existing endpoints are bound to, so long as
        they all set this flag when being created. This option is not
        supported on Windows and some UNIX's. If the
        :py:data:`~socket.SO_REUSEPORT` constant is not defined then this
        capability is unsupported.

        allow_broadcast tells the kernel to allow this endpoint to send
        messages to the broadcast address.

        sock can optionally be specified in order to use a preexisting
        socket object.
        """
        raise NotImplementedError

    # Pipes and subprocesses.

    async def connect_read_pipe(self, protocol_factory, pipe):
        """Register read pipe in event loop. Set the pipe to non-blocking mode.

        protocol_factory should instantiate object with Protocol interface.
        pipe is a file-like object.
        Return pair (transport, protocol), where transport supports the
        ReadTransport interface."""
        # The reason to accept a file-like object instead of just a file
        # descriptor is: we need to own the pipe and close it when the
        # transport finishes.  Passing f.fileno() can lead to complicated
        # errors: the fd is closed in the pipe transport and then f is
        # closed, or vice versa.
        raise NotImplementedError

    async def connect_write_pipe(self, protocol_factory, pipe):
        """Register write pipe in event loop.

        protocol_factory should instantiate object with BaseProtocol interface.
        Pipe is file-like object already switched to nonblocking.
        Return pair (transport, protocol), where transport support
        WriteTransport interface."""
        # The reason to accept a file-like object instead of just a file
        # descriptor is: we need to own the pipe and close it when the
        # transport finishes.  Passing f.fileno() can lead to complicated
        # errors: the fd is closed in the pipe transport and then f is
        # closed, or vice versa.
        raise NotImplementedError

    async def subprocess_shell(self, protocol_factory, cmd, *,
                               stdin=subprocess.PIPE,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               **kwargs):
        raise NotImplementedError

    async def subprocess_exec(self, protocol_factory, *args,
                              stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              **kwargs):
        raise NotImplementedError

    # Ready-based callback registration methods.
    # The add_*() methods return None.
    # The remove_*() methods return True if something was removed,
    # False if there was nothing to delete.

    def add_reader(self, fd, callback, *args):
        raise NotImplementedError

    def remove_reader(self, fd):
        raise NotImplementedError

    def add_writer(self, fd, callback, *args):
        raise NotImplementedError

    def remove_writer(self, fd):
        raise NotImplementedError

    # Completion based I/O methods returning Futures.

    async def sock_recv(self, sock, nbytes):
        raise NotImplementedError

    async def sock_recv_into(self, sock, buf):
        raise NotImplementedError

    async def sock_recvfrom(self, sock, bufsize):
        raise NotImplementedError

    async def sock_recvfrom_into(self, sock, buf, nbytes=0):
        raise NotImplementedError

    async def sock_sendall(self, sock, data):
        raise NotImplementedError

    async def sock_sendto(self, sock, data, address):
        raise NotImplementedError

    async def sock_connect(self, sock, address):
        raise NotImplementedError

    async def sock_accept(self, sock):
        raise NotImplementedError

    async def sock_sendfile(self, sock, file, offset=0, count=None,
                            *, fallback=None):
        raise NotImplementedError

    # Signal handling.

    def add_signal_handler(self, sig, callback, *args):
        raise NotImplementedError

    def remove_signal_handler(self, sig):
        raise NotImplementedError

    # Task factory.

    def set_task_factory(self, factory):
        raise NotImplementedError

    def get_task_factory(self):
        raise NotImplementedError

    # Error handlers.

    def get_exception_handler(self):
        raise NotImplementedError

    def set_exception_handler(self, handler):
        raise NotImplementedError

    def default_exception_handler(self, context):
        raise NotImplementedError

    def call_exception_handler(self, context):
        raise NotImplementedError

    # Debug flag management.

    def get_debug(self):
        raise NotImplementedError

    def set_debug(self, enabled):
        raise NotImplementedError

663 

664 

class _AbstractEventLoopPolicy:
    """Abstract policy for accessing the event loop."""

    def get_event_loop(self):
        """Get the event loop for the current context.

        Returns an event loop object implementing the AbstractEventLoop
        interface, or raises an exception in case no event loop has been set
        for the current context and the current policy does not specify to
        create one.

        It should never return None."""
        raise NotImplementedError

    def set_event_loop(self, loop):
        """Set the event loop for the current context to loop."""
        raise NotImplementedError

    def new_event_loop(self):
        """Create and return a new event loop object according to this
        policy's rules. If there's need to set this loop as the event loop for
        the current context, set_event_loop must be called explicitly."""
        raise NotImplementedError

687 

class _BaseDefaultEventLoopPolicy(_AbstractEventLoopPolicy):
    """Default policy implementation for accessing the event loop.

    In this policy, each thread has its own event loop.  However, we
    only automatically create an event loop by default for the main
    thread; other threads by default have no event loop.

    Other policies may have different rules (e.g. a single global
    event loop, or automatically creating an event loop per thread, or
    using some other notion of context to which an event loop is
    associated).
    """

    # Callable used by new_event_loop(); None in this base class.
    _loop_factory = None

    class _Local(threading.local):
        # Per-thread slot holding the current loop; None until set.
        _loop = None

    def __init__(self):
        self._local = self._Local()

    def get_event_loop(self):
        """Get the event loop for the current context.

        Returns an instance of EventLoop or raises an exception.
        """
        if self._local._loop is None:
            raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)

        return self._local._loop

    def set_event_loop(self, loop):
        """Set the event loop.

        Raises TypeError unless *loop* is None or an AbstractEventLoop.
        """
        if loop is not None and not isinstance(loop, AbstractEventLoop):
            raise TypeError(
                f"loop must be an instance of AbstractEventLoop or None, "
                f"not '{type(loop).__name__}'")
        self._local._loop = loop

    def new_event_loop(self):
        """Create a new event loop.

        You must call set_event_loop() to make this the current event
        loop.
        """
        return self._loop_factory()

733 

734 

# Event loop policy.  The policy itself is always global, even if the
# policy's rules say that there is an event loop per thread (or other
# notion of context).  The default policy is installed by the first
# call to get_event_loop_policy().
_event_loop_policy = None

# Lock for protecting the on-the-fly creation of the event loop policy.
_lock = threading.Lock()

743 

744 

# A TLS for the running event loop, used by _get_running_loop.
class _RunningLoop(threading.local):
    # (loop, pid) pair; both None until a loop is recorded for the thread.
    loop_pid = (None, None)


_running_loop = _RunningLoop()

751 

752 

def get_running_loop():
    """Return the running event loop.  Raise a RuntimeError if there is none.

    This function is thread-specific.
    """
    # NOTE: this function is implemented in C (see _asynciomodule.c)
    loop = _get_running_loop()
    if loop is None:
        raise RuntimeError('no running event loop')
    return loop

763 

764 

def _get_running_loop():
    """Return the running event loop or None.

    This is a low-level function intended to be used by event loops.
    This function is thread-specific.
    """
    # NOTE: this function is implemented in C (see _asynciomodule.c)
    running_loop, pid = _running_loop.loop_pid
    # Only report a loop that was recorded by this very process; a value
    # stored under a different pid is ignored.
    if running_loop is not None and pid == os.getpid():
        return running_loop
    return None

775 

776 

def _set_running_loop(loop):
    """Set the running event loop.

    This is a low-level function intended to be used by event loops.
    This function is thread-specific.
    """
    # NOTE: this function is implemented in C (see _asynciomodule.c)
    # The current pid is stored alongside the loop so that
    # _get_running_loop() can tell which process recorded it.
    _running_loop.loop_pid = (loop, os.getpid())

785 

786 

def _init_event_loop_policy():
    """Install the platform default policy if none is set yet.

    The check and the assignment happen while holding the module lock.
    """
    global _event_loop_policy
    with _lock:
        if _event_loop_policy is None:  # pragma: no branch
            # Imported here rather than at module level; the module to use
            # depends on the platform.
            if sys.platform == 'win32':
                from .windows_events import _DefaultEventLoopPolicy
            else:
                from .unix_events import _DefaultEventLoopPolicy
            _event_loop_policy = _DefaultEventLoopPolicy()

796 

797 

def _get_event_loop_policy():
    """Get the current event loop policy."""
    # Create the default policy on first use.
    if _event_loop_policy is None:
        _init_event_loop_policy()
    return _event_loop_policy

803 

def get_event_loop_policy():
    """Return the current event loop policy (deprecated entry point).

    Reports the deprecation through ``warnings._deprecated`` (removal
    version given as 3.16), then defers to ``_get_event_loop_policy()``.
    """
    warnings._deprecated('asyncio.get_event_loop_policy', remove=(3, 16))
    return _get_event_loop_policy()

807 

def _set_event_loop_policy(policy):
    """Set the current event loop policy.

    If policy is None, the default policy is restored.

    Raises TypeError if policy is neither None nor an instance of
    _AbstractEventLoopPolicy.
    """
    global _event_loop_policy
    if policy is not None and not isinstance(policy, _AbstractEventLoopPolicy):
        raise TypeError(
            f"policy must be an instance of AbstractEventLoopPolicy or None, "
            f"not '{type(policy).__name__}'")
    _event_loop_policy = policy

816 

def set_event_loop_policy(policy):
    """Set the current event loop policy (deprecated entry point).

    Reports the deprecation through ``warnings._deprecated`` (removal
    version given as 3.16), then defers to ``_set_event_loop_policy()``.
    """
    warnings._deprecated('asyncio.set_event_loop_policy', remove=(3, 16))
    _set_event_loop_policy(policy)

820 

def get_event_loop():
    """Return an asyncio event loop.

    When called from a coroutine or a callback (e.g. scheduled with call_soon
    or similar API), this function will always return the running event loop.

    If there is no running event loop set, the function will return
    the result of `get_event_loop_policy().get_event_loop()` call.
    """
    # NOTE: this function is implemented in C (see _asynciomodule.c)
    current_loop = _get_running_loop()
    if current_loop is not None:
        return current_loop
    return _get_event_loop_policy().get_event_loop()

835 

836 

def set_event_loop(loop):
    """Equivalent to calling get_event_loop_policy().set_event_loop(loop)."""
    _get_event_loop_policy().set_event_loop(loop)

840 

841 

def new_event_loop():
    """Equivalent to calling get_event_loop_policy().new_event_loop()."""
    return _get_event_loop_policy().new_event_loop()

845 

846 

# Alias pure-Python implementations for testing purposes.
_py__get_running_loop = _get_running_loop
_py__set_running_loop = _set_running_loop
_py_get_running_loop = get_running_loop
_py_get_event_loop = get_event_loop

852 

853 

try:
    # get_event_loop() is one of the most frequently called
    # functions in asyncio.  Pure Python implementation is
    # about 4 times slower than C-accelerated.
    from _asyncio import (_get_running_loop, _set_running_loop,
                          get_running_loop, get_event_loop)
except ImportError:
    # No C accelerator available: keep the names already bound.
    pass
else:
    # Alias C implementations for testing purposes.
    _c__get_running_loop = _get_running_loop
    _c__set_running_loop = _set_running_loop
    _c_get_running_loop = get_running_loop
    _c_get_event_loop = get_event_loop

868 

869 

if hasattr(os, 'fork'):
    def on_fork():
        """Reset per-process asyncio state; registered to run in a forked child."""
        # Reset the loop and wakeupfd in the forked child process.
        if _event_loop_policy is not None:
            _event_loop_policy._local = _BaseDefaultEventLoopPolicy._Local()
        _set_running_loop(None)
        signal.set_wakeup_fd(-1)

    os.register_at_fork(after_in_child=on_fork)