Coverage for Lib/asyncio/windows_events.py: 0%

549 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-15 02:02 +0000

1"""Selector and proactor event loops for Windows.""" 

2 

3import sys 

4 

5if sys.platform != 'win32': # pragma: no cover 

6 raise ImportError('win32 only') 

7 

8import _overlapped 

9import _winapi 

10import errno 

11from functools import partial 

12import math 

13import msvcrt 

14import socket 

15import struct 

16import time 

17import weakref 

18 

19from . import events 

20from . import base_subprocess 

21from . import futures 

22from . import exceptions 

23from . import proactor_events 

24from . import selector_events 

25from . import tasks 

26from . import windows_utils 

27from .log import logger 

28 

29 

30__all__ = ( 

31 'SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor', 

32 '_DefaultEventLoopPolicy', '_WindowsSelectorEventLoopPolicy', 

33 '_WindowsProactorEventLoopPolicy', 'EventLoop', 

34) 

35 

36 

37NULL = _winapi.NULL 

38INFINITE = _winapi.INFINITE 

39ERROR_CONNECTION_REFUSED = 1225 

40ERROR_CONNECTION_ABORTED = 1236 

41 

42# Initial delay in seconds for connect_pipe() before retrying to connect 

43CONNECT_PIPE_INIT_DELAY = 0.001 

44 

45# Maximum delay in seconds for connect_pipe() before retrying to connect 

46CONNECT_PIPE_MAX_DELAY = 0.100 

47 

48 

49class _OverlappedFuture(futures.Future): 

50 """Subclass of Future which represents an overlapped operation. 

51 

52 Cancelling it will immediately cancel the overlapped operation. 

53 """ 

54 

55 def __init__(self, ov, *, loop=None): 

56 super().__init__(loop=loop) 

57 if self._source_traceback: 

58 del self._source_traceback[-1] 

59 self._ov = ov 

60 

61 def _repr_info(self): 

62 info = super()._repr_info() 

63 if self._ov is not None: 

64 state = 'pending' if self._ov.pending else 'completed' 

65 info.insert(1, f'overlapped=<{state}, {self._ov.address:#x}>') 

66 return info 

67 

68 def _cancel_overlapped(self): 

69 if self._ov is None: 

70 return 

71 try: 

72 self._ov.cancel() 

73 except OSError as exc: 

74 context = { 

75 'message': 'Cancelling an overlapped future failed', 

76 'exception': exc, 

77 'future': self, 

78 } 

79 if self._source_traceback: 

80 context['source_traceback'] = self._source_traceback 

81 self._loop.call_exception_handler(context) 

82 self._ov = None 

83 

84 def cancel(self, msg=None): 

85 self._cancel_overlapped() 

86 return super().cancel(msg=msg) 

87 

88 def set_exception(self, exception): 

89 super().set_exception(exception) 

90 self._cancel_overlapped() 

91 

92 def set_result(self, result): 

93 super().set_result(result) 

94 self._ov = None 

95 

96 

97class _BaseWaitHandleFuture(futures.Future): 

98 """Subclass of Future which represents a wait handle.""" 

99 

100 def __init__(self, ov, handle, wait_handle, *, loop=None): 

101 super().__init__(loop=loop) 

102 if self._source_traceback: 

103 del self._source_traceback[-1] 

104 # Keep a reference to the Overlapped object to keep it alive until the 

105 # wait is unregistered 

106 self._ov = ov 

107 self._handle = handle 

108 self._wait_handle = wait_handle 

109 

110 # Should we call UnregisterWaitEx() if the wait completes 

111 # or is cancelled? 

112 self._registered = True 

113 

114 def _poll(self): 

115 # non-blocking wait: use a timeout of 0 millisecond 

116 return (_winapi.WaitForSingleObject(self._handle, 0) == 

117 _winapi.WAIT_OBJECT_0) 

118 

119 def _repr_info(self): 

120 info = super()._repr_info() 

121 info.append(f'handle={self._handle:#x}') 

122 if self._handle is not None: 

123 state = 'signaled' if self._poll() else 'waiting' 

124 info.append(state) 

125 if self._wait_handle is not None: 

126 info.append(f'wait_handle={self._wait_handle:#x}') 

127 return info 

128 

129 def _unregister_wait_cb(self, fut): 

130 # The wait was unregistered: it's not safe to destroy the Overlapped 

131 # object 

132 self._ov = None 

133 

134 def _unregister_wait(self): 

135 if not self._registered: 

136 return 

137 self._registered = False 

138 

139 wait_handle = self._wait_handle 

140 self._wait_handle = None 

141 try: 

142 _overlapped.UnregisterWait(wait_handle) 

143 except OSError as exc: 

144 if exc.winerror != _overlapped.ERROR_IO_PENDING: 

145 context = { 

146 'message': 'Failed to unregister the wait handle', 

147 'exception': exc, 

148 'future': self, 

149 } 

150 if self._source_traceback: 

151 context['source_traceback'] = self._source_traceback 

152 self._loop.call_exception_handler(context) 

153 return 

154 # ERROR_IO_PENDING means that the unregister is pending 

155 

156 self._unregister_wait_cb(None) 

157 

158 def cancel(self, msg=None): 

159 self._unregister_wait() 

160 return super().cancel(msg=msg) 

161 

162 def set_exception(self, exception): 

163 self._unregister_wait() 

164 super().set_exception(exception) 

165 

166 def set_result(self, result): 

167 self._unregister_wait() 

168 super().set_result(result) 

169 

170 

171class _WaitCancelFuture(_BaseWaitHandleFuture): 

172 """Subclass of Future which represents a wait for the cancellation of a 

173 _WaitHandleFuture using an event. 

174 """ 

175 

176 def __init__(self, ov, event, wait_handle, *, loop=None): 

177 super().__init__(ov, event, wait_handle, loop=loop) 

178 

179 self._done_callback = None 

180 

181 def cancel(self): 

182 raise RuntimeError("_WaitCancelFuture must not be cancelled") 

183 

184 def set_result(self, result): 

185 super().set_result(result) 

186 if self._done_callback is not None: 

187 self._done_callback(self) 

188 

189 def set_exception(self, exception): 

190 super().set_exception(exception) 

191 if self._done_callback is not None: 

192 self._done_callback(self) 

193 

194 

195class _WaitHandleFuture(_BaseWaitHandleFuture): 

196 def __init__(self, ov, handle, wait_handle, proactor, *, loop=None): 

197 super().__init__(ov, handle, wait_handle, loop=loop) 

198 self._proactor = proactor 

199 self._unregister_proactor = True 

200 self._event = _overlapped.CreateEvent(None, True, False, None) 

201 self._event_fut = None 

202 

203 def _unregister_wait_cb(self, fut): 

204 if self._event is not None: 

205 _winapi.CloseHandle(self._event) 

206 self._event = None 

207 self._event_fut = None 

208 

209 # If the wait was cancelled, the wait may never be signalled, so 

210 # it's required to unregister it. Otherwise, IocpProactor.close() will 

211 # wait forever for an event which will never come. 

212 # 

213 # If the IocpProactor already received the event, it's safe to call 

214 # _unregister() because we kept a reference to the Overlapped object 

215 # which is used as a unique key. 

216 self._proactor._unregister(self._ov) 

217 self._proactor = None 

218 

219 super()._unregister_wait_cb(fut) 

220 

221 def _unregister_wait(self): 

222 if not self._registered: 

223 return 

224 self._registered = False 

225 

226 wait_handle = self._wait_handle 

227 self._wait_handle = None 

228 try: 

229 _overlapped.UnregisterWaitEx(wait_handle, self._event) 

230 except OSError as exc: 

231 if exc.winerror != _overlapped.ERROR_IO_PENDING: 

232 context = { 

233 'message': 'Failed to unregister the wait handle', 

234 'exception': exc, 

235 'future': self, 

236 } 

237 if self._source_traceback: 

238 context['source_traceback'] = self._source_traceback 

239 self._loop.call_exception_handler(context) 

240 return 

241 # ERROR_IO_PENDING is not an error, the wait was unregistered 

242 

243 self._event_fut = self._proactor._wait_cancel(self._event, 

244 self._unregister_wait_cb) 

245 

246 

247class PipeServer(object): 

248 """Class representing a pipe server. 

249 

250 This is much like a bound, listening socket. 

251 """ 

252 def __init__(self, address): 

253 self._address = address 

254 self._free_instances = weakref.WeakSet() 

255 # initialize the pipe attribute before calling _server_pipe_handle() 

256 # because this function can raise an exception and the destructor calls 

257 # the close() method 

258 self._pipe = None 

259 self._accept_pipe_future = None 

260 self._pipe = self._server_pipe_handle(True) 

261 

262 def _get_unconnected_pipe(self): 

263 # Create new instance and return previous one. This ensures 

264 # that (until the server is closed) there is always at least 

265 # one pipe handle for address. Therefore if a client attempt 

266 # to connect it will not fail with FileNotFoundError. 

267 tmp, self._pipe = self._pipe, self._server_pipe_handle(False) 

268 return tmp 

269 

270 def _server_pipe_handle(self, first): 

271 # Return a wrapper for a new pipe handle. 

272 if self.closed(): 

273 return None 

274 flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED 

275 if first: 

276 flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE 

277 h = _winapi.CreateNamedPipe( 

278 self._address, flags, 

279 _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE | 

280 _winapi.PIPE_WAIT, 

281 _winapi.PIPE_UNLIMITED_INSTANCES, 

282 windows_utils.BUFSIZE, windows_utils.BUFSIZE, 

283 _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL) 

284 pipe = windows_utils.PipeHandle(h) 

285 self._free_instances.add(pipe) 

286 return pipe 

287 

288 def closed(self): 

289 return (self._address is None) 

290 

291 def close(self): 

292 if self._accept_pipe_future is not None: 

293 self._accept_pipe_future.cancel() 

294 self._accept_pipe_future = None 

295 # Close all instances which have not been connected to by a client. 

296 if self._address is not None: 

297 for pipe in self._free_instances: 

298 pipe.close() 

299 self._pipe = None 

300 self._address = None 

301 self._free_instances.clear() 

302 

303 __del__ = close 

304 

305 

306class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop): 

307 """Windows version of selector event loop.""" 

308 

309 

310class ProactorEventLoop(proactor_events.BaseProactorEventLoop): 

311 """Windows version of proactor event loop using IOCP.""" 

312 

313 def __init__(self, proactor=None): 

314 if proactor is None: 

315 proactor = IocpProactor() 

316 super().__init__(proactor) 

317 

318 def _run_forever_setup(self): 

319 assert self._self_reading_future is None 

320 self.call_soon(self._loop_self_reading) 

321 super()._run_forever_setup() 

322 

323 def _run_forever_cleanup(self): 

324 super()._run_forever_cleanup() 

325 if self._self_reading_future is not None: 

326 ov = self._self_reading_future._ov 

327 self._self_reading_future.cancel() 

328 # self_reading_future always uses IOCP, so even though it's 

329 # been cancelled, we need to make sure that the IOCP message 

330 # is received so that the kernel is not holding on to the 

331 # memory, possibly causing memory corruption later. Only 

332 # unregister it if IO is complete in all respects. Otherwise 

333 # we need another _poll() later to complete the IO. 

334 if ov is not None and not ov.pending: 

335 self._proactor._unregister(ov) 

336 self._self_reading_future = None 

337 

338 async def create_pipe_connection(self, protocol_factory, address): 

339 f = self._proactor.connect_pipe(address) 

340 pipe = await f 

341 protocol = protocol_factory() 

342 trans = self._make_duplex_pipe_transport(pipe, protocol, 

343 extra={'addr': address}) 

344 return trans, protocol 

345 

346 async def start_serving_pipe(self, protocol_factory, address): 

347 server = PipeServer(address) 

348 

349 def loop_accept_pipe(f=None): 

350 pipe = None 

351 try: 

352 if f: 

353 pipe = f.result() 

354 server._free_instances.discard(pipe) 

355 

356 if server.closed(): 

357 # A client connected before the server was closed: 

358 # drop the client (close the pipe) and exit 

359 pipe.close() 

360 return 

361 

362 protocol = protocol_factory() 

363 self._make_duplex_pipe_transport( 

364 pipe, protocol, extra={'addr': address}) 

365 

366 pipe = server._get_unconnected_pipe() 

367 if pipe is None: 

368 return 

369 

370 f = self._proactor.accept_pipe(pipe) 

371 except BrokenPipeError: 

372 if pipe and pipe.fileno() != -1: 

373 pipe.close() 

374 self.call_soon(loop_accept_pipe) 

375 except OSError as exc: 

376 if pipe and pipe.fileno() != -1: 

377 self.call_exception_handler({ 

378 'message': 'Pipe accept failed', 

379 'exception': exc, 

380 'pipe': pipe, 

381 }) 

382 pipe.close() 

383 elif self._debug: 

384 logger.warning("Accept pipe failed on pipe %r", 

385 pipe, exc_info=True) 

386 self.call_soon(loop_accept_pipe) 

387 except exceptions.CancelledError: 

388 if pipe: 

389 pipe.close() 

390 else: 

391 server._accept_pipe_future = f 

392 f.add_done_callback(loop_accept_pipe) 

393 

394 self.call_soon(loop_accept_pipe) 

395 return [server] 

396 

397 async def _make_subprocess_transport(self, protocol, args, shell, 

398 stdin, stdout, stderr, bufsize, 

399 extra=None, **kwargs): 

400 waiter = self.create_future() 

401 transp = _WindowsSubprocessTransport(self, protocol, args, shell, 

402 stdin, stdout, stderr, bufsize, 

403 waiter=waiter, extra=extra, 

404 **kwargs) 

405 try: 

406 await waiter 

407 except (SystemExit, KeyboardInterrupt): 

408 raise 

409 except BaseException: 

410 transp.close() 

411 await transp._wait() 

412 raise 

413 

414 return transp 

415 

416 

417class IocpProactor: 

418 """Proactor implementation using IOCP.""" 

419 

420 def __init__(self, concurrency=INFINITE): 

421 self._loop = None 

422 self._results = [] 

423 self._iocp = _overlapped.CreateIoCompletionPort( 

424 _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency) 

425 self._cache = {} 

426 self._registered = weakref.WeakSet() 

427 self._unregistered = [] 

428 self._stopped_serving = weakref.WeakSet() 

429 

430 def _check_closed(self): 

431 if self._iocp is None: 

432 raise RuntimeError('IocpProactor is closed') 

433 

434 def __repr__(self): 

435 info = ['overlapped#=%s' % len(self._cache), 

436 'result#=%s' % len(self._results)] 

437 if self._iocp is None: 

438 info.append('closed') 

439 return '<%s %s>' % (self.__class__.__name__, " ".join(info)) 

440 

441 def set_loop(self, loop): 

442 self._loop = loop 

443 

444 def select(self, timeout=None): 

445 if not self._results: 

446 self._poll(timeout) 

447 tmp = self._results 

448 self._results = [] 

449 try: 

450 return tmp 

451 finally: 

452 # Needed to break cycles when an exception occurs. 

453 tmp = None 

454 

455 def _result(self, value): 

456 fut = self._loop.create_future() 

457 fut.set_result(value) 

458 return fut 

459 

460 @staticmethod 

461 def finish_socket_func(trans, key, ov): 

462 try: 

463 return ov.getresult() 

464 except OSError as exc: 

465 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, 

466 _overlapped.ERROR_OPERATION_ABORTED): 

467 raise ConnectionResetError(*exc.args) 

468 else: 

469 raise 

470 

471 @classmethod 

472 def _finish_recvfrom(cls, trans, key, ov, *, empty_result): 

473 try: 

474 return cls.finish_socket_func(trans, key, ov) 

475 except OSError as exc: 

476 # WSARecvFrom will report ERROR_PORT_UNREACHABLE when the same 

477 # socket is used to send to an address that is not listening. 

478 if exc.winerror == _overlapped.ERROR_PORT_UNREACHABLE: 

479 return empty_result, None 

480 else: 

481 raise 

482 

483 def recv(self, conn, nbytes, flags=0): 

484 self._register_with_iocp(conn) 

485 ov = _overlapped.Overlapped(NULL) 

486 try: 

487 if isinstance(conn, socket.socket): 

488 ov.WSARecv(conn.fileno(), nbytes, flags) 

489 else: 

490 ov.ReadFile(conn.fileno(), nbytes) 

491 except BrokenPipeError: 

492 return self._result(b'') 

493 

494 return self._register(ov, conn, self.finish_socket_func) 

495 

496 def recv_into(self, conn, buf, flags=0): 

497 self._register_with_iocp(conn) 

498 ov = _overlapped.Overlapped(NULL) 

499 try: 

500 if isinstance(conn, socket.socket): 

501 ov.WSARecvInto(conn.fileno(), buf, flags) 

502 else: 

503 ov.ReadFileInto(conn.fileno(), buf) 

504 except BrokenPipeError: 

505 return self._result(0) 

506 

507 return self._register(ov, conn, self.finish_socket_func) 

508 

509 def recvfrom(self, conn, nbytes, flags=0): 

510 self._register_with_iocp(conn) 

511 ov = _overlapped.Overlapped(NULL) 

512 try: 

513 ov.WSARecvFrom(conn.fileno(), nbytes, flags) 

514 except BrokenPipeError: 

515 return self._result((b'', None)) 

516 

517 return self._register(ov, conn, partial(self._finish_recvfrom, 

518 empty_result=b'')) 

519 

520 def recvfrom_into(self, conn, buf, flags=0): 

521 self._register_with_iocp(conn) 

522 ov = _overlapped.Overlapped(NULL) 

523 try: 

524 ov.WSARecvFromInto(conn.fileno(), buf, flags) 

525 except BrokenPipeError: 

526 return self._result((0, None)) 

527 

528 return self._register(ov, conn, partial(self._finish_recvfrom, 

529 empty_result=0)) 

530 

531 def sendto(self, conn, buf, flags=0, addr=None): 

532 self._register_with_iocp(conn) 

533 ov = _overlapped.Overlapped(NULL) 

534 

535 ov.WSASendTo(conn.fileno(), buf, flags, addr) 

536 

537 return self._register(ov, conn, self.finish_socket_func) 

538 

539 def send(self, conn, buf, flags=0): 

540 self._register_with_iocp(conn) 

541 ov = _overlapped.Overlapped(NULL) 

542 if isinstance(conn, socket.socket): 

543 ov.WSASend(conn.fileno(), buf, flags) 

544 else: 

545 ov.WriteFile(conn.fileno(), buf) 

546 

547 return self._register(ov, conn, self.finish_socket_func) 

548 

549 def accept(self, listener): 

550 self._register_with_iocp(listener) 

551 conn = self._get_accept_socket(listener.family) 

552 ov = _overlapped.Overlapped(NULL) 

553 ov.AcceptEx(listener.fileno(), conn.fileno()) 

554 

555 def finish_accept(trans, key, ov): 

556 ov.getresult() 

557 # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work. 

558 buf = struct.pack('@P', listener.fileno()) 

559 conn.setsockopt(socket.SOL_SOCKET, 

560 _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf) 

561 conn.settimeout(listener.gettimeout()) 

562 return conn, conn.getpeername() 

563 

564 async def accept_coro(future, conn): 

565 # Coroutine closing the accept socket if the future is cancelled 

566 try: 

567 await future 

568 except exceptions.CancelledError: 

569 conn.close() 

570 raise 

571 

572 future = self._register(ov, listener, finish_accept) 

573 coro = accept_coro(future, conn) 

574 tasks.ensure_future(coro, loop=self._loop) 

575 return future 

576 

577 def connect(self, conn, address): 

578 if conn.type == socket.SOCK_DGRAM: 

579 # WSAConnect will complete immediately for UDP sockets so we don't 

580 # need to register any IOCP operation 

581 _overlapped.WSAConnect(conn.fileno(), address) 

582 fut = self._loop.create_future() 

583 fut.set_result(None) 

584 return fut 

585 

586 self._register_with_iocp(conn) 

587 # The socket needs to be locally bound before we call ConnectEx(). 

588 try: 

589 _overlapped.BindLocal(conn.fileno(), conn.family) 

590 except OSError as e: 

591 if e.winerror != errno.WSAEINVAL: 

592 raise 

593 # Probably already locally bound; check using getsockname(). 

594 if conn.getsockname()[1] == 0: 

595 raise 

596 ov = _overlapped.Overlapped(NULL) 

597 ov.ConnectEx(conn.fileno(), address) 

598 

599 def finish_connect(trans, key, ov): 

600 ov.getresult() 

601 # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work. 

602 conn.setsockopt(socket.SOL_SOCKET, 

603 _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0) 

604 return conn 

605 

606 return self._register(ov, conn, finish_connect) 

607 

608 def sendfile(self, sock, file, offset, count): 

609 self._register_with_iocp(sock) 

610 ov = _overlapped.Overlapped(NULL) 

611 offset_low = offset & 0xffff_ffff 

612 offset_high = (offset >> 32) & 0xffff_ffff 

613 ov.TransmitFile(sock.fileno(), 

614 msvcrt.get_osfhandle(file.fileno()), 

615 offset_low, offset_high, 

616 count, 0, 0) 

617 

618 return self._register(ov, sock, self.finish_socket_func) 

619 

620 def accept_pipe(self, pipe): 

621 self._register_with_iocp(pipe) 

622 ov = _overlapped.Overlapped(NULL) 

623 connected = ov.ConnectNamedPipe(pipe.fileno()) 

624 

625 if connected: 

626 # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means 

627 # that the pipe is connected. There is no need to wait for the 

628 # completion of the connection. 

629 return self._result(pipe) 

630 

631 def finish_accept_pipe(trans, key, ov): 

632 ov.getresult() 

633 return pipe 

634 

635 return self._register(ov, pipe, finish_accept_pipe) 

636 

637 async def connect_pipe(self, address): 

638 delay = CONNECT_PIPE_INIT_DELAY 

639 while True: 

640 # Unfortunately there is no way to do an overlapped connect to 

641 # a pipe. Call CreateFile() in a loop until it doesn't fail with 

642 # ERROR_PIPE_BUSY. 

643 try: 

644 handle = _overlapped.ConnectPipe(address) 

645 break 

646 except OSError as exc: 

647 if exc.winerror != _overlapped.ERROR_PIPE_BUSY: 

648 raise 

649 

650 # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later 

651 delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY) 

652 await tasks.sleep(delay) 

653 

654 return windows_utils.PipeHandle(handle) 

655 

656 def wait_for_handle(self, handle, timeout=None): 

657 """Wait for a handle. 

658 

659 Return a Future object. The result of the future is True if the wait 

660 completed, or False if the wait did not complete (on timeout). 

661 """ 

662 return self._wait_for_handle(handle, timeout, False) 

663 

664 def _wait_cancel(self, event, done_callback): 

665 fut = self._wait_for_handle(event, None, True) 

666 # add_done_callback() cannot be used because the wait may only complete 

667 # in IocpProactor.close(), while the event loop is not running. 

668 fut._done_callback = done_callback 

669 return fut 

670 

671 def _wait_for_handle(self, handle, timeout, _is_cancel): 

672 self._check_closed() 

673 

674 if timeout is None: 

675 ms = _winapi.INFINITE 

676 else: 

677 # RegisterWaitForSingleObject() has a resolution of 1 millisecond, 

678 # round away from zero to wait *at least* timeout seconds. 

679 ms = math.ceil(timeout * 1e3) 

680 

681 # We only create ov so we can use ov.address as a key for the cache. 

682 ov = _overlapped.Overlapped(NULL) 

683 wait_handle = _overlapped.RegisterWaitWithQueue( 

684 handle, self._iocp, ov.address, ms) 

685 if _is_cancel: 

686 f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop) 

687 else: 

688 f = _WaitHandleFuture(ov, handle, wait_handle, self, 

689 loop=self._loop) 

690 if f._source_traceback: 

691 del f._source_traceback[-1] 

692 

693 def finish_wait_for_handle(trans, key, ov): 

694 # Note that this second wait means that we should only use 

695 # this with handles types where a successful wait has no 

696 # effect. So events or processes are all right, but locks 

697 # or semaphores are not. Also note if the handle is 

698 # signalled and then quickly reset, then we may return 

699 # False even though we have not timed out. 

700 return f._poll() 

701 

702 self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle) 

703 return f 

704 

705 def _register_with_iocp(self, obj): 

706 # To get notifications of finished ops on this objects sent to the 

707 # completion port, were must register the handle. 

708 if obj not in self._registered: 

709 self._registered.add(obj) 

710 _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0) 

711 # XXX We could also use SetFileCompletionNotificationModes() 

712 # to avoid sending notifications to completion port of ops 

713 # that succeed immediately. 

714 

715 def _register(self, ov, obj, callback): 

716 self._check_closed() 

717 

718 # Return a future which will be set with the result of the 

719 # operation when it completes. The future's value is actually 

720 # the value returned by callback(). 

721 f = _OverlappedFuture(ov, loop=self._loop) 

722 if f._source_traceback: 

723 del f._source_traceback[-1] 

724 if not ov.pending: 

725 # The operation has completed, so no need to postpone the 

726 # work. We cannot take this short cut if we need the 

727 # NumberOfBytes, CompletionKey values returned by 

728 # PostQueuedCompletionStatus(). 

729 try: 

730 value = callback(None, None, ov) 

731 except OSError as e: 

732 f.set_exception(e) 

733 else: 

734 f.set_result(value) 

735 # Even if GetOverlappedResult() was called, we have to wait for the 

736 # notification of the completion in GetQueuedCompletionStatus(). 

737 # Register the overlapped operation to keep a reference to the 

738 # OVERLAPPED object, otherwise the memory is freed and Windows may 

739 # read uninitialized memory. 

740 

741 # Register the overlapped operation for later. Note that 

742 # we only store obj to prevent it from being garbage 

743 # collected too early. 

744 self._cache[ov.address] = (f, ov, obj, callback) 

745 return f 

746 

747 def _unregister(self, ov): 

748 """Unregister an overlapped object. 

749 

750 Call this method when its future has been cancelled. The event can 

751 already be signalled (pending in the proactor event queue). It is also 

752 safe if the event is never signalled (because it was cancelled). 

753 """ 

754 self._check_closed() 

755 self._unregistered.append(ov) 

756 

757 def _get_accept_socket(self, family): 

758 s = socket.socket(family) 

759 s.settimeout(0) 

760 return s 

761 

762 def _poll(self, timeout=None): 

763 if timeout is None: 

764 ms = INFINITE 

765 elif timeout < 0: 

766 raise ValueError("negative timeout") 

767 else: 

768 # GetQueuedCompletionStatus() has a resolution of 1 millisecond, 

769 # round away from zero to wait *at least* timeout seconds. 

770 ms = math.ceil(timeout * 1e3) 

771 if ms >= INFINITE: 

772 raise ValueError("timeout too big") 

773 

774 while True: 

775 status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms) 

776 if status is None: 

777 break 

778 ms = 0 

779 

780 err, transferred, key, address = status 

781 try: 

782 f, ov, obj, callback = self._cache.pop(address) 

783 except KeyError: 

784 if self._loop.get_debug(): 

785 self._loop.call_exception_handler({ 

786 'message': ('GetQueuedCompletionStatus() returned an ' 

787 'unexpected event'), 

788 'status': ('err=%s transferred=%s key=%#x address=%#x' 

789 % (err, transferred, key, address)), 

790 }) 

791 

792 # key is either zero, or it is used to return a pipe 

793 # handle which should be closed to avoid a leak. 

794 if key not in (0, _overlapped.INVALID_HANDLE_VALUE): 

795 _winapi.CloseHandle(key) 

796 continue 

797 

798 if obj in self._stopped_serving: 

799 f.cancel() 

800 # Don't call the callback if _register() already read the result or 

801 # if the overlapped has been cancelled 

802 elif not f.done(): 

803 try: 

804 value = callback(transferred, key, ov) 

805 except OSError as e: 

806 f.set_exception(e) 

807 self._results.append(f) 

808 else: 

809 f.set_result(value) 

810 self._results.append(f) 

811 finally: 

812 f = None 

813 

814 # Remove unregistered futures 

815 for ov in self._unregistered: 

816 self._cache.pop(ov.address, None) 

817 self._unregistered.clear() 

818 

819 def _stop_serving(self, obj): 

820 # obj is a socket or pipe handle. It will be closed in 

821 # BaseProactorEventLoop._stop_serving() which will make any 

822 # pending operations fail quickly. 

823 self._stopped_serving.add(obj) 

824 

825 def close(self): 

826 if self._iocp is None: 

827 # already closed 

828 return 

829 

830 # Cancel remaining registered operations. 

831 for fut, ov, obj, callback in list(self._cache.values()): 

832 if fut.cancelled(): 

833 # Nothing to do with cancelled futures 

834 pass 

835 elif isinstance(fut, _WaitCancelFuture): 

836 # _WaitCancelFuture must not be cancelled 

837 pass 

838 else: 

839 try: 

840 fut.cancel() 

841 except OSError as exc: 

842 if self._loop is not None: 

843 context = { 

844 'message': 'Cancelling a future failed', 

845 'exception': exc, 

846 'future': fut, 

847 } 

848 if fut._source_traceback: 

849 context['source_traceback'] = fut._source_traceback 

850 self._loop.call_exception_handler(context) 

851 

852 # Wait until all cancelled overlapped complete: don't exit with running 

853 # overlapped to prevent a crash. Display progress every second if the 

854 # loop is still running. 

855 msg_update = 1.0 

856 start_time = time.monotonic() 

857 next_msg = start_time + msg_update 

858 while self._cache: 

859 if next_msg <= time.monotonic(): 

860 logger.debug('%r is running after closing for %.1f seconds', 

861 self, time.monotonic() - start_time) 

862 next_msg = time.monotonic() + msg_update 

863 

864 # handle a few events, or timeout 

865 self._poll(msg_update) 

866 

867 self._results = [] 

868 

869 _winapi.CloseHandle(self._iocp) 

870 self._iocp = None 

871 

872 def __del__(self): 

873 self.close() 

874 

875 

876class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport): 

877 

878 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs): 

879 self._proc = windows_utils.Popen( 

880 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, 

881 bufsize=bufsize, **kwargs) 

882 

883 def callback(f): 

884 returncode = self._proc.poll() 

885 self._process_exited(returncode) 

886 

887 f = self._loop._proactor.wait_for_handle(int(self._proc._handle)) 

888 f.add_done_callback(callback) 

889 

890 

891SelectorEventLoop = _WindowsSelectorEventLoop 

892 

893 

894class _WindowsSelectorEventLoopPolicy(events._BaseDefaultEventLoopPolicy): 

895 _loop_factory = SelectorEventLoop 

896 

897 

898class _WindowsProactorEventLoopPolicy(events._BaseDefaultEventLoopPolicy): 

899 _loop_factory = ProactorEventLoop 

900 

901 

902_DefaultEventLoopPolicy = _WindowsProactorEventLoopPolicy 

903EventLoop = ProactorEventLoop