Coverage for Lib/asyncio/windows_utils.py: 0%

107 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-15 02:02 +0000

"""Various Windows-specific bits and pieces."""

2 

3import sys 

4 

# Refuse to load anywhere but Windows: the imports that follow (_winapi,
# msvcrt) are only importable there.
if sys.platform != 'win32':  # pragma: no cover
    raise ImportError('win32 only')

7 

8import _winapi 

9import itertools 

10import msvcrt 

11import os 

12import subprocess 

13import tempfile 

14import warnings 

15 

16 

__all__ = 'pipe', 'Popen', 'PIPE', 'PipeHandle'


# Constants/globals


# Default value of pipe()'s ``bufsize`` parameter.
BUFSIZE = 8192
# Same sentinel values as in subprocess; Popen compares its stdin/stdout/
# stderr arguments against them.
PIPE = subprocess.PIPE
STDOUT = subprocess.STDOUT
# Supplies the serial number that pipe() embeds in each generated pipe name.
_mmap_counter = itertools.count()

27 

28 

# Replacement for os.pipe() using handles instead of fds

30 

31 

def pipe(*, duplex=False, overlapped=(True, True), bufsize=BUFSIZE):
    """Create a connected pair of named-pipe handles, os.pipe() style.

    Unlike os.pipe(), the result is a pair of Windows handles rather than
    file descriptors, and each end can be opened for overlapped I/O.

    duplex: when true both ends are opened for reading and writing;
        otherwise the first handle is the inbound end of the pipe and the
        second is opened with write access only.
    overlapped: indexable pair; item 0 selects FILE_FLAG_OVERLAPPED for the
        first handle, item 1 for the second.
    bufsize: passed as the pipe's input buffer size, and also as its output
        buffer size when duplex is true (0 is passed otherwise).

    Returns the tuple (first_handle, second_handle).  If any step fails,
    the handles opened so far are closed and the exception is re-raised.
    """
    # The name only has to be unique among named pipes, so mktemp() is used
    # purely as a name generator under the \\.\pipe\ namespace.
    pid = os.getpid()
    serial = next(_mmap_counter)
    address = tempfile.mktemp(
        prefix=r'\\.\pipe\python-pipe-{:d}-{:d}-'.format(pid, serial))

    # FIRST_PIPE_INSTANCE makes creation fail if the name is already taken.
    server_mode = _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
    if duplex:
        server_mode |= _winapi.PIPE_ACCESS_DUPLEX
        client_access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
        out_size = bufsize
    else:
        server_mode |= _winapi.PIPE_ACCESS_INBOUND
        client_access = _winapi.GENERIC_WRITE
        out_size = 0
    in_size = bufsize

    if overlapped[0]:
        server_mode |= _winapi.FILE_FLAG_OVERLAPPED
    client_flags = _winapi.FILE_FLAG_OVERLAPPED if overlapped[1] else 0

    server = client = None
    try:
        server = _winapi.CreateNamedPipe(
            address, server_mode, _winapi.PIPE_WAIT,
            1, out_size, in_size, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)

        client = _winapi.CreateFile(
            address, client_access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
            client_flags, _winapi.NULL)

        # Complete the connection on the creating side before handing the
        # pair back to the caller.
        pending = _winapi.ConnectNamedPipe(server, overlapped=True)
        pending.GetOverlappedResult(True)
    except BaseException:
        # Release whatever was opened, whatever the failure was, then let
        # the original exception propagate.
        for opened in (server, client):
            if opened is not None:
                _winapi.CloseHandle(opened)
        raise
    return server, client

76 

77 

# Wrapper for a pipe handle

79 

80 

81class PipeHandle: 

82 """Wrapper for an overlapped pipe handle which is vaguely file-object like. 

83 

84 The IOCP event loop can use these instead of socket objects. 

85 """ 

86 def __init__(self, handle): 

87 self._handle = handle 

88 

89 def __repr__(self): 

90 if self._handle is not None: 

91 handle = f'handle={self._handle!r}' 

92 else: 

93 handle = 'closed' 

94 return f'<{self.__class__.__name__} {handle}>' 

95 

96 @property 

97 def handle(self): 

98 return self._handle 

99 

100 def fileno(self): 

101 if self._handle is None: 

102 raise ValueError("I/O operation on closed pipe") 

103 return self._handle 

104 

105 def close(self, *, CloseHandle=_winapi.CloseHandle): 

106 if self._handle is not None: 

107 CloseHandle(self._handle) 

108 self._handle = None 

109 

110 def __del__(self, _warn=warnings.warn): 

111 if self._handle is not None: 

112 _warn(f"unclosed {self!r}", ResourceWarning, source=self) 

113 self.close() 

114 

115 def __enter__(self): 

116 return self 

117 

118 def __exit__(self, t, v, tb): 

119 self.close() 

120 

121 

# Replacement for subprocess.Popen using overlapped pipe handles

123 

124 

class Popen(subprocess.Popen):
    """Replacement for subprocess.Popen using overlapped pipe handles.

    The stdin, stdout, stderr are None or instances of PipeHandle.
    """

    def __init__(self, args, stdin=None, stdout=None, stderr=None, **kwds):
        """Start the child process, wiring PIPE streams to overlapped pipes.

        args and **kwds are forwarded to subprocess.Popen.  For each of
        stdin/stdout/stderr given as PIPE a pipe is created with pipe():
        the child receives the non-overlapped end as a C runtime file
        descriptor, and the parent keeps the overlapped end, exposed as a
        PipeHandle on self.stdin / self.stdout / self.stderr.
        stderr=STDOUT reuses whatever the child gets as stdout.  Any other
        value is passed to subprocess.Popen unchanged.

        Text mode (universal_newlines) and a non-zero bufsize are rejected
        by assertion.

        If creating a pipe, wrapping a handle or spawning the child fails,
        every handle and descriptor created so far is closed before the
        exception propagates.
        """
        assert not kwds.get('universal_newlines')
        assert kwds.get('bufsize', 0) == 0
        stdin_rfd = stdout_wfd = stderr_wfd = None
        stdin_wh = stdout_rh = stderr_rh = None
        # Child-side descriptors created here; they are always closed in
        # the ``finally`` clause once the spawn attempt is over.
        child_fds = []

        def open_child_fd(handle, flags):
            # Wrap the child-side pipe handle in a C runtime descriptor.
            # Until that succeeds nothing else owns the handle, so close
            # it here on failure instead of leaking it.
            try:
                fd = msvcrt.open_osfhandle(handle, flags)
            except BaseException:
                _winapi.CloseHandle(handle)
                raise
            # From here on the descriptor owns the handle (per the msvcrt
            # documentation), so os.close(fd) is what releases it.
            child_fds.append(fd)
            return fd

        try:
            # Pipe setup happens inside the ``try`` so that a failure on a
            # later stream still releases the streams set up before it.
            if stdin == PIPE:
                stdin_rh, stdin_wh = pipe(overlapped=(False, True),
                                          duplex=True)
                stdin_rfd = open_child_fd(stdin_rh, os.O_RDONLY)
            else:
                stdin_rfd = stdin
            if stdout == PIPE:
                stdout_rh, stdout_wh = pipe(overlapped=(True, False))
                stdout_wfd = open_child_fd(stdout_wh, 0)
            else:
                stdout_wfd = stdout
            if stderr == PIPE:
                stderr_rh, stderr_wh = pipe(overlapped=(True, False))
                stderr_wfd = open_child_fd(stderr_wh, 0)
            elif stderr == STDOUT:
                stderr_wfd = stdout_wfd
            else:
                stderr_wfd = stderr

            super().__init__(args, stdin=stdin_rfd, stdout=stdout_wfd,
                             stderr=stderr_wfd, **kwds)
        except BaseException:
            # Nothing will ever wrap the parent-side ends now; close them.
            for h in (stdin_wh, stdout_rh, stderr_rh):
                if h is not None:
                    _winapi.CloseHandle(h)
            raise
        else:
            if stdin_wh is not None:
                self.stdin = PipeHandle(stdin_wh)
            if stdout_rh is not None:
                self.stdout = PipeHandle(stdout_rh)
            if stderr_rh is not None:
                self.stderr = PipeHandle(stderr_rh)
        finally:
            # The parent has no further use for the child-side descriptors,
            # whether or not the spawn succeeded.
            for fd in child_fds:
                os.close(fd)