Coverage for Lib/asyncio/windows_events.py: 0%
550 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-24 03:28 +0000
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-24 03:28 +0000
1"""Selector and proactor event loops for Windows."""
3import sys
5if sys.platform != 'win32': # pragma: no cover
6 raise ImportError('win32 only')
8import _overlapped
9import _winapi
10import errno
11from functools import partial
12import math
13import msvcrt
14import socket
15import struct
16import time
17import weakref
19from . import events
20from . import base_subprocess
21from . import futures
22from . import exceptions
23from . import proactor_events
24from . import selector_events
25from . import tasks
26from . import windows_utils
27from .log import logger
# Names exported by ``from asyncio.windows_events import *``.
__all__ = (
    'SelectorEventLoop',
    'ProactorEventLoop',
    'IocpProactor',
    '_DefaultEventLoopPolicy',
    '_WindowsSelectorEventLoopPolicy',
    '_WindowsProactorEventLoopPolicy',
    'EventLoop',
)


# Short module-level aliases for the _winapi constants used below.
NULL, INFINITE = _winapi.NULL, _winapi.INFINITE

# Numeric Windows error codes kept as named module constants.
ERROR_CONNECTION_REFUSED = 1225
ERROR_CONNECTION_ABORTED = 1236

# Initial delay in seconds for connect_pipe() before retrying to connect
CONNECT_PIPE_INIT_DELAY = 0.001

# Maximum delay in seconds for connect_pipe() before retrying to connect
CONNECT_PIPE_MAX_DELAY = 0.100
class _OverlappedFuture(futures.Future):
    """Future bound to one overlapped operation.

    Cancelling the future immediately cancels the overlapped operation
    as well.
    """

    def __init__(self, ov, *, loop=None):
        """Wrap the overlapped object *ov* in a future attached to *loop*."""
        super().__init__(loop=loop)
        if self._source_traceback:
            # Remove the last recorded frame from the creation traceback.
            del self._source_traceback[-1]
        self._ov = ov

    def _repr_info(self):
        """Return the repr fragments, with the overlapped state inserted."""
        info = super()._repr_info()
        ov = self._ov
        if ov is None:
            return info
        state = 'pending' if ov.pending else 'completed'
        info.insert(1, f'overlapped=<{state}, {ov.address:#x}>')
        return info

    def _cancel_overlapped(self):
        """Cancel the overlapped operation (if any) and forget it.

        An OSError raised while cancelling is reported through the loop's
        exception handler instead of being propagated.
        """
        ov = self._ov
        if ov is None:
            return
        try:
            ov.cancel()
        except OSError as exc:
            context = {
                'message': 'Cancelling an overlapped future failed',
                'exception': exc,
                'future': self,
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        self._ov = None

    def cancel(self, msg=None):
        """Cancel the overlapped operation first, then the future itself."""
        self._cancel_overlapped()
        return super().cancel(msg=msg)

    def set_exception(self, exception):
        """Store *exception*, then cancel the overlapped operation."""
        super().set_exception(exception)
        self._cancel_overlapped()

    def set_result(self, result):
        """Store *result* and drop the reference to the overlapped object."""
        super().set_result(result)
        self._ov = None
class _BaseWaitHandleFuture(futures.Future):
    """Subclass of Future which represents a wait handle.

    The future owns a registered wait (``wait_handle``) on ``handle``.  The
    wait is unregistered the first time the future is cancelled or receives
    a result or an exception.
    """

    def __init__(self, ov, handle, wait_handle, *, loop=None):
        """Store the overlapped object, the waited handle and the wait handle."""
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        # Keep a reference to the Overlapped object to keep it alive until the
        # wait is unregistered
        self._ov = ov
        self._handle = handle
        self._wait_handle = wait_handle

        # Should we call UnregisterWaitEx() if the wait completes
        # or is cancelled?
        self._registered = True

    def _poll(self):
        """Return True if the handle is currently signalled, without blocking."""
        # non-blocking wait: use a timeout of 0 millisecond
        return (_winapi.WaitForSingleObject(self._handle, 0) ==
                _winapi.WAIT_OBJECT_0)

    def _repr_info(self):
        """Return the repr fragments extended with handle information.

        The handle (and its signalled/waiting state) and the wait handle are
        each reported only when they are not None.
        """
        info = super()._repr_info()
        if self._handle is not None:
            # Format the handle only once it is known not to be None: the
            # '#x' format spec raises TypeError for None.
            info.append(f'handle={self._handle:#x}')
            state = 'signaled' if self._poll() else 'waiting'
            info.append(state)
        if self._wait_handle is not None:
            info.append(f'wait_handle={self._wait_handle:#x}')
        return info

    def _unregister_wait_cb(self, fut):
        """Called once the wait has been unregistered; *fut* is unused here."""
        # The wait is unregistered, so the Overlapped object no longer has to
        # be kept alive: drop our reference to it.
        self._ov = None

    def _unregister_wait(self):
        """Unregister the wait; do nothing if that was already done."""
        if not self._registered:
            return
        self._registered = False

        wait_handle = self._wait_handle
        self._wait_handle = None
        try:
            _overlapped.UnregisterWait(wait_handle)
        except OSError as exc:
            if exc.winerror != _overlapped.ERROR_IO_PENDING:
                context = {
                    'message': 'Failed to unregister the wait handle',
                    'exception': exc,
                    'future': self,
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
                return
            # ERROR_IO_PENDING means that the unregister is pending

        self._unregister_wait_cb(None)

    def cancel(self, msg=None):
        """Unregister the wait, then cancel the future."""
        self._unregister_wait()
        return super().cancel(msg=msg)

    def set_exception(self, exception):
        """Unregister the wait, then store *exception*."""
        self._unregister_wait()
        super().set_exception(exception)

    def set_result(self, result):
        """Unregister the wait, then store *result*."""
        self._unregister_wait()
        super().set_result(result)
class _WaitCancelFuture(_BaseWaitHandleFuture):
    """Subclass of Future which represents a wait for the cancellation of a
    _WaitHandleFuture using an event.

    Completion is reported by calling ``_done_callback`` directly from
    set_result()/set_exception() rather than through add_done_callback().
    """

    def __init__(self, ov, event, wait_handle, *, loop=None):
        """Wait on *event*; ``_done_callback`` is assigned later by the owner."""
        super().__init__(ov, event, wait_handle, loop=loop)

        self._done_callback = None

    def cancel(self, msg=None):
        """Always raise RuntimeError: this future must never be cancelled.

        *msg* is accepted only to stay call-compatible with
        ``Future.cancel(msg=None)``, so that a caller passing a message gets
        the intended RuntimeError instead of a TypeError.
        """
        raise RuntimeError("_WaitCancelFuture must not be cancelled")

    def set_result(self, result):
        """Store *result*, then invoke ``_done_callback`` if one is set."""
        super().set_result(result)
        if self._done_callback is not None:
            self._done_callback(self)

    def set_exception(self, exception):
        """Store *exception*, then invoke ``_done_callback`` if one is set."""
        super().set_exception(exception)
        if self._done_callback is not None:
            self._done_callback(self)
class _WaitHandleFuture(_BaseWaitHandleFuture):
    """Wait-handle future registered with a proactor.

    Unregistering the wait is asynchronous: an event is handed to
    UnregisterWaitEx() and the proactor is asked to wait for that event
    before the final cleanup in _unregister_wait_cb() runs.
    """

    def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
        """Register the wait state and create the unregistration event."""
        super().__init__(ov, handle, wait_handle, loop=loop)
        self._proactor = proactor
        self._unregister_proactor = True
        # Event later passed to UnregisterWaitEx() in _unregister_wait().
        self._event = _overlapped.CreateEvent(None, True, False, None)
        self._event_fut = None

    def _unregister_wait_cb(self, fut):
        """Finish the unregistration: close the event, leave the proactor."""
        event = self._event
        if event is not None:
            _winapi.CloseHandle(event)
            self._event = None
            self._event_fut = None

        # If the wait was cancelled, the wait may never be signalled, so
        # it's required to unregister it. Otherwise, IocpProactor.close() will
        # wait forever for an event which will never come.
        #
        # If the IocpProactor already received the event, it's safe to call
        # _unregister() because we kept a reference to the Overlapped object
        # which is used as a unique key.
        self._proactor._unregister(self._ov)
        self._proactor = None

        super()._unregister_wait_cb(fut)

    def _unregister_wait(self):
        """Start unregistering the wait; do nothing if already done."""
        if not self._registered:
            return
        self._registered = False

        wait_handle, self._wait_handle = self._wait_handle, None
        try:
            _overlapped.UnregisterWaitEx(wait_handle, self._event)
        except OSError as exc:
            if exc.winerror != _overlapped.ERROR_IO_PENDING:
                context = {
                    'message': 'Failed to unregister the wait handle',
                    'exception': exc,
                    'future': self,
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
                return
            # ERROR_IO_PENDING is not an error, the wait was unregistered

        # Have the proactor wait for the event; it calls the cleanup
        # callback once that wait completes.
        self._event_fut = self._proactor._wait_cancel(
            self._event, self._unregister_wait_cb)
class PipeServer(object):
    """Class representing a pipe server.

    This is much like a bound, listening socket.
    """

    def __init__(self, address):
        """Create the first pipe instance for *address*."""
        self._address = address
        self._free_instances = weakref.WeakSet()
        # initialize the pipe attribute before calling _server_pipe_handle()
        # because this function can raise an exception and the destructor calls
        # the close() method
        self._pipe = None
        self._accept_pipe_future = None
        self._pipe = self._server_pipe_handle(True)

    def _get_unconnected_pipe(self):
        """Return the current pipe instance after creating its successor."""
        # Create new instance and return previous one.  This ensures
        # that (until the server is closed) there is always at least
        # one pipe handle for address.  Therefore if a client attempt
        # to connect it will not fail with FileNotFoundError.
        previous = self._pipe
        self._pipe = self._server_pipe_handle(False)
        return previous

    def _server_pipe_handle(self, first):
        """Return a wrapper for a new pipe handle, or None once closed.

        *first* adds FILE_FLAG_FIRST_PIPE_INSTANCE to the open flags.
        """
        if self.closed():
            return None

        open_flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
        if first:
            open_flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
        pipe_mode = (_winapi.PIPE_TYPE_MESSAGE |
                     _winapi.PIPE_READMODE_MESSAGE |
                     _winapi.PIPE_WAIT)

        handle = _winapi.CreateNamedPipe(
            self._address, open_flags, pipe_mode,
            _winapi.PIPE_UNLIMITED_INSTANCES,
            windows_utils.BUFSIZE, windows_utils.BUFSIZE,
            _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
        pipe = windows_utils.PipeHandle(handle)
        # Tracked so that close() can close instances nobody connected to.
        self._free_instances.add(pipe)
        return pipe

    def closed(self):
        """Return True once close() has cleared the address."""
        return self._address is None

    def close(self):
        """Cancel the pending accept and close every unconnected instance."""
        accept_future = self._accept_pipe_future
        if accept_future is not None:
            accept_future.cancel()
            self._accept_pipe_future = None

        if self._address is None:
            return

        # Close all instances which have not been connected to by a client.
        for pipe in self._free_instances:
            pipe.close()
        self._pipe = None
        self._address = None
        self._free_instances.clear()

    __del__ = close
class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Windows version of selector event loop.

    Adds nothing to BaseSelectorEventLoop; it only gives the selector-based
    loop its own name in this module.
    """
class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
    """Windows version of proactor event loop using IOCP."""

    def __init__(self, proactor=None):
        """Use *proactor*, or a new IocpProactor when none is given."""
        super().__init__(IocpProactor() if proactor is None else proactor)

    def _run_forever_setup(self):
        """Schedule the self-reading callback before the base setup."""
        assert self._self_reading_future is None
        self.call_soon(self._loop_self_reading)
        super()._run_forever_setup()

    def _run_forever_cleanup(self):
        """Run the base cleanup, then cancel the self-reading future."""
        super()._run_forever_cleanup()
        reading_future = self._self_reading_future
        if reading_future is None:
            return

        ov = reading_future._ov
        reading_future.cancel()
        # self_reading_future always uses IOCP, so even though it's
        # been cancelled, we need to make sure that the IOCP message
        # is received so that the kernel is not holding on to the
        # memory, possibly causing memory corruption later. Only
        # unregister it if IO is complete in all respects. Otherwise
        # we need another _poll() later to complete the IO.
        if ov is not None and not ov.pending:
            self._proactor._unregister(ov)
        self._self_reading_future = None

    async def create_pipe_connection(self, protocol_factory, address):
        """Connect to the pipe at *address*.

        Return a ``(transport, protocol)`` pair, the protocol coming from
        ``protocol_factory()``.
        """
        pipe = await self._proactor.connect_pipe(address)
        protocol = protocol_factory()
        transport = self._make_duplex_pipe_transport(
            pipe, protocol, extra={'addr': address})
        return transport, protocol

    async def start_serving_pipe(self, protocol_factory, address):
        """Start accepting clients on the pipe at *address*.

        Return a one-element list holding the PipeServer.  Each accepted
        client gets a duplex pipe transport and a protocol from
        ``protocol_factory()``.
        """
        server = PipeServer(address)

        def loop_accept_pipe(done_future=None):
            # Runs once via call_soon() with no argument, then as the done
            # callback of each accept future.
            pipe = None
            try:
                if done_future:
                    pipe = done_future.result()
                    server._free_instances.discard(pipe)

                    if server.closed():
                        # A client connected before the server was closed:
                        # drop the client (close the pipe) and exit
                        pipe.close()
                        return

                    protocol = protocol_factory()
                    self._make_duplex_pipe_transport(
                        pipe, protocol, extra={'addr': address})

                pipe = server._get_unconnected_pipe()
                if pipe is None:
                    return

                next_accept = self._proactor.accept_pipe(pipe)
            except BrokenPipeError:
                if pipe and pipe.fileno() != -1:
                    pipe.close()
                self.call_soon(loop_accept_pipe)
            except OSError as exc:
                if pipe and pipe.fileno() != -1:
                    self.call_exception_handler({
                        'message': 'Pipe accept failed',
                        'exception': exc,
                        'pipe': pipe,
                    })
                    pipe.close()
                elif self._debug:
                    logger.warning("Accept pipe failed on pipe %r",
                                   pipe, exc_info=True)
                self.call_soon(loop_accept_pipe)
            except exceptions.CancelledError:
                if pipe:
                    pipe.close()
            else:
                # Remember the pending accept so the server can cancel it,
                # and come back here when it completes.
                server._accept_pipe_future = next_accept
                next_accept.add_done_callback(loop_accept_pipe)

        self.call_soon(loop_accept_pipe)
        return [server]

    async def _make_subprocess_transport(self, protocol, args, shell,
                                         stdin, stdout, stderr, bufsize,
                                         extra=None, **kwargs):
        """Create a subprocess transport and wait until its waiter is done.

        If waiting fails with anything other than SystemExit or
        KeyboardInterrupt, the transport is closed and awaited before the
        exception is re-raised.
        """
        ready = self.create_future()
        transport = _WindowsSubprocessTransport(
            self, protocol, args, shell, stdin, stdout, stderr, bufsize,
            waiter=ready, extra=extra, **kwargs)
        try:
            await ready
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException:
            transport.close()
            await transport._wait()
            raise

        return transport
class IocpProactor:
    """Proactor implementation using IOCP.

    Overlapped operations are started by the public methods (recv(), send(),
    accept(), ...), each of which returns a future.  Completions are
    collected from the I/O completion port by _poll() and handed out by
    select().
    """

    def __init__(self, concurrency=INFINITE):
        """Create the completion port; *concurrency* is passed through to it."""
        # Define _iocp before the call that can fail, so that __del__() ->
        # close() finds the attribute even when construction raises.
        self._iocp = None
        self._loop = None
        # Futures completed by _poll() and not yet returned by select().
        self._results = []
        self._iocp = _overlapped.CreateIoCompletionPort(
            _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
        # ov.address -> (future, ov, obj, callback) for registered operations.
        self._cache = {}
        # Objects whose handle was already associated with the port.
        self._registered = weakref.WeakSet()
        # Overlapped objects to remove from _cache at the end of _poll().
        self._unregistered = []
        # Objects given to _stop_serving(): their completions are cancelled.
        self._stopped_serving = weakref.WeakSet()

    def _check_closed(self):
        """Raise RuntimeError if the proactor has been closed."""
        if self._iocp is None:
            raise RuntimeError('IocpProactor is closed')

    def __repr__(self):
        info = ['overlapped#=%s' % len(self._cache),
                'result#=%s' % len(self._results)]
        if self._iocp is None:
            info.append('closed')
        return '<%s %s>' % (self.__class__.__name__, " ".join(info))

    def set_loop(self, loop):
        """Set the event loop used to create futures and report errors."""
        self._loop = loop

    def select(self, timeout=None):
        """Return the list of completed futures, polling first if it is empty."""
        if not self._results:
            self._poll(timeout)
        tmp = self._results
        self._results = []
        try:
            return tmp
        finally:
            # Needed to break cycles when an exception occurs.
            tmp = None

    def _result(self, value):
        """Return a future that is already completed with *value*."""
        fut = self._loop.create_future()
        fut.set_result(value)
        return fut

    @staticmethod
    def finish_socket_func(trans, key, ov):
        """Return the result of *ov*.

        ERROR_NETNAME_DELETED and ERROR_OPERATION_ABORTED are re-raised as
        ConnectionResetError; any other OSError propagates unchanged.
        """
        try:
            return ov.getresult()
        except OSError as exc:
            if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
                                _overlapped.ERROR_OPERATION_ABORTED):
                raise ConnectionResetError(*exc.args)
            else:
                raise

    @classmethod
    def _finish_recvfrom(cls, trans, key, ov, *, empty_result):
        """Like finish_socket_func(), but turn ERROR_PORT_UNREACHABLE into
        the pair ``(empty_result, None)``.
        """
        try:
            return cls.finish_socket_func(trans, key, ov)
        except OSError as exc:
            # WSARecvFrom will report ERROR_PORT_UNREACHABLE when the same
            # socket is used to send to an address that is not listening.
            if exc.winerror == _overlapped.ERROR_PORT_UNREACHABLE:
                return empty_result, None
            else:
                raise

    def recv(self, conn, nbytes, flags=0):
        """Start receiving up to *nbytes* from *conn*; return a future.

        Sockets use WSARecv(), other objects ReadFile().  If starting the
        operation raises BrokenPipeError the future is already completed
        with ``b''``.
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        try:
            if isinstance(conn, socket.socket):
                ov.WSARecv(conn.fileno(), nbytes, flags)
            else:
                ov.ReadFile(conn.fileno(), nbytes)
        except BrokenPipeError:
            return self._result(b'')

        return self._register(ov, conn, self.finish_socket_func)

    def recv_into(self, conn, buf, flags=0):
        """Start receiving from *conn* into *buf*; return a future.

        If starting the operation raises BrokenPipeError the future is
        already completed with ``0``.
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        try:
            if isinstance(conn, socket.socket):
                ov.WSARecvInto(conn.fileno(), buf, flags)
            else:
                ov.ReadFileInto(conn.fileno(), buf)
        except BrokenPipeError:
            return self._result(0)

        return self._register(ov, conn, self.finish_socket_func)

    def recvfrom(self, conn, nbytes, flags=0):
        """Start a WSARecvFrom() of up to *nbytes* on *conn*; return a future.

        BrokenPipeError when starting yields a future completed with
        ``(b'', None)``.
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        try:
            ov.WSARecvFrom(conn.fileno(), nbytes, flags)
        except BrokenPipeError:
            return self._result((b'', None))

        return self._register(ov, conn, partial(self._finish_recvfrom,
                                                empty_result=b''))

    def recvfrom_into(self, conn, buf, flags=0):
        """Start a WSARecvFromInto() on *conn* into *buf*; return a future.

        BrokenPipeError when starting yields a future completed with
        ``(0, None)``.
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        try:
            ov.WSARecvFromInto(conn.fileno(), buf, flags)
        except BrokenPipeError:
            return self._result((0, None))

        return self._register(ov, conn, partial(self._finish_recvfrom,
                                                empty_result=0))

    def sendto(self, conn, buf, flags=0, addr=None):
        """Start a WSASendTo() of *buf* to *addr* on *conn*; return a future."""
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)

        ov.WSASendTo(conn.fileno(), buf, flags, addr)

        return self._register(ov, conn, self.finish_socket_func)

    def send(self, conn, buf, flags=0):
        """Start sending *buf* on *conn*; return a future.

        Sockets use WSASend(), other objects WriteFile().
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        if isinstance(conn, socket.socket):
            ov.WSASend(conn.fileno(), buf, flags)
        else:
            ov.WriteFile(conn.fileno(), buf)

        return self._register(ov, conn, self.finish_socket_func)

    def accept(self, listener):
        """Start accepting a connection on *listener*; return a future.

        The future's result is ``(conn, peer_address)``.  The accept socket
        is closed if the operation cannot be started or if the future is
        cancelled.
        """
        self._register_with_iocp(listener)
        conn = self._get_accept_socket(listener.family)
        try:
            ov = _overlapped.Overlapped(NULL)
            ov.AcceptEx(listener.fileno(), conn.fileno())
        except BaseException:
            # The accept socket was created above solely for this operation:
            # close it instead of leaking it when the operation cannot start.
            conn.close()
            raise

        def finish_accept(trans, key, ov):
            ov.getresult()
            # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
            buf = struct.pack('@P', listener.fileno())
            conn.setsockopt(socket.SOL_SOCKET,
                            _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
            conn.settimeout(listener.gettimeout())
            return conn, conn.getpeername()

        async def accept_coro(future, conn):
            # Coroutine closing the accept socket if the future is cancelled
            try:
                await future
            except exceptions.CancelledError:
                conn.close()
                raise

        future = self._register(ov, listener, finish_accept)
        coro = accept_coro(future, conn)
        tasks.ensure_future(coro, loop=self._loop)
        return future

    def connect(self, conn, address):
        """Start connecting *conn* to *address*; return a future.

        Datagram sockets are connected synchronously with WSAConnect() and
        the future is already completed with None.  Otherwise the socket is
        bound locally if needed, ConnectEx() is started and the future's
        result is *conn*.
        """
        if conn.type == socket.SOCK_DGRAM:
            # WSAConnect will complete immediately for UDP sockets so we don't
            # need to register any IOCP operation
            _overlapped.WSAConnect(conn.fileno(), address)
            fut = self._loop.create_future()
            fut.set_result(None)
            return fut

        self._register_with_iocp(conn)
        # The socket needs to be locally bound before we call ConnectEx().
        try:
            _overlapped.BindLocal(conn.fileno(), conn.family)
        except OSError as e:
            if e.winerror != errno.WSAEINVAL:
                raise
            # Probably already locally bound; check using getsockname().
            if conn.getsockname()[1] == 0:
                raise
        ov = _overlapped.Overlapped(NULL)
        ov.ConnectEx(conn.fileno(), address)

        def finish_connect(trans, key, ov):
            ov.getresult()
            # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
            conn.setsockopt(socket.SOL_SOCKET,
                            _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
            return conn

        return self._register(ov, conn, finish_connect)

    def sendfile(self, sock, file, offset, count):
        """Start a TransmitFile() of *count* bytes of *file*, beginning at
        *offset*, over *sock*; return a future.
        """
        self._register_with_iocp(sock)
        ov = _overlapped.Overlapped(NULL)
        # The offset is passed as two 32-bit halves.
        offset_low = offset & 0xffff_ffff
        offset_high = (offset >> 32) & 0xffff_ffff
        # TransmitFile ignores OVERLAPPED.Offset for handles not opened with
        # FILE_FLAG_OVERLAPPED, so seek the CRT file pointer to match.
        file.seek(offset)
        ov.TransmitFile(sock.fileno(),
                        msvcrt.get_osfhandle(file.fileno()),
                        offset_low, offset_high,
                        count, 0, 0)

        return self._register(ov, sock, self.finish_socket_func)

    def accept_pipe(self, pipe):
        """Start waiting for a client on *pipe*; the future's result is *pipe*."""
        self._register_with_iocp(pipe)
        ov = _overlapped.Overlapped(NULL)
        connected = ov.ConnectNamedPipe(pipe.fileno())

        if connected:
            # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means
            # that the pipe is connected. There is no need to wait for the
            # completion of the connection.
            return self._result(pipe)

        def finish_accept_pipe(trans, key, ov):
            ov.getresult()
            return pipe

        return self._register(ov, pipe, finish_accept_pipe)

    async def connect_pipe(self, address):
        """Connect to the pipe at *address* and return a PipeHandle.

        While the pipe is busy the attempt is repeated after a sleep whose
        delay doubles each time, capped at CONNECT_PIPE_MAX_DELAY.  Any
        other OSError propagates.
        """
        delay = CONNECT_PIPE_INIT_DELAY
        while True:
            # Unfortunately there is no way to do an overlapped connect to
            # a pipe.  Call CreateFile() in a loop until it doesn't fail with
            # ERROR_PIPE_BUSY.
            try:
                handle = _overlapped.ConnectPipe(address)
                break
            except OSError as exc:
                if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
                    raise

            # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
            delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
            await tasks.sleep(delay)

        return windows_utils.PipeHandle(handle)

    def wait_for_handle(self, handle, timeout=None):
        """Wait for a handle.

        Return a Future object. The result of the future is True if the wait
        completed, or False if the wait did not complete (on timeout).
        """
        return self._wait_for_handle(handle, timeout, False)

    def _wait_cancel(self, event, done_callback):
        """Wait for *event* with no timeout; call *done_callback* when done."""
        fut = self._wait_for_handle(event, None, True)
        # add_done_callback() cannot be used because the wait may only complete
        # in IocpProactor.close(), while the event loop is not running.
        fut._done_callback = done_callback
        return fut

    def _wait_for_handle(self, handle, timeout, _is_cancel):
        """Register a wait on *handle* and return its future.

        *timeout* is in seconds (None waits forever).  *_is_cancel* selects
        a _WaitCancelFuture instead of a _WaitHandleFuture.
        """
        self._check_closed()

        if timeout is None:
            ms = _winapi.INFINITE
        else:
            # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = math.ceil(timeout * 1e3)

        # We only create ov so we can use ov.address as a key for the cache.
        ov = _overlapped.Overlapped(NULL)
        wait_handle = _overlapped.RegisterWaitWithQueue(
            handle, self._iocp, ov.address, ms)
        if _is_cancel:
            f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
        else:
            f = _WaitHandleFuture(ov, handle, wait_handle, self,
                                  loop=self._loop)
        if f._source_traceback:
            del f._source_traceback[-1]

        def finish_wait_for_handle(trans, key, ov):
            # Note that this second wait means that we should only use
            # this with handles types where a successful wait has no
            # effect.  So events or processes are all right, but locks
            # or semaphores are not.  Also note if the handle is
            # signalled and then quickly reset, then we may return
            # False even though we have not timed out.
            return f._poll()

        self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
        return f

    def _register_with_iocp(self, obj):
        """Associate the handle of *obj* with the completion port, once."""
        # To get notifications of finished ops on this objects sent to the
        # completion port, were must register the handle.
        if obj not in self._registered:
            self._registered.add(obj)
            _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
            # XXX We could also use SetFileCompletionNotificationModes()
            # to avoid sending notifications to completion port of ops
            # that succeed immediately.

    def _register(self, ov, obj, callback):
        """Register the started operation *ov* and return its future.

        ``callback(transferred, key, ov)`` computes the future's result.  It
        is called right here when the operation is no longer pending.
        """
        self._check_closed()

        # Return a future which will be set with the result of the
        # operation when it completes.  The future's value is actually
        # the value returned by callback().
        f = _OverlappedFuture(ov, loop=self._loop)
        if f._source_traceback:
            del f._source_traceback[-1]
        if not ov.pending:
            # The operation has completed, so no need to postpone the
            # work.  We cannot take this short cut if we need the
            # NumberOfBytes, CompletionKey values returned by
            # PostQueuedCompletionStatus().
            try:
                value = callback(None, None, ov)
            except OSError as e:
                f.set_exception(e)
            else:
                f.set_result(value)
            # Even if GetOverlappedResult() was called, we have to wait for the
            # notification of the completion in GetQueuedCompletionStatus().
            # Register the overlapped operation to keep a reference to the
            # OVERLAPPED object, otherwise the memory is freed and Windows may
            # read uninitialized memory.

        # Register the overlapped operation for later.  Note that
        # we only store obj to prevent it from being garbage
        # collected too early.
        self._cache[ov.address] = (f, ov, obj, callback)
        return f

    def _unregister(self, ov):
        """Unregister an overlapped object.

        Call this method when its future has been cancelled. The event can
        already be signalled (pending in the proactor event queue). It is also
        safe if the event is never signalled (because it was cancelled).
        """
        self._check_closed()
        self._unregistered.append(ov)

    def _get_accept_socket(self, family):
        """Return a new socket of *family* with its timeout set to 0."""
        s = socket.socket(family)
        s.settimeout(0)
        return s

    def _poll(self, timeout=None):
        """Dequeue completions and complete the matching futures.

        Waits up to *timeout* seconds (None: forever) for the first
        completion, then drains whatever else is ready without waiting.
        Raises ValueError for a negative or too large timeout.
        """
        if timeout is None:
            ms = INFINITE
        elif timeout < 0:
            raise ValueError("negative timeout")
        else:
            # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = math.ceil(timeout * 1e3)
            if ms >= INFINITE:
                raise ValueError("timeout too big")

        while True:
            status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
            if status is None:
                break
            # Only the first dequeue may block.
            ms = 0

            err, transferred, key, address = status
            try:
                f, ov, obj, callback = self._cache.pop(address)
            except KeyError:
                if self._loop.get_debug():
                    self._loop.call_exception_handler({
                        'message': ('GetQueuedCompletionStatus() returned an '
                                    'unexpected event'),
                        'status': ('err=%s transferred=%s key=%#x address=%#x'
                                   % (err, transferred, key, address)),
                    })

                # key is either zero, or it is used to return a pipe
                # handle which should be closed to avoid a leak.
                if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
                    _winapi.CloseHandle(key)
                continue

            if obj in self._stopped_serving:
                f.cancel()
            # Don't call the callback if _register() already read the result or
            # if the overlapped has been cancelled
            elif not f.done():
                try:
                    value = callback(transferred, key, ov)
                except OSError as e:
                    f.set_exception(e)
                    self._results.append(f)
                else:
                    f.set_result(value)
                    self._results.append(f)
                finally:
                    f = None

        # Remove unregistered futures
        for ov in self._unregistered:
            self._cache.pop(ov.address, None)
        self._unregistered.clear()

    def _stop_serving(self, obj):
        """Mark *obj* so that its later completions cancel their futures."""
        # obj is a socket or pipe handle.  It will be closed in
        # BaseProactorEventLoop._stop_serving() which will make any
        # pending operations fail quickly.
        self._stopped_serving.add(obj)

    def close(self):
        """Cancel the remaining operations, wait for them, close the port.

        Calling close() again afterwards does nothing.
        """
        if self._iocp is None:
            # already closed
            return

        # Cancel remaining registered operations.
        for fut, ov, obj, callback in list(self._cache.values()):
            if fut.cancelled():
                # Nothing to do with cancelled futures
                pass
            elif isinstance(fut, _WaitCancelFuture):
                # _WaitCancelFuture must not be cancelled
                pass
            else:
                try:
                    fut.cancel()
                except OSError as exc:
                    if self._loop is not None:
                        context = {
                            'message': 'Cancelling a future failed',
                            'exception': exc,
                            'future': fut,
                        }
                        if fut._source_traceback:
                            context['source_traceback'] = fut._source_traceback
                        self._loop.call_exception_handler(context)

        # Wait until all cancelled overlapped complete: don't exit with running
        # overlapped to prevent a crash. Display progress every second if the
        # loop is still running.
        msg_update = 1.0
        start_time = time.monotonic()
        next_msg = start_time + msg_update
        while self._cache:
            if next_msg <= time.monotonic():
                logger.debug('%r is running after closing for %.1f seconds',
                             self, time.monotonic() - start_time)
                next_msg = time.monotonic() + msg_update

            # handle a few events, or timeout
            self._poll(msg_update)

        self._results = []

        _winapi.CloseHandle(self._iocp)
        self._iocp = None

    def __del__(self):
        self.close()
class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that starts its child with windows_utils.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the process and arrange to be told when it exits."""
        self._proc = windows_utils.Popen(
            args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
            bufsize=bufsize, **kwargs)

        def on_process_exit(_waiter):
            # The handle wait finished: read the exit code and report it.
            self._process_exited(self._proc.poll())

        process_handle = int(self._proc._handle)
        exit_waiter = self._loop._proactor.wait_for_handle(process_handle)
        exit_waiter.add_done_callback(on_process_exit)
# Public name of the selector-based loop defined in this module.
SelectorEventLoop = _WindowsSelectorEventLoop


class _WindowsSelectorEventLoopPolicy(events._BaseDefaultEventLoopPolicy):
    """Event loop policy whose loop factory is SelectorEventLoop."""

    _loop_factory = SelectorEventLoop


class _WindowsProactorEventLoopPolicy(events._BaseDefaultEventLoopPolicy):
    """Event loop policy whose loop factory is ProactorEventLoop."""

    _loop_factory = ProactorEventLoop


# The proactor flavour is the default policy and the default loop class.
_DefaultEventLoopPolicy = _WindowsProactorEventLoopPolicy
EventLoop = ProactorEventLoop