Coverage for Lib / asyncio / base_subprocess.py: 85%
217 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-24 01:31 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-24 01:31 +0000
1import collections
2import subprocess
3import warnings
4import os
5import signal
6import sys
8from . import protocols
9from . import transports
10from .log import logger
13class BaseSubprocessTransport(transports.SubprocessTransport):
15 def __init__(self, loop, protocol, args, shell,
16 stdin, stdout, stderr, bufsize,
17 waiter=None, extra=None, **kwargs):
18 super().__init__(extra)
19 self._closed = False
20 self._protocol = protocol
21 self._loop = loop
22 self._proc = None
23 self._pid = None
24 self._returncode = None
25 self._exit_waiters = []
26 self._pending_calls = collections.deque()
27 self._pipes = {}
28 self._finished = False
29 self._pipes_connected = False
31 if stdin == subprocess.PIPE:
32 self._pipes[0] = None
33 if stdout == subprocess.PIPE:
34 self._pipes[1] = None
35 if stderr == subprocess.PIPE:
36 self._pipes[2] = None
38 # Create the child process: set the _proc attribute
39 try:
40 self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
41 stderr=stderr, bufsize=bufsize, **kwargs)
42 except:
43 self.close()
44 raise
46 self._pid = self._proc.pid
47 self._extra['subprocess'] = self._proc
49 if self._loop.get_debug(): 49 ↛ 50line 49 didn't jump to line 50 because the condition on line 49 was never true
50 if isinstance(args, (bytes, str)):
51 program = args
52 else:
53 program = args[0]
54 logger.debug('process %r created: pid %s',
55 program, self._pid)
57 self._loop.create_task(self._connect_pipes(waiter))
59 def __repr__(self):
60 info = [self.__class__.__name__]
61 if self._closed: 61 ↛ 62line 61 didn't jump to line 62 because the condition on line 61 was never true
62 info.append('closed')
63 if self._pid is not None:
64 info.append(f'pid={self._pid}')
65 if self._returncode is not None:
66 info.append(f'returncode={self._returncode}')
67 elif self._pid is not None:
68 info.append('running')
69 else:
70 info.append('not started')
72 stdin = self._pipes.get(0)
73 if stdin is not None: 73 ↛ 74line 73 didn't jump to line 74 because the condition on line 73 was never true
74 info.append(f'stdin={stdin.pipe}')
76 stdout = self._pipes.get(1)
77 stderr = self._pipes.get(2)
78 if stdout is not None and stderr is stdout: 78 ↛ 79line 78 didn't jump to line 79 because the condition on line 78 was never true
79 info.append(f'stdout=stderr={stdout.pipe}')
80 else:
81 if stdout is not None: 81 ↛ 82line 81 didn't jump to line 82 because the condition on line 81 was never true
82 info.append(f'stdout={stdout.pipe}')
83 if stderr is not None: 83 ↛ 84line 83 didn't jump to line 84 because the condition on line 83 was never true
84 info.append(f'stderr={stderr.pipe}')
86 return '<{}>'.format(' '.join(info))
88 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
89 raise NotImplementedError
91 def set_protocol(self, protocol):
92 self._protocol = protocol
94 def get_protocol(self):
95 return self._protocol
97 def is_closing(self):
98 return self._closed
100 def close(self):
101 if self._closed:
102 return
103 self._closed = True
105 for proto in self._pipes.values():
106 if proto is None:
107 continue
108 # See gh-114177
109 # skip closing the pipe if loop is already closed
110 # this can happen e.g. when loop is closed immediately after
111 # process is killed
112 if self._loop and not self._loop.is_closed():
113 proto.pipe.close()
115 if (self._proc is not None and
116 # has the child process finished?
117 self._returncode is None and
118 # the child process has finished, but the
119 # transport hasn't been notified yet?
120 self._proc.poll() is None):
122 if self._loop.get_debug(): 122 ↛ 123line 122 didn't jump to line 123 because the condition on line 122 was never true
123 logger.warning('Close running child process: kill %r', self)
125 try:
126 self._proc.kill()
127 except (ProcessLookupError, PermissionError):
128 # the process may have already exited or may be running setuid
129 pass
131 # Don't clear the _proc reference yet: _post_init() may still run
133 def __del__(self, _warn=warnings.warn):
134 if not self._closed: 134 ↛ 135line 134 didn't jump to line 135 because the condition on line 134 was never true
135 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
136 self.close()
138 def get_pid(self):
139 return self._pid
141 def get_returncode(self):
142 return self._returncode
144 def get_pipe_transport(self, fd):
145 if fd in self._pipes:
146 return self._pipes[fd].pipe
147 else:
148 return None
150 def _check_proc(self):
151 if self._proc is None:
152 raise ProcessLookupError()
154 if sys.platform == 'win32': 154 ↛ 155line 154 didn't jump to line 155 because the condition on line 154 was never true
155 def send_signal(self, signal):
156 self._check_proc()
157 self._proc.send_signal(signal)
159 def terminate(self):
160 self._check_proc()
161 self._proc.terminate()
163 def kill(self):
164 self._check_proc()
165 self._proc.kill()
166 else:
167 def send_signal(self, signal):
168 self._check_proc()
169 try:
170 os.kill(self._proc.pid, signal)
171 except ProcessLookupError:
172 pass
174 def terminate(self):
175 self.send_signal(signal.SIGTERM)
177 def kill(self):
178 self.send_signal(signal.SIGKILL)
180 async def _connect_pipes(self, waiter):
181 try:
182 proc = self._proc
183 loop = self._loop
185 if proc.stdin is not None:
186 _, pipe = await loop.connect_write_pipe(
187 lambda: WriteSubprocessPipeProto(self, 0),
188 proc.stdin)
189 self._pipes[0] = pipe
191 if proc.stdout is not None:
192 _, pipe = await loop.connect_read_pipe(
193 lambda: ReadSubprocessPipeProto(self, 1),
194 proc.stdout)
195 self._pipes[1] = pipe
197 if proc.stderr is not None:
198 _, pipe = await loop.connect_read_pipe(
199 lambda: ReadSubprocessPipeProto(self, 2),
200 proc.stderr)
201 self._pipes[2] = pipe
203 assert self._pending_calls is not None
205 loop.call_soon(self._protocol.connection_made, self)
206 for callback, data in self._pending_calls:
207 loop.call_soon(callback, *data)
208 self._pending_calls = None
209 except (SystemExit, KeyboardInterrupt):
210 raise
211 except BaseException as exc:
212 if waiter is not None and not waiter.cancelled(): 212 ↛ exitline 212 didn't return from function '_connect_pipes' because the condition on line 212 was always true
213 waiter.set_exception(exc)
214 else:
215 if waiter is not None and not waiter.cancelled():
216 waiter.set_result(None)
217 self._pipes_connected = True
219 def _call(self, cb, *data):
220 if self._pending_calls is not None:
221 self._pending_calls.append((cb, data))
222 else:
223 self._loop.call_soon(cb, *data)
225 def _pipe_connection_lost(self, fd, exc):
226 self._call(self._protocol.pipe_connection_lost, fd, exc)
227 self._try_finish()
229 def _pipe_data_received(self, fd, data):
230 self._call(self._protocol.pipe_data_received, fd, data)
232 def _process_exited(self, returncode):
233 assert returncode is not None, returncode
234 assert self._returncode is None, self._returncode
235 if self._loop.get_debug(): 235 ↛ 236line 235 didn't jump to line 236 because the condition on line 235 was never true
236 logger.info('%r exited with return code %r', self, returncode)
237 self._returncode = returncode
238 if self._proc.returncode is None:
239 # asyncio uses a child watcher: copy the status into the Popen
240 # object. On Python 3.6, it is required to avoid a ResourceWarning.
241 self._proc.returncode = returncode
242 self._call(self._protocol.process_exited)
244 self._try_finish()
246 async def _wait(self):
247 """Wait until the process exit and return the process return code.
249 This method is a coroutine."""
250 if self._returncode is not None:
251 return self._returncode
253 waiter = self._loop.create_future()
254 self._exit_waiters.append(waiter)
255 return await waiter
257 def _try_finish(self):
258 assert not self._finished
259 if self._returncode is None:
260 return
261 if not self._pipes_connected:
262 # self._pipes_connected can be False if not all pipes were connected
263 # because either the process failed to start or the self._connect_pipes task
264 # got cancelled. In this broken state we consider all pipes disconnected and
265 # to avoid hanging forever in self._wait as otherwise _exit_waiters
266 # would never be woken up, we wake them up here.
267 for waiter in self._exit_waiters:
268 if not waiter.cancelled(): 268 ↛ 267line 268 didn't jump to line 267 because the condition on line 268 was always true
269 waiter.set_result(self._returncode)
270 if all(p is not None and p.disconnected
271 for p in self._pipes.values()):
272 self._finished = True
273 self._call(self._call_connection_lost, None)
275 def _call_connection_lost(self, exc):
276 try:
277 self._protocol.connection_lost(exc)
278 finally:
279 # wake up futures waiting for wait()
280 for waiter in self._exit_waiters:
281 if not waiter.cancelled():
282 waiter.set_result(self._returncode)
283 self._exit_waiters = None
284 self._loop = None
285 self._proc = None
286 self._protocol = None
289class WriteSubprocessPipeProto(protocols.BaseProtocol):
291 def __init__(self, proc, fd):
292 self.proc = proc
293 self.fd = fd
294 self.pipe = None
295 self.disconnected = False
297 def connection_made(self, transport):
298 self.pipe = transport
300 def __repr__(self):
301 return f'<{self.__class__.__name__} fd={self.fd} pipe={self.pipe!r}>'
303 def connection_lost(self, exc):
304 self.disconnected = True
305 self.proc._pipe_connection_lost(self.fd, exc)
306 self.proc = None
308 def pause_writing(self):
309 self.proc._protocol.pause_writing()
311 def resume_writing(self):
312 self.proc._protocol.resume_writing()
315class ReadSubprocessPipeProto(WriteSubprocessPipeProto,
316 protocols.Protocol):
318 def data_received(self, data):
319 self.proc._pipe_data_received(self.fd, data)