Coverage for Lib/asyncio/windows_events.py: 0%

550 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-06-24 03:28 +0000

1"""Selector and proactor event loops for Windows.""" 

2 

3import sys 

4 

5if sys.platform != 'win32': # pragma: no cover 

6 raise ImportError('win32 only') 

7 

8import _overlapped 

9import _winapi 

10import errno 

11from functools import partial 

12import math 

13import msvcrt 

14import socket 

15import struct 

16import time 

17import weakref 

18 

19from . import events 

20from . import base_subprocess 

21from . import futures 

22from . import exceptions 

23from . import proactor_events 

24from . import selector_events 

25from . import tasks 

26from . import windows_utils 

27from .log import logger 

28 

29 

30__all__ = ( 

31 'SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor', 

32 '_DefaultEventLoopPolicy', '_WindowsSelectorEventLoopPolicy', 

33 '_WindowsProactorEventLoopPolicy', 'EventLoop', 

34) 

35 

36 

37NULL = _winapi.NULL 

38INFINITE = _winapi.INFINITE 

39ERROR_CONNECTION_REFUSED = 1225 

40ERROR_CONNECTION_ABORTED = 1236 

41 

42# Initial delay in seconds for connect_pipe() before retrying to connect 

43CONNECT_PIPE_INIT_DELAY = 0.001 

44 

45# Maximum delay in seconds for connect_pipe() before retrying to connect 

46CONNECT_PIPE_MAX_DELAY = 0.100 

47 

48 

49class _OverlappedFuture(futures.Future): 

50 """Subclass of Future which represents an overlapped operation. 

51 

52 Cancelling it will immediately cancel the overlapped operation. 

53 """ 

54 

55 def __init__(self, ov, *, loop=None): 

56 super().__init__(loop=loop) 

57 if self._source_traceback: 

58 del self._source_traceback[-1] 

59 self._ov = ov 

60 

61 def _repr_info(self): 

62 info = super()._repr_info() 

63 if self._ov is not None: 

64 state = 'pending' if self._ov.pending else 'completed' 

65 info.insert(1, f'overlapped=<{state}, {self._ov.address:#x}>') 

66 return info 

67 

68 def _cancel_overlapped(self): 

69 if self._ov is None: 

70 return 

71 try: 

72 self._ov.cancel() 

73 except OSError as exc: 

74 context = { 

75 'message': 'Cancelling an overlapped future failed', 

76 'exception': exc, 

77 'future': self, 

78 } 

79 if self._source_traceback: 

80 context['source_traceback'] = self._source_traceback 

81 self._loop.call_exception_handler(context) 

82 self._ov = None 

83 

84 def cancel(self, msg=None): 

85 self._cancel_overlapped() 

86 return super().cancel(msg=msg) 

87 

88 def set_exception(self, exception): 

89 super().set_exception(exception) 

90 self._cancel_overlapped() 

91 

92 def set_result(self, result): 

93 super().set_result(result) 

94 self._ov = None 

95 

96 

97class _BaseWaitHandleFuture(futures.Future): 

98 """Subclass of Future which represents a wait handle.""" 

99 

100 def __init__(self, ov, handle, wait_handle, *, loop=None): 

101 super().__init__(loop=loop) 

102 if self._source_traceback: 

103 del self._source_traceback[-1] 

104 # Keep a reference to the Overlapped object to keep it alive until the 

105 # wait is unregistered 

106 self._ov = ov 

107 self._handle = handle 

108 self._wait_handle = wait_handle 

109 

110 # Should we call UnregisterWaitEx() if the wait completes 

111 # or is cancelled? 

112 self._registered = True 

113 

114 def _poll(self): 

115 # non-blocking wait: use a timeout of 0 millisecond 

116 return (_winapi.WaitForSingleObject(self._handle, 0) == 

117 _winapi.WAIT_OBJECT_0) 

118 

119 def _repr_info(self): 

120 info = super()._repr_info() 

121 info.append(f'handle={self._handle:#x}') 

122 if self._handle is not None: 

123 state = 'signaled' if self._poll() else 'waiting' 

124 info.append(state) 

125 if self._wait_handle is not None: 

126 info.append(f'wait_handle={self._wait_handle:#x}') 

127 return info 

128 

129 def _unregister_wait_cb(self, fut): 

130 # The wait was unregistered: it's not safe to destroy the Overlapped 

131 # object 

132 self._ov = None 

133 

134 def _unregister_wait(self): 

135 if not self._registered: 

136 return 

137 self._registered = False 

138 

139 wait_handle = self._wait_handle 

140 self._wait_handle = None 

141 try: 

142 _overlapped.UnregisterWait(wait_handle) 

143 except OSError as exc: 

144 if exc.winerror != _overlapped.ERROR_IO_PENDING: 

145 context = { 

146 'message': 'Failed to unregister the wait handle', 

147 'exception': exc, 

148 'future': self, 

149 } 

150 if self._source_traceback: 

151 context['source_traceback'] = self._source_traceback 

152 self._loop.call_exception_handler(context) 

153 return 

154 # ERROR_IO_PENDING means that the unregister is pending 

155 

156 self._unregister_wait_cb(None) 

157 

158 def cancel(self, msg=None): 

159 self._unregister_wait() 

160 return super().cancel(msg=msg) 

161 

162 def set_exception(self, exception): 

163 self._unregister_wait() 

164 super().set_exception(exception) 

165 

166 def set_result(self, result): 

167 self._unregister_wait() 

168 super().set_result(result) 

169 

170 

171class _WaitCancelFuture(_BaseWaitHandleFuture): 

172 """Subclass of Future which represents a wait for the cancellation of a 

173 _WaitHandleFuture using an event. 

174 """ 

175 

176 def __init__(self, ov, event, wait_handle, *, loop=None): 

177 super().__init__(ov, event, wait_handle, loop=loop) 

178 

179 self._done_callback = None 

180 

181 def cancel(self): 

182 raise RuntimeError("_WaitCancelFuture must not be cancelled") 

183 

184 def set_result(self, result): 

185 super().set_result(result) 

186 if self._done_callback is not None: 

187 self._done_callback(self) 

188 

189 def set_exception(self, exception): 

190 super().set_exception(exception) 

191 if self._done_callback is not None: 

192 self._done_callback(self) 

193 

194 

195class _WaitHandleFuture(_BaseWaitHandleFuture): 

196 def __init__(self, ov, handle, wait_handle, proactor, *, loop=None): 

197 super().__init__(ov, handle, wait_handle, loop=loop) 

198 self._proactor = proactor 

199 self._unregister_proactor = True 

200 self._event = _overlapped.CreateEvent(None, True, False, None) 

201 self._event_fut = None 

202 

203 def _unregister_wait_cb(self, fut): 

204 if self._event is not None: 

205 _winapi.CloseHandle(self._event) 

206 self._event = None 

207 self._event_fut = None 

208 

209 # If the wait was cancelled, the wait may never be signalled, so 

210 # it's required to unregister it. Otherwise, IocpProactor.close() will 

211 # wait forever for an event which will never come. 

212 # 

213 # If the IocpProactor already received the event, it's safe to call 

214 # _unregister() because we kept a reference to the Overlapped object 

215 # which is used as a unique key. 

216 self._proactor._unregister(self._ov) 

217 self._proactor = None 

218 

219 super()._unregister_wait_cb(fut) 

220 

221 def _unregister_wait(self): 

222 if not self._registered: 

223 return 

224 self._registered = False 

225 

226 wait_handle = self._wait_handle 

227 self._wait_handle = None 

228 try: 

229 _overlapped.UnregisterWaitEx(wait_handle, self._event) 

230 except OSError as exc: 

231 if exc.winerror != _overlapped.ERROR_IO_PENDING: 

232 context = { 

233 'message': 'Failed to unregister the wait handle', 

234 'exception': exc, 

235 'future': self, 

236 } 

237 if self._source_traceback: 

238 context['source_traceback'] = self._source_traceback 

239 self._loop.call_exception_handler(context) 

240 return 

241 # ERROR_IO_PENDING is not an error, the wait was unregistered 

242 

243 self._event_fut = self._proactor._wait_cancel(self._event, 

244 self._unregister_wait_cb) 

245 

246 

247class PipeServer(object): 

248 """Class representing a pipe server. 

249 

250 This is much like a bound, listening socket. 

251 """ 

252 def __init__(self, address): 

253 self._address = address 

254 self._free_instances = weakref.WeakSet() 

255 # initialize the pipe attribute before calling _server_pipe_handle() 

256 # because this function can raise an exception and the destructor calls 

257 # the close() method 

258 self._pipe = None 

259 self._accept_pipe_future = None 

260 self._pipe = self._server_pipe_handle(True) 

261 

262 def _get_unconnected_pipe(self): 

263 # Create new instance and return previous one. This ensures 

264 # that (until the server is closed) there is always at least 

265 # one pipe handle for address. Therefore if a client attempt 

266 # to connect it will not fail with FileNotFoundError. 

267 tmp, self._pipe = self._pipe, self._server_pipe_handle(False) 

268 return tmp 

269 

270 def _server_pipe_handle(self, first): 

271 # Return a wrapper for a new pipe handle. 

272 if self.closed(): 

273 return None 

274 flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED 

275 if first: 

276 flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE 

277 h = _winapi.CreateNamedPipe( 

278 self._address, flags, 

279 _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE | 

280 _winapi.PIPE_WAIT, 

281 _winapi.PIPE_UNLIMITED_INSTANCES, 

282 windows_utils.BUFSIZE, windows_utils.BUFSIZE, 

283 _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL) 

284 pipe = windows_utils.PipeHandle(h) 

285 self._free_instances.add(pipe) 

286 return pipe 

287 

288 def closed(self): 

289 return (self._address is None) 

290 

291 def close(self): 

292 if self._accept_pipe_future is not None: 

293 self._accept_pipe_future.cancel() 

294 self._accept_pipe_future = None 

295 # Close all instances which have not been connected to by a client. 

296 if self._address is not None: 

297 for pipe in self._free_instances: 

298 pipe.close() 

299 self._pipe = None 

300 self._address = None 

301 self._free_instances.clear() 

302 

303 __del__ = close 

304 

305 

306class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop): 

307 """Windows version of selector event loop.""" 

308 

309 

310class ProactorEventLoop(proactor_events.BaseProactorEventLoop): 

311 """Windows version of proactor event loop using IOCP.""" 

312 

313 def __init__(self, proactor=None): 

314 if proactor is None: 

315 proactor = IocpProactor() 

316 super().__init__(proactor) 

317 

318 def _run_forever_setup(self): 

319 assert self._self_reading_future is None 

320 self.call_soon(self._loop_self_reading) 

321 super()._run_forever_setup() 

322 

323 def _run_forever_cleanup(self): 

324 super()._run_forever_cleanup() 

325 if self._self_reading_future is not None: 

326 ov = self._self_reading_future._ov 

327 self._self_reading_future.cancel() 

328 # self_reading_future always uses IOCP, so even though it's 

329 # been cancelled, we need to make sure that the IOCP message 

330 # is received so that the kernel is not holding on to the 

331 # memory, possibly causing memory corruption later. Only 

332 # unregister it if IO is complete in all respects. Otherwise 

333 # we need another _poll() later to complete the IO. 

334 if ov is not None and not ov.pending: 

335 self._proactor._unregister(ov) 

336 self._self_reading_future = None 

337 

338 async def create_pipe_connection(self, protocol_factory, address): 

339 f = self._proactor.connect_pipe(address) 

340 pipe = await f 

341 protocol = protocol_factory() 

342 trans = self._make_duplex_pipe_transport(pipe, protocol, 

343 extra={'addr': address}) 

344 return trans, protocol 

345 

346 async def start_serving_pipe(self, protocol_factory, address): 

347 server = PipeServer(address) 

348 

349 def loop_accept_pipe(f=None): 

350 pipe = None 

351 try: 

352 if f: 

353 pipe = f.result() 

354 server._free_instances.discard(pipe) 

355 

356 if server.closed(): 

357 # A client connected before the server was closed: 

358 # drop the client (close the pipe) and exit 

359 pipe.close() 

360 return 

361 

362 protocol = protocol_factory() 

363 self._make_duplex_pipe_transport( 

364 pipe, protocol, extra={'addr': address}) 

365 

366 pipe = server._get_unconnected_pipe() 

367 if pipe is None: 

368 return 

369 

370 f = self._proactor.accept_pipe(pipe) 

371 except BrokenPipeError: 

372 if pipe and pipe.fileno() != -1: 

373 pipe.close() 

374 self.call_soon(loop_accept_pipe) 

375 except OSError as exc: 

376 if pipe and pipe.fileno() != -1: 

377 self.call_exception_handler({ 

378 'message': 'Pipe accept failed', 

379 'exception': exc, 

380 'pipe': pipe, 

381 }) 

382 pipe.close() 

383 elif self._debug: 

384 logger.warning("Accept pipe failed on pipe %r", 

385 pipe, exc_info=True) 

386 self.call_soon(loop_accept_pipe) 

387 except exceptions.CancelledError: 

388 if pipe: 

389 pipe.close() 

390 else: 

391 server._accept_pipe_future = f 

392 f.add_done_callback(loop_accept_pipe) 

393 

394 self.call_soon(loop_accept_pipe) 

395 return [server] 

396 

397 async def _make_subprocess_transport(self, protocol, args, shell, 

398 stdin, stdout, stderr, bufsize, 

399 extra=None, **kwargs): 

400 waiter = self.create_future() 

401 transp = _WindowsSubprocessTransport(self, protocol, args, shell, 

402 stdin, stdout, stderr, bufsize, 

403 waiter=waiter, extra=extra, 

404 **kwargs) 

405 try: 

406 await waiter 

407 except (SystemExit, KeyboardInterrupt): 

408 raise 

409 except BaseException: 

410 transp.close() 

411 await transp._wait() 

412 raise 

413 

414 return transp 

415 

416 

417class IocpProactor: 

418 """Proactor implementation using IOCP.""" 

419 

420 def __init__(self, concurrency=INFINITE): 

421 self._loop = None 

422 self._results = [] 

423 self._iocp = _overlapped.CreateIoCompletionPort( 

424 _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency) 

425 self._cache = {} 

426 self._registered = weakref.WeakSet() 

427 self._unregistered = [] 

428 self._stopped_serving = weakref.WeakSet() 

429 

430 def _check_closed(self): 

431 if self._iocp is None: 

432 raise RuntimeError('IocpProactor is closed') 

433 

434 def __repr__(self): 

435 info = ['overlapped#=%s' % len(self._cache), 

436 'result#=%s' % len(self._results)] 

437 if self._iocp is None: 

438 info.append('closed') 

439 return '<%s %s>' % (self.__class__.__name__, " ".join(info)) 

440 

441 def set_loop(self, loop): 

442 self._loop = loop 

443 

444 def select(self, timeout=None): 

445 if not self._results: 

446 self._poll(timeout) 

447 tmp = self._results 

448 self._results = [] 

449 try: 

450 return tmp 

451 finally: 

452 # Needed to break cycles when an exception occurs. 

453 tmp = None 

454 

455 def _result(self, value): 

456 fut = self._loop.create_future() 

457 fut.set_result(value) 

458 return fut 

459 

460 @staticmethod 

461 def finish_socket_func(trans, key, ov): 

462 try: 

463 return ov.getresult() 

464 except OSError as exc: 

465 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, 

466 _overlapped.ERROR_OPERATION_ABORTED): 

467 raise ConnectionResetError(*exc.args) 

468 else: 

469 raise 

470 

471 @classmethod 

472 def _finish_recvfrom(cls, trans, key, ov, *, empty_result): 

473 try: 

474 return cls.finish_socket_func(trans, key, ov) 

475 except OSError as exc: 

476 # WSARecvFrom will report ERROR_PORT_UNREACHABLE when the same 

477 # socket is used to send to an address that is not listening. 

478 if exc.winerror == _overlapped.ERROR_PORT_UNREACHABLE: 

479 return empty_result, None 

480 else: 

481 raise 

482 

483 def recv(self, conn, nbytes, flags=0): 

484 self._register_with_iocp(conn) 

485 ov = _overlapped.Overlapped(NULL) 

486 try: 

487 if isinstance(conn, socket.socket): 

488 ov.WSARecv(conn.fileno(), nbytes, flags) 

489 else: 

490 ov.ReadFile(conn.fileno(), nbytes) 

491 except BrokenPipeError: 

492 return self._result(b'') 

493 

494 return self._register(ov, conn, self.finish_socket_func) 

495 

496 def recv_into(self, conn, buf, flags=0): 

497 self._register_with_iocp(conn) 

498 ov = _overlapped.Overlapped(NULL) 

499 try: 

500 if isinstance(conn, socket.socket): 

501 ov.WSARecvInto(conn.fileno(), buf, flags) 

502 else: 

503 ov.ReadFileInto(conn.fileno(), buf) 

504 except BrokenPipeError: 

505 return self._result(0) 

506 

507 return self._register(ov, conn, self.finish_socket_func) 

508 

509 def recvfrom(self, conn, nbytes, flags=0): 

510 self._register_with_iocp(conn) 

511 ov = _overlapped.Overlapped(NULL) 

512 try: 

513 ov.WSARecvFrom(conn.fileno(), nbytes, flags) 

514 except BrokenPipeError: 

515 return self._result((b'', None)) 

516 

517 return self._register(ov, conn, partial(self._finish_recvfrom, 

518 empty_result=b'')) 

519 

520 def recvfrom_into(self, conn, buf, flags=0): 

521 self._register_with_iocp(conn) 

522 ov = _overlapped.Overlapped(NULL) 

523 try: 

524 ov.WSARecvFromInto(conn.fileno(), buf, flags) 

525 except BrokenPipeError: 

526 return self._result((0, None)) 

527 

528 return self._register(ov, conn, partial(self._finish_recvfrom, 

529 empty_result=0)) 

530 

531 def sendto(self, conn, buf, flags=0, addr=None): 

532 self._register_with_iocp(conn) 

533 ov = _overlapped.Overlapped(NULL) 

534 

535 ov.WSASendTo(conn.fileno(), buf, flags, addr) 

536 

537 return self._register(ov, conn, self.finish_socket_func) 

538 

539 def send(self, conn, buf, flags=0): 

540 self._register_with_iocp(conn) 

541 ov = _overlapped.Overlapped(NULL) 

542 if isinstance(conn, socket.socket): 

543 ov.WSASend(conn.fileno(), buf, flags) 

544 else: 

545 ov.WriteFile(conn.fileno(), buf) 

546 

547 return self._register(ov, conn, self.finish_socket_func) 

548 

549 def accept(self, listener): 

550 self._register_with_iocp(listener) 

551 conn = self._get_accept_socket(listener.family) 

552 ov = _overlapped.Overlapped(NULL) 

553 ov.AcceptEx(listener.fileno(), conn.fileno()) 

554 

555 def finish_accept(trans, key, ov): 

556 ov.getresult() 

557 # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work. 

558 buf = struct.pack('@P', listener.fileno()) 

559 conn.setsockopt(socket.SOL_SOCKET, 

560 _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf) 

561 conn.settimeout(listener.gettimeout()) 

562 return conn, conn.getpeername() 

563 

564 async def accept_coro(future, conn): 

565 # Coroutine closing the accept socket if the future is cancelled 

566 try: 

567 await future 

568 except exceptions.CancelledError: 

569 conn.close() 

570 raise 

571 

572 future = self._register(ov, listener, finish_accept) 

573 coro = accept_coro(future, conn) 

574 tasks.ensure_future(coro, loop=self._loop) 

575 return future 

576 

577 def connect(self, conn, address): 

578 if conn.type == socket.SOCK_DGRAM: 

579 # WSAConnect will complete immediately for UDP sockets so we don't 

580 # need to register any IOCP operation 

581 _overlapped.WSAConnect(conn.fileno(), address) 

582 fut = self._loop.create_future() 

583 fut.set_result(None) 

584 return fut 

585 

586 self._register_with_iocp(conn) 

587 # The socket needs to be locally bound before we call ConnectEx(). 

588 try: 

589 _overlapped.BindLocal(conn.fileno(), conn.family) 

590 except OSError as e: 

591 if e.winerror != errno.WSAEINVAL: 

592 raise 

593 # Probably already locally bound; check using getsockname(). 

594 if conn.getsockname()[1] == 0: 

595 raise 

596 ov = _overlapped.Overlapped(NULL) 

597 ov.ConnectEx(conn.fileno(), address) 

598 

599 def finish_connect(trans, key, ov): 

600 ov.getresult() 

601 # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work. 

602 conn.setsockopt(socket.SOL_SOCKET, 

603 _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0) 

604 return conn 

605 

606 return self._register(ov, conn, finish_connect) 

607 

608 def sendfile(self, sock, file, offset, count): 

609 self._register_with_iocp(sock) 

610 ov = _overlapped.Overlapped(NULL) 

611 offset_low = offset & 0xffff_ffff 

612 offset_high = (offset >> 32) & 0xffff_ffff 

613 # TransmitFile ignores OVERLAPPED.Offset for handles not opened with 

614 # FILE_FLAG_OVERLAPPED, so seek the CRT file pointer to match. 

615 file.seek(offset) 

616 ov.TransmitFile(sock.fileno(), 

617 msvcrt.get_osfhandle(file.fileno()), 

618 offset_low, offset_high, 

619 count, 0, 0) 

620 

621 return self._register(ov, sock, self.finish_socket_func) 

622 

623 def accept_pipe(self, pipe): 

624 self._register_with_iocp(pipe) 

625 ov = _overlapped.Overlapped(NULL) 

626 connected = ov.ConnectNamedPipe(pipe.fileno()) 

627 

628 if connected: 

629 # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means 

630 # that the pipe is connected. There is no need to wait for the 

631 # completion of the connection. 

632 return self._result(pipe) 

633 

634 def finish_accept_pipe(trans, key, ov): 

635 ov.getresult() 

636 return pipe 

637 

638 return self._register(ov, pipe, finish_accept_pipe) 

639 

640 async def connect_pipe(self, address): 

641 delay = CONNECT_PIPE_INIT_DELAY 

642 while True: 

643 # Unfortunately there is no way to do an overlapped connect to 

644 # a pipe. Call CreateFile() in a loop until it doesn't fail with 

645 # ERROR_PIPE_BUSY. 

646 try: 

647 handle = _overlapped.ConnectPipe(address) 

648 break 

649 except OSError as exc: 

650 if exc.winerror != _overlapped.ERROR_PIPE_BUSY: 

651 raise 

652 

653 # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later 

654 delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY) 

655 await tasks.sleep(delay) 

656 

657 return windows_utils.PipeHandle(handle) 

658 

659 def wait_for_handle(self, handle, timeout=None): 

660 """Wait for a handle. 

661 

662 Return a Future object. The result of the future is True if the wait 

663 completed, or False if the wait did not complete (on timeout). 

664 """ 

665 return self._wait_for_handle(handle, timeout, False) 

666 

667 def _wait_cancel(self, event, done_callback): 

668 fut = self._wait_for_handle(event, None, True) 

669 # add_done_callback() cannot be used because the wait may only complete 

670 # in IocpProactor.close(), while the event loop is not running. 

671 fut._done_callback = done_callback 

672 return fut 

673 

674 def _wait_for_handle(self, handle, timeout, _is_cancel): 

675 self._check_closed() 

676 

677 if timeout is None: 

678 ms = _winapi.INFINITE 

679 else: 

680 # RegisterWaitForSingleObject() has a resolution of 1 millisecond, 

681 # round away from zero to wait *at least* timeout seconds. 

682 ms = math.ceil(timeout * 1e3) 

683 

684 # We only create ov so we can use ov.address as a key for the cache. 

685 ov = _overlapped.Overlapped(NULL) 

686 wait_handle = _overlapped.RegisterWaitWithQueue( 

687 handle, self._iocp, ov.address, ms) 

688 if _is_cancel: 

689 f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop) 

690 else: 

691 f = _WaitHandleFuture(ov, handle, wait_handle, self, 

692 loop=self._loop) 

693 if f._source_traceback: 

694 del f._source_traceback[-1] 

695 

696 def finish_wait_for_handle(trans, key, ov): 

697 # Note that this second wait means that we should only use 

698 # this with handles types where a successful wait has no 

699 # effect. So events or processes are all right, but locks 

700 # or semaphores are not. Also note if the handle is 

701 # signalled and then quickly reset, then we may return 

702 # False even though we have not timed out. 

703 return f._poll() 

704 

705 self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle) 

706 return f 

707 

708 def _register_with_iocp(self, obj): 

709 # To get notifications of finished ops on this objects sent to the 

710 # completion port, were must register the handle. 

711 if obj not in self._registered: 

712 self._registered.add(obj) 

713 _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0) 

714 # XXX We could also use SetFileCompletionNotificationModes() 

715 # to avoid sending notifications to completion port of ops 

716 # that succeed immediately. 

717 

718 def _register(self, ov, obj, callback): 

719 self._check_closed() 

720 

721 # Return a future which will be set with the result of the 

722 # operation when it completes. The future's value is actually 

723 # the value returned by callback(). 

724 f = _OverlappedFuture(ov, loop=self._loop) 

725 if f._source_traceback: 

726 del f._source_traceback[-1] 

727 if not ov.pending: 

728 # The operation has completed, so no need to postpone the 

729 # work. We cannot take this short cut if we need the 

730 # NumberOfBytes, CompletionKey values returned by 

731 # PostQueuedCompletionStatus(). 

732 try: 

733 value = callback(None, None, ov) 

734 except OSError as e: 

735 f.set_exception(e) 

736 else: 

737 f.set_result(value) 

738 # Even if GetOverlappedResult() was called, we have to wait for the 

739 # notification of the completion in GetQueuedCompletionStatus(). 

740 # Register the overlapped operation to keep a reference to the 

741 # OVERLAPPED object, otherwise the memory is freed and Windows may 

742 # read uninitialized memory. 

743 

744 # Register the overlapped operation for later. Note that 

745 # we only store obj to prevent it from being garbage 

746 # collected too early. 

747 self._cache[ov.address] = (f, ov, obj, callback) 

748 return f 

749 

750 def _unregister(self, ov): 

751 """Unregister an overlapped object. 

752 

753 Call this method when its future has been cancelled. The event can 

754 already be signalled (pending in the proactor event queue). It is also 

755 safe if the event is never signalled (because it was cancelled). 

756 """ 

757 self._check_closed() 

758 self._unregistered.append(ov) 

759 

760 def _get_accept_socket(self, family): 

761 s = socket.socket(family) 

762 s.settimeout(0) 

763 return s 

764 

765 def _poll(self, timeout=None): 

766 if timeout is None: 

767 ms = INFINITE 

768 elif timeout < 0: 

769 raise ValueError("negative timeout") 

770 else: 

771 # GetQueuedCompletionStatus() has a resolution of 1 millisecond, 

772 # round away from zero to wait *at least* timeout seconds. 

773 ms = math.ceil(timeout * 1e3) 

774 if ms >= INFINITE: 

775 raise ValueError("timeout too big") 

776 

777 while True: 

778 status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms) 

779 if status is None: 

780 break 

781 ms = 0 

782 

783 err, transferred, key, address = status 

784 try: 

785 f, ov, obj, callback = self._cache.pop(address) 

786 except KeyError: 

787 if self._loop.get_debug(): 

788 self._loop.call_exception_handler({ 

789 'message': ('GetQueuedCompletionStatus() returned an ' 

790 'unexpected event'), 

791 'status': ('err=%s transferred=%s key=%#x address=%#x' 

792 % (err, transferred, key, address)), 

793 }) 

794 

795 # key is either zero, or it is used to return a pipe 

796 # handle which should be closed to avoid a leak. 

797 if key not in (0, _overlapped.INVALID_HANDLE_VALUE): 

798 _winapi.CloseHandle(key) 

799 continue 

800 

801 if obj in self._stopped_serving: 

802 f.cancel() 

803 # Don't call the callback if _register() already read the result or 

804 # if the overlapped has been cancelled 

805 elif not f.done(): 

806 try: 

807 value = callback(transferred, key, ov) 

808 except OSError as e: 

809 f.set_exception(e) 

810 self._results.append(f) 

811 else: 

812 f.set_result(value) 

813 self._results.append(f) 

814 finally: 

815 f = None 

816 

817 # Remove unregistered futures 

818 for ov in self._unregistered: 

819 self._cache.pop(ov.address, None) 

820 self._unregistered.clear() 

821 

822 def _stop_serving(self, obj): 

823 # obj is a socket or pipe handle. It will be closed in 

824 # BaseProactorEventLoop._stop_serving() which will make any 

825 # pending operations fail quickly. 

826 self._stopped_serving.add(obj) 

827 

828 def close(self): 

829 if self._iocp is None: 

830 # already closed 

831 return 

832 

833 # Cancel remaining registered operations. 

834 for fut, ov, obj, callback in list(self._cache.values()): 

835 if fut.cancelled(): 

836 # Nothing to do with cancelled futures 

837 pass 

838 elif isinstance(fut, _WaitCancelFuture): 

839 # _WaitCancelFuture must not be cancelled 

840 pass 

841 else: 

842 try: 

843 fut.cancel() 

844 except OSError as exc: 

845 if self._loop is not None: 

846 context = { 

847 'message': 'Cancelling a future failed', 

848 'exception': exc, 

849 'future': fut, 

850 } 

851 if fut._source_traceback: 

852 context['source_traceback'] = fut._source_traceback 

853 self._loop.call_exception_handler(context) 

854 

855 # Wait until all cancelled overlapped complete: don't exit with running 

856 # overlapped to prevent a crash. Display progress every second if the 

857 # loop is still running. 

858 msg_update = 1.0 

859 start_time = time.monotonic() 

860 next_msg = start_time + msg_update 

861 while self._cache: 

862 if next_msg <= time.monotonic(): 

863 logger.debug('%r is running after closing for %.1f seconds', 

864 self, time.monotonic() - start_time) 

865 next_msg = time.monotonic() + msg_update 

866 

867 # handle a few events, or timeout 

868 self._poll(msg_update) 

869 

870 self._results = [] 

871 

872 _winapi.CloseHandle(self._iocp) 

873 self._iocp = None 

874 

875 def __del__(self): 

876 self.close() 

877 

878 

879class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport): 

880 

881 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs): 

882 self._proc = windows_utils.Popen( 

883 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, 

884 bufsize=bufsize, **kwargs) 

885 

886 def callback(f): 

887 returncode = self._proc.poll() 

888 self._process_exited(returncode) 

889 

890 f = self._loop._proactor.wait_for_handle(int(self._proc._handle)) 

891 f.add_done_callback(callback) 

892 

893 

894SelectorEventLoop = _WindowsSelectorEventLoop 

895 

896 

897class _WindowsSelectorEventLoopPolicy(events._BaseDefaultEventLoopPolicy): 

898 _loop_factory = SelectorEventLoop 

899 

900 

901class _WindowsProactorEventLoopPolicy(events._BaseDefaultEventLoopPolicy): 

902 _loop_factory = ProactorEventLoop 

903 

904 

905_DefaultEventLoopPolicy = _WindowsProactorEventLoopPolicy 

906EventLoop = ProactorEventLoop