Coverage for Lib/asyncio/windows_events.py: 0%
549 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-15 02:02 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-15 02:02 +0000
1"""Selector and proactor event loops for Windows."""
3import sys
5if sys.platform != 'win32': # pragma: no cover
6 raise ImportError('win32 only')
8import _overlapped
9import _winapi
10import errno
11from functools import partial
12import math
13import msvcrt
14import socket
15import struct
16import time
17import weakref
19from . import events
20from . import base_subprocess
21from . import futures
22from . import exceptions
23from . import proactor_events
24from . import selector_events
25from . import tasks
26from . import windows_utils
27from .log import logger
30__all__ = (
31 'SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
32 '_DefaultEventLoopPolicy', '_WindowsSelectorEventLoopPolicy',
33 '_WindowsProactorEventLoopPolicy', 'EventLoop',
34)
37NULL = _winapi.NULL
38INFINITE = _winapi.INFINITE
39ERROR_CONNECTION_REFUSED = 1225
40ERROR_CONNECTION_ABORTED = 1236
42# Initial delay in seconds for connect_pipe() before retrying to connect
43CONNECT_PIPE_INIT_DELAY = 0.001
45# Maximum delay in seconds for connect_pipe() before retrying to connect
46CONNECT_PIPE_MAX_DELAY = 0.100
49class _OverlappedFuture(futures.Future):
50 """Subclass of Future which represents an overlapped operation.
52 Cancelling it will immediately cancel the overlapped operation.
53 """
55 def __init__(self, ov, *, loop=None):
56 super().__init__(loop=loop)
57 if self._source_traceback:
58 del self._source_traceback[-1]
59 self._ov = ov
61 def _repr_info(self):
62 info = super()._repr_info()
63 if self._ov is not None:
64 state = 'pending' if self._ov.pending else 'completed'
65 info.insert(1, f'overlapped=<{state}, {self._ov.address:#x}>')
66 return info
68 def _cancel_overlapped(self):
69 if self._ov is None:
70 return
71 try:
72 self._ov.cancel()
73 except OSError as exc:
74 context = {
75 'message': 'Cancelling an overlapped future failed',
76 'exception': exc,
77 'future': self,
78 }
79 if self._source_traceback:
80 context['source_traceback'] = self._source_traceback
81 self._loop.call_exception_handler(context)
82 self._ov = None
84 def cancel(self, msg=None):
85 self._cancel_overlapped()
86 return super().cancel(msg=msg)
88 def set_exception(self, exception):
89 super().set_exception(exception)
90 self._cancel_overlapped()
92 def set_result(self, result):
93 super().set_result(result)
94 self._ov = None
97class _BaseWaitHandleFuture(futures.Future):
98 """Subclass of Future which represents a wait handle."""
100 def __init__(self, ov, handle, wait_handle, *, loop=None):
101 super().__init__(loop=loop)
102 if self._source_traceback:
103 del self._source_traceback[-1]
104 # Keep a reference to the Overlapped object to keep it alive until the
105 # wait is unregistered
106 self._ov = ov
107 self._handle = handle
108 self._wait_handle = wait_handle
110 # Should we call UnregisterWaitEx() if the wait completes
111 # or is cancelled?
112 self._registered = True
114 def _poll(self):
115 # non-blocking wait: use a timeout of 0 millisecond
116 return (_winapi.WaitForSingleObject(self._handle, 0) ==
117 _winapi.WAIT_OBJECT_0)
119 def _repr_info(self):
120 info = super()._repr_info()
121 info.append(f'handle={self._handle:#x}')
122 if self._handle is not None:
123 state = 'signaled' if self._poll() else 'waiting'
124 info.append(state)
125 if self._wait_handle is not None:
126 info.append(f'wait_handle={self._wait_handle:#x}')
127 return info
129 def _unregister_wait_cb(self, fut):
130 # The wait was unregistered: it's not safe to destroy the Overlapped
131 # object
132 self._ov = None
134 def _unregister_wait(self):
135 if not self._registered:
136 return
137 self._registered = False
139 wait_handle = self._wait_handle
140 self._wait_handle = None
141 try:
142 _overlapped.UnregisterWait(wait_handle)
143 except OSError as exc:
144 if exc.winerror != _overlapped.ERROR_IO_PENDING:
145 context = {
146 'message': 'Failed to unregister the wait handle',
147 'exception': exc,
148 'future': self,
149 }
150 if self._source_traceback:
151 context['source_traceback'] = self._source_traceback
152 self._loop.call_exception_handler(context)
153 return
154 # ERROR_IO_PENDING means that the unregister is pending
156 self._unregister_wait_cb(None)
158 def cancel(self, msg=None):
159 self._unregister_wait()
160 return super().cancel(msg=msg)
162 def set_exception(self, exception):
163 self._unregister_wait()
164 super().set_exception(exception)
166 def set_result(self, result):
167 self._unregister_wait()
168 super().set_result(result)
171class _WaitCancelFuture(_BaseWaitHandleFuture):
172 """Subclass of Future which represents a wait for the cancellation of a
173 _WaitHandleFuture using an event.
174 """
176 def __init__(self, ov, event, wait_handle, *, loop=None):
177 super().__init__(ov, event, wait_handle, loop=loop)
179 self._done_callback = None
181 def cancel(self):
182 raise RuntimeError("_WaitCancelFuture must not be cancelled")
184 def set_result(self, result):
185 super().set_result(result)
186 if self._done_callback is not None:
187 self._done_callback(self)
189 def set_exception(self, exception):
190 super().set_exception(exception)
191 if self._done_callback is not None:
192 self._done_callback(self)
195class _WaitHandleFuture(_BaseWaitHandleFuture):
196 def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
197 super().__init__(ov, handle, wait_handle, loop=loop)
198 self._proactor = proactor
199 self._unregister_proactor = True
200 self._event = _overlapped.CreateEvent(None, True, False, None)
201 self._event_fut = None
203 def _unregister_wait_cb(self, fut):
204 if self._event is not None:
205 _winapi.CloseHandle(self._event)
206 self._event = None
207 self._event_fut = None
209 # If the wait was cancelled, the wait may never be signalled, so
210 # it's required to unregister it. Otherwise, IocpProactor.close() will
211 # wait forever for an event which will never come.
212 #
213 # If the IocpProactor already received the event, it's safe to call
214 # _unregister() because we kept a reference to the Overlapped object
215 # which is used as a unique key.
216 self._proactor._unregister(self._ov)
217 self._proactor = None
219 super()._unregister_wait_cb(fut)
221 def _unregister_wait(self):
222 if not self._registered:
223 return
224 self._registered = False
226 wait_handle = self._wait_handle
227 self._wait_handle = None
228 try:
229 _overlapped.UnregisterWaitEx(wait_handle, self._event)
230 except OSError as exc:
231 if exc.winerror != _overlapped.ERROR_IO_PENDING:
232 context = {
233 'message': 'Failed to unregister the wait handle',
234 'exception': exc,
235 'future': self,
236 }
237 if self._source_traceback:
238 context['source_traceback'] = self._source_traceback
239 self._loop.call_exception_handler(context)
240 return
241 # ERROR_IO_PENDING is not an error, the wait was unregistered
243 self._event_fut = self._proactor._wait_cancel(self._event,
244 self._unregister_wait_cb)
247class PipeServer(object):
248 """Class representing a pipe server.
250 This is much like a bound, listening socket.
251 """
252 def __init__(self, address):
253 self._address = address
254 self._free_instances = weakref.WeakSet()
255 # initialize the pipe attribute before calling _server_pipe_handle()
256 # because this function can raise an exception and the destructor calls
257 # the close() method
258 self._pipe = None
259 self._accept_pipe_future = None
260 self._pipe = self._server_pipe_handle(True)
262 def _get_unconnected_pipe(self):
263 # Create new instance and return previous one. This ensures
264 # that (until the server is closed) there is always at least
265 # one pipe handle for address. Therefore if a client attempt
266 # to connect it will not fail with FileNotFoundError.
267 tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
268 return tmp
270 def _server_pipe_handle(self, first):
271 # Return a wrapper for a new pipe handle.
272 if self.closed():
273 return None
274 flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
275 if first:
276 flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
277 h = _winapi.CreateNamedPipe(
278 self._address, flags,
279 _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
280 _winapi.PIPE_WAIT,
281 _winapi.PIPE_UNLIMITED_INSTANCES,
282 windows_utils.BUFSIZE, windows_utils.BUFSIZE,
283 _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
284 pipe = windows_utils.PipeHandle(h)
285 self._free_instances.add(pipe)
286 return pipe
288 def closed(self):
289 return (self._address is None)
291 def close(self):
292 if self._accept_pipe_future is not None:
293 self._accept_pipe_future.cancel()
294 self._accept_pipe_future = None
295 # Close all instances which have not been connected to by a client.
296 if self._address is not None:
297 for pipe in self._free_instances:
298 pipe.close()
299 self._pipe = None
300 self._address = None
301 self._free_instances.clear()
303 __del__ = close
306class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
307 """Windows version of selector event loop."""
310class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
311 """Windows version of proactor event loop using IOCP."""
313 def __init__(self, proactor=None):
314 if proactor is None:
315 proactor = IocpProactor()
316 super().__init__(proactor)
318 def _run_forever_setup(self):
319 assert self._self_reading_future is None
320 self.call_soon(self._loop_self_reading)
321 super()._run_forever_setup()
323 def _run_forever_cleanup(self):
324 super()._run_forever_cleanup()
325 if self._self_reading_future is not None:
326 ov = self._self_reading_future._ov
327 self._self_reading_future.cancel()
328 # self_reading_future always uses IOCP, so even though it's
329 # been cancelled, we need to make sure that the IOCP message
330 # is received so that the kernel is not holding on to the
331 # memory, possibly causing memory corruption later. Only
332 # unregister it if IO is complete in all respects. Otherwise
333 # we need another _poll() later to complete the IO.
334 if ov is not None and not ov.pending:
335 self._proactor._unregister(ov)
336 self._self_reading_future = None
338 async def create_pipe_connection(self, protocol_factory, address):
339 f = self._proactor.connect_pipe(address)
340 pipe = await f
341 protocol = protocol_factory()
342 trans = self._make_duplex_pipe_transport(pipe, protocol,
343 extra={'addr': address})
344 return trans, protocol
346 async def start_serving_pipe(self, protocol_factory, address):
347 server = PipeServer(address)
349 def loop_accept_pipe(f=None):
350 pipe = None
351 try:
352 if f:
353 pipe = f.result()
354 server._free_instances.discard(pipe)
356 if server.closed():
357 # A client connected before the server was closed:
358 # drop the client (close the pipe) and exit
359 pipe.close()
360 return
362 protocol = protocol_factory()
363 self._make_duplex_pipe_transport(
364 pipe, protocol, extra={'addr': address})
366 pipe = server._get_unconnected_pipe()
367 if pipe is None:
368 return
370 f = self._proactor.accept_pipe(pipe)
371 except BrokenPipeError:
372 if pipe and pipe.fileno() != -1:
373 pipe.close()
374 self.call_soon(loop_accept_pipe)
375 except OSError as exc:
376 if pipe and pipe.fileno() != -1:
377 self.call_exception_handler({
378 'message': 'Pipe accept failed',
379 'exception': exc,
380 'pipe': pipe,
381 })
382 pipe.close()
383 elif self._debug:
384 logger.warning("Accept pipe failed on pipe %r",
385 pipe, exc_info=True)
386 self.call_soon(loop_accept_pipe)
387 except exceptions.CancelledError:
388 if pipe:
389 pipe.close()
390 else:
391 server._accept_pipe_future = f
392 f.add_done_callback(loop_accept_pipe)
394 self.call_soon(loop_accept_pipe)
395 return [server]
397 async def _make_subprocess_transport(self, protocol, args, shell,
398 stdin, stdout, stderr, bufsize,
399 extra=None, **kwargs):
400 waiter = self.create_future()
401 transp = _WindowsSubprocessTransport(self, protocol, args, shell,
402 stdin, stdout, stderr, bufsize,
403 waiter=waiter, extra=extra,
404 **kwargs)
405 try:
406 await waiter
407 except (SystemExit, KeyboardInterrupt):
408 raise
409 except BaseException:
410 transp.close()
411 await transp._wait()
412 raise
414 return transp
417class IocpProactor:
418 """Proactor implementation using IOCP."""
420 def __init__(self, concurrency=INFINITE):
421 self._loop = None
422 self._results = []
423 self._iocp = _overlapped.CreateIoCompletionPort(
424 _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
425 self._cache = {}
426 self._registered = weakref.WeakSet()
427 self._unregistered = []
428 self._stopped_serving = weakref.WeakSet()
430 def _check_closed(self):
431 if self._iocp is None:
432 raise RuntimeError('IocpProactor is closed')
434 def __repr__(self):
435 info = ['overlapped#=%s' % len(self._cache),
436 'result#=%s' % len(self._results)]
437 if self._iocp is None:
438 info.append('closed')
439 return '<%s %s>' % (self.__class__.__name__, " ".join(info))
441 def set_loop(self, loop):
442 self._loop = loop
444 def select(self, timeout=None):
445 if not self._results:
446 self._poll(timeout)
447 tmp = self._results
448 self._results = []
449 try:
450 return tmp
451 finally:
452 # Needed to break cycles when an exception occurs.
453 tmp = None
455 def _result(self, value):
456 fut = self._loop.create_future()
457 fut.set_result(value)
458 return fut
460 @staticmethod
461 def finish_socket_func(trans, key, ov):
462 try:
463 return ov.getresult()
464 except OSError as exc:
465 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
466 _overlapped.ERROR_OPERATION_ABORTED):
467 raise ConnectionResetError(*exc.args)
468 else:
469 raise
471 @classmethod
472 def _finish_recvfrom(cls, trans, key, ov, *, empty_result):
473 try:
474 return cls.finish_socket_func(trans, key, ov)
475 except OSError as exc:
476 # WSARecvFrom will report ERROR_PORT_UNREACHABLE when the same
477 # socket is used to send to an address that is not listening.
478 if exc.winerror == _overlapped.ERROR_PORT_UNREACHABLE:
479 return empty_result, None
480 else:
481 raise
483 def recv(self, conn, nbytes, flags=0):
484 self._register_with_iocp(conn)
485 ov = _overlapped.Overlapped(NULL)
486 try:
487 if isinstance(conn, socket.socket):
488 ov.WSARecv(conn.fileno(), nbytes, flags)
489 else:
490 ov.ReadFile(conn.fileno(), nbytes)
491 except BrokenPipeError:
492 return self._result(b'')
494 return self._register(ov, conn, self.finish_socket_func)
496 def recv_into(self, conn, buf, flags=0):
497 self._register_with_iocp(conn)
498 ov = _overlapped.Overlapped(NULL)
499 try:
500 if isinstance(conn, socket.socket):
501 ov.WSARecvInto(conn.fileno(), buf, flags)
502 else:
503 ov.ReadFileInto(conn.fileno(), buf)
504 except BrokenPipeError:
505 return self._result(0)
507 return self._register(ov, conn, self.finish_socket_func)
509 def recvfrom(self, conn, nbytes, flags=0):
510 self._register_with_iocp(conn)
511 ov = _overlapped.Overlapped(NULL)
512 try:
513 ov.WSARecvFrom(conn.fileno(), nbytes, flags)
514 except BrokenPipeError:
515 return self._result((b'', None))
517 return self._register(ov, conn, partial(self._finish_recvfrom,
518 empty_result=b''))
520 def recvfrom_into(self, conn, buf, flags=0):
521 self._register_with_iocp(conn)
522 ov = _overlapped.Overlapped(NULL)
523 try:
524 ov.WSARecvFromInto(conn.fileno(), buf, flags)
525 except BrokenPipeError:
526 return self._result((0, None))
528 return self._register(ov, conn, partial(self._finish_recvfrom,
529 empty_result=0))
531 def sendto(self, conn, buf, flags=0, addr=None):
532 self._register_with_iocp(conn)
533 ov = _overlapped.Overlapped(NULL)
535 ov.WSASendTo(conn.fileno(), buf, flags, addr)
537 return self._register(ov, conn, self.finish_socket_func)
539 def send(self, conn, buf, flags=0):
540 self._register_with_iocp(conn)
541 ov = _overlapped.Overlapped(NULL)
542 if isinstance(conn, socket.socket):
543 ov.WSASend(conn.fileno(), buf, flags)
544 else:
545 ov.WriteFile(conn.fileno(), buf)
547 return self._register(ov, conn, self.finish_socket_func)
549 def accept(self, listener):
550 self._register_with_iocp(listener)
551 conn = self._get_accept_socket(listener.family)
552 ov = _overlapped.Overlapped(NULL)
553 ov.AcceptEx(listener.fileno(), conn.fileno())
555 def finish_accept(trans, key, ov):
556 ov.getresult()
557 # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
558 buf = struct.pack('@P', listener.fileno())
559 conn.setsockopt(socket.SOL_SOCKET,
560 _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
561 conn.settimeout(listener.gettimeout())
562 return conn, conn.getpeername()
564 async def accept_coro(future, conn):
565 # Coroutine closing the accept socket if the future is cancelled
566 try:
567 await future
568 except exceptions.CancelledError:
569 conn.close()
570 raise
572 future = self._register(ov, listener, finish_accept)
573 coro = accept_coro(future, conn)
574 tasks.ensure_future(coro, loop=self._loop)
575 return future
577 def connect(self, conn, address):
578 if conn.type == socket.SOCK_DGRAM:
579 # WSAConnect will complete immediately for UDP sockets so we don't
580 # need to register any IOCP operation
581 _overlapped.WSAConnect(conn.fileno(), address)
582 fut = self._loop.create_future()
583 fut.set_result(None)
584 return fut
586 self._register_with_iocp(conn)
587 # The socket needs to be locally bound before we call ConnectEx().
588 try:
589 _overlapped.BindLocal(conn.fileno(), conn.family)
590 except OSError as e:
591 if e.winerror != errno.WSAEINVAL:
592 raise
593 # Probably already locally bound; check using getsockname().
594 if conn.getsockname()[1] == 0:
595 raise
596 ov = _overlapped.Overlapped(NULL)
597 ov.ConnectEx(conn.fileno(), address)
599 def finish_connect(trans, key, ov):
600 ov.getresult()
601 # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
602 conn.setsockopt(socket.SOL_SOCKET,
603 _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
604 return conn
606 return self._register(ov, conn, finish_connect)
608 def sendfile(self, sock, file, offset, count):
609 self._register_with_iocp(sock)
610 ov = _overlapped.Overlapped(NULL)
611 offset_low = offset & 0xffff_ffff
612 offset_high = (offset >> 32) & 0xffff_ffff
613 ov.TransmitFile(sock.fileno(),
614 msvcrt.get_osfhandle(file.fileno()),
615 offset_low, offset_high,
616 count, 0, 0)
618 return self._register(ov, sock, self.finish_socket_func)
620 def accept_pipe(self, pipe):
621 self._register_with_iocp(pipe)
622 ov = _overlapped.Overlapped(NULL)
623 connected = ov.ConnectNamedPipe(pipe.fileno())
625 if connected:
626 # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means
627 # that the pipe is connected. There is no need to wait for the
628 # completion of the connection.
629 return self._result(pipe)
631 def finish_accept_pipe(trans, key, ov):
632 ov.getresult()
633 return pipe
635 return self._register(ov, pipe, finish_accept_pipe)
637 async def connect_pipe(self, address):
638 delay = CONNECT_PIPE_INIT_DELAY
639 while True:
640 # Unfortunately there is no way to do an overlapped connect to
641 # a pipe. Call CreateFile() in a loop until it doesn't fail with
642 # ERROR_PIPE_BUSY.
643 try:
644 handle = _overlapped.ConnectPipe(address)
645 break
646 except OSError as exc:
647 if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
648 raise
650 # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
651 delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
652 await tasks.sleep(delay)
654 return windows_utils.PipeHandle(handle)
656 def wait_for_handle(self, handle, timeout=None):
657 """Wait for a handle.
659 Return a Future object. The result of the future is True if the wait
660 completed, or False if the wait did not complete (on timeout).
661 """
662 return self._wait_for_handle(handle, timeout, False)
664 def _wait_cancel(self, event, done_callback):
665 fut = self._wait_for_handle(event, None, True)
666 # add_done_callback() cannot be used because the wait may only complete
667 # in IocpProactor.close(), while the event loop is not running.
668 fut._done_callback = done_callback
669 return fut
671 def _wait_for_handle(self, handle, timeout, _is_cancel):
672 self._check_closed()
674 if timeout is None:
675 ms = _winapi.INFINITE
676 else:
677 # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
678 # round away from zero to wait *at least* timeout seconds.
679 ms = math.ceil(timeout * 1e3)
681 # We only create ov so we can use ov.address as a key for the cache.
682 ov = _overlapped.Overlapped(NULL)
683 wait_handle = _overlapped.RegisterWaitWithQueue(
684 handle, self._iocp, ov.address, ms)
685 if _is_cancel:
686 f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
687 else:
688 f = _WaitHandleFuture(ov, handle, wait_handle, self,
689 loop=self._loop)
690 if f._source_traceback:
691 del f._source_traceback[-1]
693 def finish_wait_for_handle(trans, key, ov):
694 # Note that this second wait means that we should only use
695 # this with handles types where a successful wait has no
696 # effect. So events or processes are all right, but locks
697 # or semaphores are not. Also note if the handle is
698 # signalled and then quickly reset, then we may return
699 # False even though we have not timed out.
700 return f._poll()
702 self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
703 return f
705 def _register_with_iocp(self, obj):
706 # To get notifications of finished ops on this objects sent to the
707 # completion port, were must register the handle.
708 if obj not in self._registered:
709 self._registered.add(obj)
710 _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
711 # XXX We could also use SetFileCompletionNotificationModes()
712 # to avoid sending notifications to completion port of ops
713 # that succeed immediately.
715 def _register(self, ov, obj, callback):
716 self._check_closed()
718 # Return a future which will be set with the result of the
719 # operation when it completes. The future's value is actually
720 # the value returned by callback().
721 f = _OverlappedFuture(ov, loop=self._loop)
722 if f._source_traceback:
723 del f._source_traceback[-1]
724 if not ov.pending:
725 # The operation has completed, so no need to postpone the
726 # work. We cannot take this short cut if we need the
727 # NumberOfBytes, CompletionKey values returned by
728 # PostQueuedCompletionStatus().
729 try:
730 value = callback(None, None, ov)
731 except OSError as e:
732 f.set_exception(e)
733 else:
734 f.set_result(value)
735 # Even if GetOverlappedResult() was called, we have to wait for the
736 # notification of the completion in GetQueuedCompletionStatus().
737 # Register the overlapped operation to keep a reference to the
738 # OVERLAPPED object, otherwise the memory is freed and Windows may
739 # read uninitialized memory.
741 # Register the overlapped operation for later. Note that
742 # we only store obj to prevent it from being garbage
743 # collected too early.
744 self._cache[ov.address] = (f, ov, obj, callback)
745 return f
747 def _unregister(self, ov):
748 """Unregister an overlapped object.
750 Call this method when its future has been cancelled. The event can
751 already be signalled (pending in the proactor event queue). It is also
752 safe if the event is never signalled (because it was cancelled).
753 """
754 self._check_closed()
755 self._unregistered.append(ov)
757 def _get_accept_socket(self, family):
758 s = socket.socket(family)
759 s.settimeout(0)
760 return s
762 def _poll(self, timeout=None):
763 if timeout is None:
764 ms = INFINITE
765 elif timeout < 0:
766 raise ValueError("negative timeout")
767 else:
768 # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
769 # round away from zero to wait *at least* timeout seconds.
770 ms = math.ceil(timeout * 1e3)
771 if ms >= INFINITE:
772 raise ValueError("timeout too big")
774 while True:
775 status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
776 if status is None:
777 break
778 ms = 0
780 err, transferred, key, address = status
781 try:
782 f, ov, obj, callback = self._cache.pop(address)
783 except KeyError:
784 if self._loop.get_debug():
785 self._loop.call_exception_handler({
786 'message': ('GetQueuedCompletionStatus() returned an '
787 'unexpected event'),
788 'status': ('err=%s transferred=%s key=%#x address=%#x'
789 % (err, transferred, key, address)),
790 })
792 # key is either zero, or it is used to return a pipe
793 # handle which should be closed to avoid a leak.
794 if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
795 _winapi.CloseHandle(key)
796 continue
798 if obj in self._stopped_serving:
799 f.cancel()
800 # Don't call the callback if _register() already read the result or
801 # if the overlapped has been cancelled
802 elif not f.done():
803 try:
804 value = callback(transferred, key, ov)
805 except OSError as e:
806 f.set_exception(e)
807 self._results.append(f)
808 else:
809 f.set_result(value)
810 self._results.append(f)
811 finally:
812 f = None
814 # Remove unregistered futures
815 for ov in self._unregistered:
816 self._cache.pop(ov.address, None)
817 self._unregistered.clear()
819 def _stop_serving(self, obj):
820 # obj is a socket or pipe handle. It will be closed in
821 # BaseProactorEventLoop._stop_serving() which will make any
822 # pending operations fail quickly.
823 self._stopped_serving.add(obj)
825 def close(self):
826 if self._iocp is None:
827 # already closed
828 return
830 # Cancel remaining registered operations.
831 for fut, ov, obj, callback in list(self._cache.values()):
832 if fut.cancelled():
833 # Nothing to do with cancelled futures
834 pass
835 elif isinstance(fut, _WaitCancelFuture):
836 # _WaitCancelFuture must not be cancelled
837 pass
838 else:
839 try:
840 fut.cancel()
841 except OSError as exc:
842 if self._loop is not None:
843 context = {
844 'message': 'Cancelling a future failed',
845 'exception': exc,
846 'future': fut,
847 }
848 if fut._source_traceback:
849 context['source_traceback'] = fut._source_traceback
850 self._loop.call_exception_handler(context)
852 # Wait until all cancelled overlapped complete: don't exit with running
853 # overlapped to prevent a crash. Display progress every second if the
854 # loop is still running.
855 msg_update = 1.0
856 start_time = time.monotonic()
857 next_msg = start_time + msg_update
858 while self._cache:
859 if next_msg <= time.monotonic():
860 logger.debug('%r is running after closing for %.1f seconds',
861 self, time.monotonic() - start_time)
862 next_msg = time.monotonic() + msg_update
864 # handle a few events, or timeout
865 self._poll(msg_update)
867 self._results = []
869 _winapi.CloseHandle(self._iocp)
870 self._iocp = None
872 def __del__(self):
873 self.close()
876class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
878 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
879 self._proc = windows_utils.Popen(
880 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
881 bufsize=bufsize, **kwargs)
883 def callback(f):
884 returncode = self._proc.poll()
885 self._process_exited(returncode)
887 f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
888 f.add_done_callback(callback)
891SelectorEventLoop = _WindowsSelectorEventLoop
894class _WindowsSelectorEventLoopPolicy(events._BaseDefaultEventLoopPolicy):
895 _loop_factory = SelectorEventLoop
898class _WindowsProactorEventLoopPolicy(events._BaseDefaultEventLoopPolicy):
899 _loop_factory = ProactorEventLoop
902_DefaultEventLoopPolicy = _WindowsProactorEventLoopPolicy
903EventLoop = ProactorEventLoop