Coverage for Lib / asyncio / base_subprocess.py: 85%

217 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-24 01:31 +0000

1import collections 

2import subprocess 

3import warnings 

4import os 

5import signal 

6import sys 

7 

8from . import protocols 

9from . import transports 

10from .log import logger 

11 

12 

class BaseSubprocessTransport(transports.SubprocessTransport):
    """Base class for subprocess transports.

    Subclasses override _start() to create the child process and store it
    in self._proc.  This class connects the standard streams that were
    requested as subprocess.PIPE, forwards pipe and process-exit events to
    the protocol, and wakes up _wait() callers with the return code.
    """

    def __init__(self, loop, protocol, args, shell,
                 stdin, stdout, stderr, bufsize,
                 waiter=None, extra=None, **kwargs):
        """Start the child process and schedule the pipe connection.

        *waiter*, when given, is a future that _connect_pipes() resolves
        with None on success or with the exception raised while
        connecting the pipes.  Remaining keyword arguments are passed
        through to _start().
        """
        super().__init__(extra)
        self._closed = False
        self._protocol = protocol
        self._loop = loop
        self._proc = None
        self._pid = None
        self._returncode = None
        # Futures created by _wait(); resolved with the return code.
        self._exit_waiters = []
        # Protocol callbacks queued by _call() until _connect_pipes() has
        # scheduled connection_made(); set to None afterwards.
        self._pending_calls = collections.deque()
        # Maps fd (0, 1, 2) to its pipe protocol.  A requested pipe is
        # registered with None until _connect_pipes() replaces it.
        self._pipes = {}
        self._finished = False
        self._pipes_connected = False

        if stdin == subprocess.PIPE:
            self._pipes[0] = None
        if stdout == subprocess.PIPE:
            self._pipes[1] = None
        if stderr == subprocess.PIPE:
            self._pipes[2] = None

        # Create the child process: set the _proc attribute
        try:
            self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
                        stderr=stderr, bufsize=bufsize, **kwargs)
        except BaseException:
            # Whatever went wrong, mark the transport closed before
            # propagating the error.
            self.close()
            raise

        self._pid = self._proc.pid
        self._extra['subprocess'] = self._proc

        if self._loop.get_debug():
            if isinstance(args, (bytes, str)):
                program = args
            else:
                program = args[0]
            logger.debug('process %r created: pid %s',
                         program, self._pid)

        self._loop.create_task(self._connect_pipes(waiter))

    def __repr__(self):
        """Return a summary: class name, state, pid and connected pipes."""
        info = [self.__class__.__name__]
        if self._closed:
            info.append('closed')
        if self._pid is not None:
            info.append(f'pid={self._pid}')
        if self._returncode is not None:
            info.append(f'returncode={self._returncode}')
        elif self._pid is not None:
            info.append('running')
        else:
            info.append('not started')

        stdin = self._pipes.get(0)
        if stdin is not None:
            info.append(f'stdin={stdin.pipe}')

        stdout = self._pipes.get(1)
        stderr = self._pipes.get(2)
        if stdout is not None and stderr is stdout:
            info.append(f'stdout=stderr={stdout.pipe}')
        else:
            if stdout is not None:
                info.append(f'stdout={stdout.pipe}')
            if stderr is not None:
                info.append(f'stderr={stderr.pipe}')

        return '<{}>'.format(' '.join(info))

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Create the child process and store it in self._proc.

        Must be overridden by subclasses; the base implementation raises
        NotImplementedError.
        """
        raise NotImplementedError

    def set_protocol(self, protocol):
        """Replace the protocol that receives this transport's events."""
        self._protocol = protocol

    def get_protocol(self):
        """Return the current protocol."""
        return self._protocol

    def is_closing(self):
        """Return True once close() has been called."""
        return self._closed

    def close(self):
        """Close the connected pipes and kill the child if still running.

        Calling close() more than once is a no-op.
        """
        if self._closed:
            return
        self._closed = True

        for proto in self._pipes.values():
            if proto is None:
                # Pipe was requested but never connected.
                continue
            # See gh-114177
            # skip closing the pipe if loop is already closed
            # this can happen e.g. when loop is closed immediately after
            # process is killed
            if self._loop and not self._loop.is_closed():
                proto.pipe.close()

        if (self._proc is not None and
                # the transport has not been told about the exit yet
                self._returncode is None and
                # and the child process itself reports it is still running
                self._proc.poll() is None):

            if self._loop.get_debug():
                logger.warning('Close running child process: kill %r', self)

            try:
                self._proc.kill()
            except (ProcessLookupError, PermissionError):
                # the process may have already exited or may be running setuid
                pass

            # Don't clear the _proc reference yet: _post_init() may still run

    def __del__(self, _warn=warnings.warn):
        """Emit a ResourceWarning and close the transport if left open."""
        if not self._closed:
            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
            self.close()

    def get_pid(self):
        """Return the child's process id (None if it was never started)."""
        return self._pid

    def get_returncode(self):
        """Return the child's return code, or None if not yet known."""
        return self._returncode

    def get_pipe_transport(self, fd):
        """Return the pipe transport for *fd*, or None.

        None is returned both when *fd* was not requested as a pipe and
        when the pipe was requested but is not connected (its entry in
        self._pipes is still the None placeholder set by __init__).
        """
        proto = self._pipes.get(fd)
        if proto is None:
            return None
        return proto.pipe

    def _check_proc(self):
        """Raise ProcessLookupError if there is no child process object."""
        if self._proc is None:
            raise ProcessLookupError()

    if sys.platform == 'win32':
        def send_signal(self, signal):
            """Send *signal* to the child through the process object."""
            self._check_proc()
            self._proc.send_signal(signal)

        def terminate(self):
            """Terminate the child through the process object."""
            self._check_proc()
            self._proc.terminate()

        def kill(self):
            """Kill the child through the process object."""
            self._check_proc()
            self._proc.kill()
    else:
        def send_signal(self, signal):
            """Send *signal* to the child's pid.

            A ProcessLookupError raised by os.kill() is ignored.
            """
            self._check_proc()
            try:
                os.kill(self._proc.pid, signal)
            except ProcessLookupError:
                pass

        def terminate(self):
            """Send SIGTERM to the child."""
            self.send_signal(signal.SIGTERM)

        def kill(self):
            """Send SIGKILL to the child."""
            self.send_signal(signal.SIGKILL)

    async def _connect_pipes(self, waiter):
        """Connect the child's pipes, then flush the queued callbacks.

        On success connection_made() and every pending call are scheduled
        on the loop, *waiter* (if any and not cancelled) is resolved with
        None and _pipes_connected is set.  On failure the exception is
        stored on *waiter*; SystemExit and KeyboardInterrupt propagate.
        """
        try:
            proc = self._proc
            loop = self._loop

            if proc.stdin is not None:
                _, pipe = await loop.connect_write_pipe(
                    lambda: WriteSubprocessPipeProto(self, 0),
                    proc.stdin)
                self._pipes[0] = pipe

            if proc.stdout is not None:
                _, pipe = await loop.connect_read_pipe(
                    lambda: ReadSubprocessPipeProto(self, 1),
                    proc.stdout)
                self._pipes[1] = pipe

            if proc.stderr is not None:
                _, pipe = await loop.connect_read_pipe(
                    lambda: ReadSubprocessPipeProto(self, 2),
                    proc.stderr)
                self._pipes[2] = pipe

            assert self._pending_calls is not None

            # connection_made() is scheduled first so the protocol sees
            # it before any callback queued while the pipes were pending.
            loop.call_soon(self._protocol.connection_made, self)
            for callback, data in self._pending_calls:
                loop.call_soon(callback, *data)
            self._pending_calls = None
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            if waiter is not None and not waiter.cancelled():
                waiter.set_exception(exc)
        else:
            if waiter is not None and not waiter.cancelled():
                waiter.set_result(None)
            self._pipes_connected = True

    def _call(self, cb, *data):
        """Queue *cb* while pipes are pending, else schedule it on the loop."""
        if self._pending_calls is not None:
            self._pending_calls.append((cb, data))
        else:
            self._loop.call_soon(cb, *data)

    def _pipe_connection_lost(self, fd, exc):
        """Report a lost pipe to the protocol and check for completion."""
        self._call(self._protocol.pipe_connection_lost, fd, exc)
        self._try_finish()

    def _pipe_data_received(self, fd, data):
        """Forward data read from pipe *fd* to the protocol."""
        self._call(self._protocol.pipe_data_received, fd, data)

    def _process_exited(self, returncode):
        """Record the child's return code and notify the protocol."""
        assert returncode is not None, returncode
        assert self._returncode is None, self._returncode
        if self._loop.get_debug():
            logger.info('%r exited with return code %r', self, returncode)
        self._returncode = returncode
        if self._proc.returncode is None:
            # asyncio uses a child watcher: copy the status into the Popen
            # object. On Python 3.6, it is required to avoid a ResourceWarning.
            self._proc.returncode = returncode
        self._call(self._protocol.process_exited)

        self._try_finish()

    async def _wait(self):
        """Wait until the process exits and return its return code.

        This method is a coroutine."""
        if self._returncode is not None:
            return self._returncode

        waiter = self._loop.create_future()
        self._exit_waiters.append(waiter)
        return await waiter

    def _try_finish(self):
        """Finish the transport once the child exited and pipes are gone.

        Does nothing until the return code is known.  When every
        registered pipe is connected and disconnected again,
        connection_lost() is scheduled through _call().
        """
        assert not self._finished
        if self._returncode is None:
            return
        if not self._pipes_connected:
            # self._pipes_connected can be False if not all pipes were connected
            # because either the process failed to start or the self._connect_pipes task
            # got cancelled. In this broken state we consider all pipes disconnected and
            # to avoid hanging forever in self._wait as otherwise _exit_waiters
            # would never be woken up, we wake them up here.
            for waiter in self._exit_waiters:
                # done() rather than cancelled(): this branch can run
                # more than once, and resolving a future twice raises
                # InvalidStateError.
                if not waiter.done():
                    waiter.set_result(self._returncode)
        if all(p is not None and p.disconnected
               for p in self._pipes.values()):
            self._finished = True
            self._call(self._call_connection_lost, None)

    def _call_connection_lost(self, exc):
        """Call connection_lost(), wake the waiters and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            # wake up futures waiting for wait()
            for waiter in self._exit_waiters:
                # Skip waiters that are cancelled or were already woken
                # up by _try_finish(); a second set_result() would raise
                # and abort the cleanup below.
                if not waiter.done():
                    waiter.set_result(self._returncode)
            self._exit_waiters = None
            self._loop = None
            self._proc = None
            self._protocol = None

287 

288 

class WriteSubprocessPipeProto(protocols.BaseProtocol):
    """Protocol attached to one pipe of a subprocess transport.

    Relays pipe-level events for file descriptor *fd* to the owning
    subprocess transport stored in ``proc``.
    """

    def __init__(self, proc, fd):
        # Nothing is connected yet: no pipe transport, not disconnected.
        self.disconnected = False
        self.pipe = None
        self.fd = fd
        self.proc = proc

    def connection_made(self, transport):
        """Remember the pipe transport this protocol is bound to."""
        self.pipe = transport

    def __repr__(self):
        cls_name = self.__class__.__name__
        return f'<{cls_name} fd={self.fd} pipe={self.pipe!r}>'

    def connection_lost(self, exc):
        """Flag the pipe as disconnected and tell the owning transport."""
        owner = self.proc
        self.disconnected = True
        owner._pipe_connection_lost(self.fd, exc)
        # Drop the reference to the owner once it has been notified.
        self.proc = None

    def pause_writing(self):
        """Relay pause_writing() to the owning transport's protocol."""
        user_protocol = self.proc._protocol
        user_protocol.pause_writing()

    def resume_writing(self):
        """Relay resume_writing() to the owning transport's protocol."""
        user_protocol = self.proc._protocol
        user_protocol.resume_writing()

313 

314 

class ReadSubprocessPipeProto(WriteSubprocessPipeProto,
                              protocols.Protocol):
    """Pipe protocol that also forwards received data to its owner."""

    def data_received(self, data):
        """Pass *data* read from this pipe on to the owning transport."""
        owner = self.proc
        owner._pipe_data_received(self.fd, data)