Coverage for Lib/asyncio/subprocess.py: 81%
160 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-15 02:02 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-15 02:02 +0000
1__all__ = 'create_subprocess_exec', 'create_subprocess_shell'
3import subprocess
5from . import events
6from . import protocols
7from . import streams
8from . import tasks
9from .log import logger
12PIPE = subprocess.PIPE
13STDOUT = subprocess.STDOUT
14DEVNULL = subprocess.DEVNULL
17class SubprocessStreamProtocol(streams.FlowControlMixin,
18 protocols.SubprocessProtocol):
19 """Like StreamReaderProtocol, but for a subprocess."""
21 def __init__(self, limit, loop):
22 super().__init__(loop=loop)
23 self._limit = limit
24 self.stdin = self.stdout = self.stderr = None
25 self._transport = None
26 self._process_exited = False
27 self._pipe_fds = []
28 self._stdin_closed = self._loop.create_future()
30 def __repr__(self):
31 info = [self.__class__.__name__]
32 if self.stdin is not None:
33 info.append(f'stdin={self.stdin!r}')
34 if self.stdout is not None:
35 info.append(f'stdout={self.stdout!r}')
36 if self.stderr is not None:
37 info.append(f'stderr={self.stderr!r}')
38 return '<{}>'.format(' '.join(info))
40 def connection_made(self, transport):
41 self._transport = transport
43 stdout_transport = transport.get_pipe_transport(1)
44 if stdout_transport is not None:
45 self.stdout = streams.StreamReader(limit=self._limit,
46 loop=self._loop)
47 self.stdout.set_transport(stdout_transport)
48 self._pipe_fds.append(1)
50 stderr_transport = transport.get_pipe_transport(2)
51 if stderr_transport is not None:
52 self.stderr = streams.StreamReader(limit=self._limit,
53 loop=self._loop)
54 self.stderr.set_transport(stderr_transport)
55 self._pipe_fds.append(2)
57 stdin_transport = transport.get_pipe_transport(0)
58 if stdin_transport is not None:
59 self.stdin = streams.StreamWriter(stdin_transport,
60 protocol=self,
61 reader=None,
62 loop=self._loop)
64 def pipe_data_received(self, fd, data):
65 if fd == 1:
66 reader = self.stdout
67 elif fd == 2: 67 ↛ 70line 67 didn't jump to line 70 because the condition on line 67 was always true
68 reader = self.stderr
69 else:
70 reader = None
71 if reader is not None: 71 ↛ exitline 71 didn't return from function 'pipe_data_received' because the condition on line 71 was always true
72 reader.feed_data(data)
74 def pipe_connection_lost(self, fd, exc):
75 if fd == 0:
76 pipe = self.stdin
77 if pipe is not None: 77 ↛ 79line 77 didn't jump to line 79 because the condition on line 77 was always true
78 pipe.close()
79 self.connection_lost(exc)
80 if exc is None:
81 self._stdin_closed.set_result(None)
82 else:
83 self._stdin_closed.set_exception(exc)
84 # Since calling `wait_closed()` is not mandatory,
85 # we shouldn't log the traceback if this is not awaited.
86 self._stdin_closed._log_traceback = False
87 return
88 if fd == 1:
89 reader = self.stdout
90 elif fd == 2: 90 ↛ 93line 90 didn't jump to line 93 because the condition on line 90 was always true
91 reader = self.stderr
92 else:
93 reader = None
94 if reader is not None: 94 ↛ 100line 94 didn't jump to line 100 because the condition on line 94 was always true
95 if exc is None: 95 ↛ 98line 95 didn't jump to line 98 because the condition on line 95 was always true
96 reader.feed_eof()
97 else:
98 reader.set_exception(exc)
100 if fd in self._pipe_fds: 100 ↛ 102line 100 didn't jump to line 102 because the condition on line 100 was always true
101 self._pipe_fds.remove(fd)
102 self._maybe_close_transport()
104 def process_exited(self):
105 self._process_exited = True
106 self._maybe_close_transport()
108 def _maybe_close_transport(self):
109 if len(self._pipe_fds) == 0 and self._process_exited:
110 self._transport.close()
111 self._transport = None
113 def _get_close_waiter(self, stream):
114 if stream is self.stdin:
115 return self._stdin_closed
118class Process:
119 def __init__(self, transport, protocol, loop):
120 self._transport = transport
121 self._protocol = protocol
122 self._loop = loop
123 self.stdin = protocol.stdin
124 self.stdout = protocol.stdout
125 self.stderr = protocol.stderr
126 self.pid = transport.get_pid()
128 def __repr__(self):
129 return f'<{self.__class__.__name__} {self.pid}>'
131 @property
132 def returncode(self):
133 return self._transport.get_returncode()
135 async def wait(self):
136 """Wait until the process exit and return the process return code."""
137 return await self._transport._wait()
139 def send_signal(self, signal):
140 self._transport.send_signal(signal)
142 def terminate(self):
143 self._transport.terminate()
145 def kill(self):
146 self._transport.kill()
148 async def _feed_stdin(self, input):
149 debug = self._loop.get_debug()
150 try:
151 if input is not None:
152 self.stdin.write(input)
153 if debug: 153 ↛ 154line 153 didn't jump to line 154 because the condition on line 153 was never true
154 logger.debug(
155 '%r communicate: feed stdin (%s bytes)', self, len(input))
157 await self.stdin.drain()
158 except (BrokenPipeError, ConnectionResetError) as exc:
159 # communicate() ignores BrokenPipeError and ConnectionResetError.
160 # write() and drain() can raise these exceptions.
161 if debug: 161 ↛ 162line 161 didn't jump to line 162 because the condition on line 161 was never true
162 logger.debug('%r communicate: stdin got %r', self, exc)
164 if debug: 164 ↛ 165line 164 didn't jump to line 165 because the condition on line 164 was never true
165 logger.debug('%r communicate: close stdin', self)
166 self.stdin.close()
168 async def _noop(self):
169 return None
171 async def _read_stream(self, fd):
172 transport = self._transport.get_pipe_transport(fd)
173 if fd == 2:
174 stream = self.stderr
175 else:
176 assert fd == 1
177 stream = self.stdout
178 if self._loop.get_debug(): 178 ↛ 179line 178 didn't jump to line 179 because the condition on line 178 was never true
179 name = 'stdout' if fd == 1 else 'stderr'
180 logger.debug('%r communicate: read %s', self, name)
181 output = await stream.read()
182 if self._loop.get_debug(): 182 ↛ 183line 182 didn't jump to line 183 because the condition on line 182 was never true
183 name = 'stdout' if fd == 1 else 'stderr'
184 logger.debug('%r communicate: close %s', self, name)
185 transport.close()
186 return output
188 async def communicate(self, input=None):
189 if self.stdin is not None:
190 stdin = self._feed_stdin(input)
191 else:
192 stdin = self._noop()
193 if self.stdout is not None:
194 stdout = self._read_stream(1)
195 else:
196 stdout = self._noop()
197 if self.stderr is not None:
198 stderr = self._read_stream(2)
199 else:
200 stderr = self._noop()
201 stdin, stdout, stderr = await tasks.gather(stdin, stdout, stderr)
202 await self.wait()
203 return (stdout, stderr)
206async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
207 limit=streams._DEFAULT_LIMIT, **kwds):
208 loop = events.get_running_loop()
209 protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
210 loop=loop)
211 transport, protocol = await loop.subprocess_shell(
212 protocol_factory,
213 cmd, stdin=stdin, stdout=stdout,
214 stderr=stderr, **kwds)
215 return Process(transport, protocol, loop)
218async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
219 stderr=None, limit=streams._DEFAULT_LIMIT,
220 **kwds):
221 loop = events.get_running_loop()
222 protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
223 loop=loop)
224 transport, protocol = await loop.subprocess_exec(
225 protocol_factory,
226 program, *args,
227 stdin=stdin, stdout=stdout,
228 stderr=stderr, **kwds)
229 return Process(transport, protocol, loop)