Coverage for Lib/asyncio/streams.py: 93%

412 statements  

coverage.py v7.11.0, created at 2025-10-26 01:27 +0000

1__all__ = ( 

2 'StreamReader', 'StreamWriter', 'StreamReaderProtocol', 

3 'open_connection', 'start_server') 

4 

5import collections 

6import socket 

7import sys 

8import warnings 

9import weakref 

10 

11if hasattr(socket, 'AF_UNIX'):  # 11 ↛ 14: condition always true

12 __all__ += ('open_unix_connection', 'start_unix_server') 

13 

14from . import coroutines 

15from . import events 

16from . import exceptions 

17from . import format_helpers 

18from . import protocols 

19from .log import logger 

20from .tasks import sleep 

21 

22 

23_DEFAULT_LIMIT = 2 ** 16 # 64 KiB 

24 

25 

26async def open_connection(host=None, port=None, *, 

27 limit=_DEFAULT_LIMIT, **kwds): 

28 """A wrapper for create_connection() returning a (reader, writer) pair. 

29 

30 The reader returned is a StreamReader instance; the writer is a 

31 StreamWriter instance. 

32 

33 The arguments are all the usual arguments to create_connection() 

34 except protocol_factory; most common are positional host and port, 

35 with various optional keyword arguments following. 

36 

37 Additional optional keyword argument is limit (to set the buffer

38 limit passed to the StreamReader). The event loop used is always

39 the running loop.

40 

41 (If you want to customize the StreamReader and/or 

42 StreamReaderProtocol classes, just copy the code -- there's 

43 really nothing special here except some convenience.) 

44 """ 

45 loop = events.get_running_loop() 

46 reader = StreamReader(limit=limit, loop=loop) 

47 protocol = StreamReaderProtocol(reader, loop=loop) 

48 transport, _ = await loop.create_connection( 

49 lambda: protocol, host, port, **kwds) 

50 writer = StreamWriter(transport, protocol, reader, loop) 

51 return reader, writer 
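
# --- Editor's note: usage sketch, not part of streams.py ----------
# A minimal client built on open_connection(); the host, port and
# payload below are illustrative assumptions.

import asyncio

async def _demo_open_connection():
    reader, writer = await asyncio.open_connection('127.0.0.1', 8888)
    writer.write(b'ping\r\n')
    await writer.drain()                 # cooperate with flow control
    reply = await reader.readline()      # b'' if the peer closed first
    writer.close()
    await writer.wait_closed()
    return reply
# Run with: asyncio.run(_demo_open_connection())
# -------------------------------------------------------------------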

52 

53 

54async def start_server(client_connected_cb, host=None, port=None, *, 

55 limit=_DEFAULT_LIMIT, **kwds): 

56 """Start a socket server, call back for each client connected. 

57 

58 The first parameter, `client_connected_cb`, takes two parameters: 

59 client_reader, client_writer. client_reader is a StreamReader 

60 object, while client_writer is a StreamWriter object. This 

61 parameter can either be a plain callback function or a coroutine

62 function; if a coroutine is returned, it will be automatically

63 scheduled as a Task.

64 

65 The rest of the arguments are all the usual arguments to 

66 loop.create_server() except protocol_factory; most common are 

67 positional host and port, with various optional keyword arguments 

68 following.

69 

70 Additional optional keyword argument is limit (to set the buffer 

71 limit passed to the StreamReader). 

72 

73 The return value is the same as loop.create_server(), i.e. a 

74 Server object which can be used to stop the service. 

75 """ 

76 loop = events.get_running_loop() 

77 

78 def factory(): 

79 reader = StreamReader(limit=limit, loop=loop) 

80 protocol = StreamReaderProtocol(reader, client_connected_cb, 

81 loop=loop) 

82 return protocol 

83 

84 return await loop.create_server(factory, host, port, **kwds) 
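
# --- Editor's note: usage sketch, not part of streams.py ----------
# An echo server wired through client_connected_cb; the address is
# an illustrative assumption. Because the callback is a coroutine
# function, each connection's coroutine is scheduled as a Task.

import asyncio

async def _demo_echo_client(reader, writer):
    async for line in reader:            # iterates lines until EOF
        writer.write(line)
        await writer.drain()
    writer.close()
    await writer.wait_closed()

async def _demo_start_server():
    server = await asyncio.start_server(_demo_echo_client, '127.0.0.1', 8888)
    async with server:                   # Server is an async context manager
        await server.serve_forever()
# -------------------------------------------------------------------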

85 

86 

87if hasattr(socket, 'AF_UNIX'):  # 87 ↛ 116: condition always true

88 # UNIX Domain Sockets are supported on this platform 

89 

90 async def open_unix_connection(path=None, *, 

91 limit=_DEFAULT_LIMIT, **kwds): 

92 """Similar to `open_connection` but works with UNIX Domain Sockets.""" 

93 loop = events.get_running_loop() 

94 

95 reader = StreamReader(limit=limit, loop=loop) 

96 protocol = StreamReaderProtocol(reader, loop=loop) 

97 transport, _ = await loop.create_unix_connection( 

98 lambda: protocol, path, **kwds) 

99 writer = StreamWriter(transport, protocol, reader, loop) 

100 return reader, writer 

101 

102 async def start_unix_server(client_connected_cb, path=None, *, 

103 limit=_DEFAULT_LIMIT, **kwds): 

104 """Similar to `start_server` but works with UNIX Domain Sockets.""" 

105 loop = events.get_running_loop() 

106 

107 def factory(): 

108 reader = StreamReader(limit=limit, loop=loop) 

109 protocol = StreamReaderProtocol(reader, client_connected_cb, 

110 loop=loop) 

111 return protocol 

112 

113 return await loop.create_unix_server(factory, path, **kwds) 
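
# --- Editor's note: usage sketch, not part of streams.py ----------
# The UNIX-domain variants take a filesystem path instead of a
# host/port pair; the socket path is an illustrative assumption.

import asyncio

async def _demo_unix_roundtrip(path='/tmp/streams-demo.sock'):
    async def handler(reader, writer):
        writer.write(await reader.readline())   # echo one line back
        writer.close()

    server = await asyncio.start_unix_server(handler, path)
    async with server:
        reader, writer = await asyncio.open_unix_connection(path)
        writer.write(b'hello\n')
        await writer.drain()
        echoed = await reader.readline()
        writer.close()
        await writer.wait_closed()
        return echoed
# -------------------------------------------------------------------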

114 

115 

116class FlowControlMixin(protocols.Protocol): 

117 """Reusable flow control logic for StreamWriter.drain(). 

118 

119 This implements the protocol methods pause_writing(), 

120 resume_writing() and connection_lost(). If the subclass overrides 

121 these it must call the super methods. 

122 

123 StreamWriter.drain() must wait for the _drain_helper() coroutine.

124 """ 

125 

126 def __init__(self, loop=None): 

127 if loop is None: 

128 self._loop = events.get_event_loop() 

129 else: 

130 self._loop = loop 

131 self._paused = False 

132 self._drain_waiters = collections.deque() 

133 self._connection_lost = False 

134 

135 def pause_writing(self): 

136 assert not self._paused 

137 self._paused = True 

138 if self._loop.get_debug(): 

139 logger.debug("%r pauses writing", self) 

140 

141 def resume_writing(self): 

142 assert self._paused 

143 self._paused = False 

144 if self._loop.get_debug():  # 144 ↛ 145: condition never true

145 logger.debug("%r resumes writing", self) 

146 

147 for waiter in self._drain_waiters: 

148 if not waiter.done():  # 148 ↛ 147: condition always true

149 waiter.set_result(None) 

150 

151 def connection_lost(self, exc): 

152 self._connection_lost = True 

153 # Wake up the writer(s) if currently paused. 

154 if not self._paused: 

155 return 

156 

157 for waiter in self._drain_waiters: 

158 if not waiter.done():  # 158 ↛ 157: condition always true

159 if exc is None:  # 159 ↛ 160: condition never true

160 waiter.set_result(None) 

161 else: 

162 waiter.set_exception(exc) 

163 

164 async def _drain_helper(self): 

165 if self._connection_lost: 

166 raise ConnectionResetError('Connection lost') 

167 if not self._paused: 

168 return 

169 waiter = self._loop.create_future() 

170 self._drain_waiters.append(waiter) 

171 try: 

172 await waiter 

173 finally: 

174 self._drain_waiters.remove(waiter) 

175 

176 def _get_close_waiter(self, stream): 

177 raise NotImplementedError 
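
# --- Editor's note: behaviour sketch, not part of streams.py ------
# Shows the pause/resume contract of the FlowControlMixin above:
# after pause_writing(), _drain_helper() parks on a waiter future
# that resume_writing() later completes. _ToyProtocol is a
# hypothetical subclass; _get_close_waiter is stubbed only to
# satisfy the interface and is not exercised here.

import asyncio

class _ToyProtocol(FlowControlMixin):
    def _get_close_waiter(self, stream):
        return self._loop.create_future()

async def _demo_flow_control():
    proto = _ToyProtocol(loop=asyncio.get_running_loop())
    proto.pause_writing()                  # transport buffer is "full"
    drainer = asyncio.ensure_future(proto._drain_helper())
    await asyncio.sleep(0)                 # let the drainer reach its waiter
    assert not drainer.done()              # drain is blocked, as intended
    proto.resume_writing()                 # buffer "emptied": wakes waiters
    await drainer                          # completes normally
# -------------------------------------------------------------------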

178 

179 

180class StreamReaderProtocol(FlowControlMixin, protocols.Protocol): 

181 """Helper class to adapt between Protocol and StreamReader. 

182 

183 (This is a helper class instead of making StreamReader itself a 

184 Protocol subclass, because the StreamReader has other potential 

185 uses, and to prevent the user of the StreamReader from accidentally

186 calling inappropriate methods of the protocol.)

187 """ 

188 

189 _source_traceback = None 

190 

191 def __init__(self, stream_reader, client_connected_cb=None, loop=None): 

192 super().__init__(loop=loop) 

193 if stream_reader is not None:  # 193 ↛ 197: condition always true

194 self._stream_reader_wr = weakref.ref(stream_reader) 

195 self._source_traceback = stream_reader._source_traceback 

196 else: 

197 self._stream_reader_wr = None 

198 if client_connected_cb is not None: 

199 # This is a stream created by the `create_server()` function. 

200 # Keep a strong reference to the reader until a connection 

201 # is established. 

202 self._strong_reader = stream_reader 

203 self._reject_connection = False 

204 self._task = None 

205 self._transport = None 

206 self._client_connected_cb = client_connected_cb 

207 self._over_ssl = False 

208 self._closed = self._loop.create_future() 

209 

210 @property 

211 def _stream_reader(self): 

212 if self._stream_reader_wr is None: 

213 return None 

214 return self._stream_reader_wr() 

215 

216 def _replace_transport(self, transport): 

217 self._transport = transport 

218 self._over_ssl = transport.get_extra_info('sslcontext') is not None 

219 

220 def connection_made(self, transport): 

221 if self._reject_connection:  # 221 ↛ 222: condition never true

222 context = { 

223 'message': ('An open stream was garbage collected prior to ' 

224 'establishing network connection; ' 

225 'call "stream.close()" explicitly.') 

226 } 

227 if self._source_traceback: 

228 context['source_traceback'] = self._source_traceback 

229 self._loop.call_exception_handler(context) 

230 transport.abort() 

231 return 

232 self._transport = transport 

233 reader = self._stream_reader 

234 if reader is not None:  # 234 ↛ 236: condition always true

235 reader.set_transport(transport) 

236 self._over_ssl = transport.get_extra_info('sslcontext') is not None 

237 if self._client_connected_cb is not None: 

238 writer = StreamWriter(transport, self, reader, self._loop) 

239 res = self._client_connected_cb(reader, writer) 

240 if coroutines.iscoroutine(res): 

241 def callback(task): 

242 if task.cancelled(): 

243 transport.close() 

244 return 

245 exc = task.exception() 

246 if exc is not None: 

247 self._loop.call_exception_handler({ 

248 'message': 'Unhandled exception in client_connected_cb', 

249 'exception': exc, 

250 'transport': transport, 

251 }) 

252 transport.close() 

253 

254 self._task = self._loop.create_task(res) 

255 self._task.add_done_callback(callback) 

256 

257 self._strong_reader = None 

258 

259 def connection_lost(self, exc): 

260 reader = self._stream_reader 

261 if reader is not None: 

262 if exc is None: 

263 reader.feed_eof() 

264 else: 

265 reader.set_exception(exc) 

266 if not self._closed.done(): 

267 if exc is None: 

268 self._closed.set_result(None) 

269 else: 

270 self._closed.set_exception(exc) 

271 super().connection_lost(exc) 

272 self._stream_reader_wr = None 

273 self._task = None 

274 self._transport = None 

275 

276 def data_received(self, data): 

277 reader = self._stream_reader 

278 if reader is not None:  # 278 ↛ exit: condition always true

279 reader.feed_data(data) 

280 

281 def eof_received(self): 

282 reader = self._stream_reader 

283 if reader is not None:  # 283 ↛ 285: condition always true

284 reader.feed_eof() 

285 if self._over_ssl: 

286 # Prevent a warning in SSLProtocol.eof_received: 

287 # "returning true from eof_received() 

288 # has no effect when using ssl" 

289 return False 

290 return True 

291 

292 def _get_close_waiter(self, stream): 

293 return self._closed 

294 

295 def __del__(self): 

296 # Prevent reports about unhandled exceptions. 

297 # Better than self._closed._log_traceback = False hack 

298 try: 

299 closed = self._closed 

300 except AttributeError: 

301 pass # failed constructor 

302 else: 

303 if closed.done() and not closed.cancelled(): 

304 closed.exception() 

305 

306 

307class StreamWriter: 

308 """Wraps a Transport. 

309 

310 This exposes write(), writelines(), [can_]write_eof(), 

311 get_extra_info() and close(). It adds the drain() coroutine, which

312 can be awaited for flow control. It also

313 adds a transport property which references the Transport 

314 directly. 

315 """ 

316 

317 def __init__(self, transport, protocol, reader, loop): 

318 self._transport = transport 

319 self._protocol = protocol 

320 # drain() expects that the reader has an exception() method 

321 assert reader is None or isinstance(reader, StreamReader) 

322 self._reader = reader 

323 self._loop = loop 

324 self._complete_fut = self._loop.create_future() 

325 self._complete_fut.set_result(None) 

326 

327 def __repr__(self): 

328 info = [self.__class__.__name__, f'transport={self._transport!r}'] 

329 if self._reader is not None:  # 329 ↛ 331: condition always true

330 info.append(f'reader={self._reader!r}') 

331 return '<{}>'.format(' '.join(info)) 

332 

333 @property 

334 def transport(self): 

335 return self._transport 

336 

337 def write(self, data): 

338 self._transport.write(data) 

339 

340 def writelines(self, data): 

341 self._transport.writelines(data) 

342 

343 def write_eof(self): 

344 return self._transport.write_eof() 

345 

346 def can_write_eof(self): 

347 return self._transport.can_write_eof() 

348 

349 def close(self): 

350 return self._transport.close() 

351 

352 def is_closing(self): 

353 return self._transport.is_closing() 

354 

355 async def wait_closed(self): 

356 await self._protocol._get_close_waiter(self) 

357 

358 def get_extra_info(self, name, default=None): 

359 return self._transport.get_extra_info(name, default) 

360 

361 async def drain(self): 

362 """Flush the write buffer. 

363 

364 The intended use is to write 

365 

366 w.write(data) 

367 await w.drain() 

368 """ 

369 if self._reader is not None: 

370 exc = self._reader.exception() 

371 if exc is not None:  # 371 ↛ 372: condition never true

372 raise exc 

373 if self._transport.is_closing(): 

374 # Wait for protocol.connection_lost() call 

375 # Raise connection closing error if any, 

376 # ConnectionResetError otherwise 

377 # Yield to the event loop so connection_lost() may be 

378 # called. Without this, _drain_helper() would return 

379 # immediately, and code that calls 

380 # write(...); await drain() 

381 # in a loop would never call connection_lost(), so it 

382 # would not see an error when the socket is closed. 

383 await sleep(0) 

384 await self._protocol._drain_helper() 

385 

386 async def start_tls(self, sslcontext, *, 

387 server_hostname=None, 

388 ssl_handshake_timeout=None, 

389 ssl_shutdown_timeout=None): 

390 """Upgrade an existing stream-based connection to TLS.""" 

391 server_side = self._protocol._client_connected_cb is not None 

392 protocol = self._protocol 

393 await self.drain() 

394 new_transport = await self._loop.start_tls( # type: ignore 

395 self._transport, protocol, sslcontext, 

396 server_side=server_side, server_hostname=server_hostname, 

397 ssl_handshake_timeout=ssl_handshake_timeout, 

398 ssl_shutdown_timeout=ssl_shutdown_timeout) 

399 self._transport = new_transport 

400 protocol._replace_transport(new_transport) 

401 

402 def __del__(self, warnings=warnings): 

403 if not self._transport.is_closing(): 

404 if self._loop.is_closed():  # 404 ↛ 405: condition never true

405 warnings.warn("loop is closed", ResourceWarning) 

406 else: 

407 self.close() 

408 warnings.warn(f"unclosed {self!r}", ResourceWarning) 
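
# --- Editor's note: usage sketch, not part of streams.py ----------
# The canonical write pattern: pair bursts of write() with
# await drain(), then close() and wait_closed() for an orderly
# shutdown. `chunks` is an illustrative parameter; a client could
# also upgrade in place with `await writer.start_tls(ssl_context)`.

async def _demo_send_all(writer, chunks):
    for chunk in chunks:
        writer.write(chunk)           # buffers synchronously
        await writer.drain()          # waits while the transport is paused
    if writer.can_write_eof():
        writer.write_eof()            # half-close: tell the peer we are done
    writer.close()
    await writer.wait_closed()        # resolves after connection_lost()
# -------------------------------------------------------------------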

409 

410class StreamReader: 

411 

412 _source_traceback = None 

413 

414 def __init__(self, limit=_DEFAULT_LIMIT, loop=None): 

415 # The line length limit is a security feature; 

416 # it also doubles as half the buffer limit. 

417 

418 if limit <= 0: 

419 raise ValueError('Limit cannot be <= 0') 

420 

421 self._limit = limit 

422 if loop is None: 

423 self._loop = events.get_event_loop() 

424 else: 

425 self._loop = loop 

426 self._buffer = bytearray() 

427 self._eof = False # Whether we're done. 

428 self._waiter = None # A future used by _wait_for_data() 

429 self._exception = None 

430 self._transport = None 

431 self._paused = False 

432 if self._loop.get_debug(): 

433 self._source_traceback = format_helpers.extract_stack( 

434 sys._getframe(1)) 

435 

436 def __repr__(self): 

437 info = ['StreamReader'] 

438 if self._buffer: 

439 info.append(f'{len(self._buffer)} bytes') 

440 if self._eof: 

441 info.append('eof') 

442 if self._limit != _DEFAULT_LIMIT: 

443 info.append(f'limit={self._limit}') 

444 if self._waiter: 

445 info.append(f'waiter={self._waiter!r}') 

446 if self._exception: 

447 info.append(f'exception={self._exception!r}') 

448 if self._transport: 

449 info.append(f'transport={self._transport!r}') 

450 if self._paused:  # 450 ↛ 451: condition never true

451 info.append('paused') 

452 return '<{}>'.format(' '.join(info)) 

453 

454 def exception(self): 

455 return self._exception 

456 

457 def set_exception(self, exc): 

458 self._exception = exc 

459 

460 waiter = self._waiter 

461 if waiter is not None: 

462 self._waiter = None 

463 if not waiter.cancelled():  # 463 ↛ exit: condition always true

464 waiter.set_exception(exc) 

465 

466 def _wakeup_waiter(self): 

467 """Wakeup read*() functions waiting for data or EOF.""" 

468 waiter = self._waiter 

469 if waiter is not None: 

470 self._waiter = None 

471 if not waiter.cancelled():  # 471 ↛ exit: condition always true

472 waiter.set_result(None) 

473 

474 def set_transport(self, transport): 

475 assert self._transport is None, 'Transport already set' 

476 self._transport = transport 

477 

478 def _maybe_resume_transport(self): 

479 if self._paused and len(self._buffer) <= self._limit: 

480 self._paused = False 

481 self._transport.resume_reading() 

482 

483 def feed_eof(self): 

484 self._eof = True 

485 self._wakeup_waiter() 

486 

487 def at_eof(self): 

488 """Return True if the buffer is empty and 'feed_eof' was called.""" 

489 return self._eof and not self._buffer 

490 

491 def feed_data(self, data): 

492 assert not self._eof, 'feed_data after feed_eof' 

493 

494 if not data: 

495 return 

496 

497 self._buffer.extend(data) 

498 self._wakeup_waiter() 

499 

500 if (self._transport is not None and 

501 not self._paused and 

502 len(self._buffer) > 2 * self._limit): 

503 try: 

504 self._transport.pause_reading() 

505 except NotImplementedError: 

506 # The transport can't be paused. 

507 # We'll just have to buffer all data. 

508 # Forget the transport so we don't keep trying. 

509 self._transport = None 

510 else: 

511 self._paused = True 

512 

513 async def _wait_for_data(self, func_name): 

514 """Wait until feed_data() or feed_eof() is called. 

515 

516 If the stream was paused, automatically resume it.

517 """ 

518 # StreamReader uses a future to link the protocol feed_data() method 

519 # to a read coroutine. Running two read coroutines at the same time 

520 # would have unexpected behaviour: it would not be possible to know

521 # which coroutine would get the next data. 

522 if self._waiter is not None:  # 522 ↛ 523: condition never true

523 raise RuntimeError( 

524 f'{func_name}() called while another coroutine is ' 

525 f'already waiting for incoming data') 

526 

527 assert not self._eof, '_wait_for_data after EOF' 

528 

529 # Waiting for data while paused would deadlock, so prevent it.

530 # This is essential for readexactly(n) for case when n > self._limit. 

531 if self._paused:  # 531 ↛ 532: condition never true

532 self._paused = False 

533 self._transport.resume_reading() 

534 

535 self._waiter = self._loop.create_future() 

536 try: 

537 await self._waiter 

538 finally: 

539 self._waiter = None 

540 

541 async def readline(self): 

542 """Read chunk of data from the stream until newline (b'\n') is found. 

543 

544 On success, return chunk that ends with newline. If only partial 

545 line can be read due to EOF, return incomplete line without 

546 terminating newline. When EOF was reached while no bytes read, empty 

547 bytes object is returned. 

548 

549 If limit is reached, ValueError will be raised. In that case, if 

550 newline was found, complete line including newline will be removed 

551 from internal buffer. Else, internal buffer will be cleared. Limit is 

552 compared against part of the line without newline. 

553 

554 If stream was paused, this function will automatically resume it if 

555 needed. 

556 """ 

557 sep = b'\n' 

558 seplen = len(sep) 

559 try: 

560 line = await self.readuntil(sep) 

561 except exceptions.IncompleteReadError as e: 

562 return e.partial 

563 except exceptions.LimitOverrunError as e: 

564 if self._buffer.startswith(sep, e.consumed): 

565 del self._buffer[:e.consumed + seplen] 

566 else: 

567 self._buffer.clear() 

568 self._maybe_resume_transport() 

569 raise ValueError(e.args[0]) 

570 return line 
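
# --- Editor's note: usage sketch, not part of streams.py ----------
# readline() returns b'' only at EOF, which terminates the classic
# read loop below; `handle_line` is a hypothetical handler passed in
# by the caller.

async def _demo_read_lines(reader, handle_line):
    while True:
        line = await reader.readline()   # keeps the trailing b'\n'
        if not line:                     # b'' means EOF
            break
        handle_line(line)
# -------------------------------------------------------------------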

571 

572 async def readuntil(self, separator=b'\n'): 

573 """Read data from the stream until ``separator`` is found. 

574 

575 On success, the data and separator will be removed from the 

576 internal buffer (consumed). Returned data will include the 

577 separator at the end. 

578 

579 The configured stream limit is used to check the result. It sets the

580 maximal length of data that can be returned, not counting the 

581 separator. 

582 

583 If an EOF occurs and the complete separator is still not found, 

584 an IncompleteReadError exception will be raised, and the internal 

585 buffer will be reset. The IncompleteReadError.partial attribute 

586 may contain a partial separator.

587 

588 If the data cannot be read because the limit would be exceeded, a

589 LimitOverrunError exception will be raised, and the data 

590 will be left in the internal buffer, so it can be read again. 

591 

592 The ``separator`` may also be a tuple of separators. In this 

593 case the return value will be the shortest match that has any

594 separator as the suffix. For the purposes of LimitOverrunError, 

595 the shortest possible separator is considered to be the one that 

596 matched. 

597 """ 

598 if isinstance(separator, tuple): 

599 # Make sure the shortest match wins

600 separator = sorted(separator, key=len) 

601 else: 

602 separator = [separator] 

603 if not separator: 

604 raise ValueError('Separator should contain at least one element') 

605 min_seplen = len(separator[0]) 

606 max_seplen = len(separator[-1]) 

607 if min_seplen == 0: 

608 raise ValueError('Separator should be at least one-byte string') 

609 

610 if self._exception is not None: 

611 raise self._exception 

612 

613 # Consume the whole buffer except the last few bytes, whose length

614 # is one less than max_seplen. Consider corner cases with

615 # separator[-1]='SEPARATOR':

616 # * we have received an almost complete separator (without its last

617 # byte), i.e. buffer='some textSEPARATO'. In this case we

618 # can safely consume max_seplen - 1 bytes.

619 # * the last byte of the buffer is the first byte of the separator,

620 # i.e. buffer='abcdefghijklmnopqrS'. We may safely consume

621 # everything except that last byte, but doing so requires

622 # analyzing the bytes of the buffer that match a partial separator.

623 # That is slow and/or requires an FSM. For this case our

624 # implementation is not optimal, since it requires rescanning

625 # data that is known not to belong to the separator. In the

626 # real world, separators will not be long enough for the

627 # performance problems to be noticeable, even when reading

628 # MIME-encoded messages :)

629 

630 # `offset` is the number of bytes from the beginning of the buffer 

631 # where there is no occurrence of any `separator`. 

632 offset = 0 

633 

634 # Loop until we find a separator in the buffer, exceed the limit,

635 # or EOF has been reached.

636 while True: 

637 buflen = len(self._buffer) 

638 

639 # Check if we now have enough data in the buffer for the shortest

640 # separator to fit. 

641 if buflen - offset >= min_seplen: 

642 match_start = None 

643 match_end = None 

644 for sep in separator: 

645 isep = self._buffer.find(sep, offset) 

646 

647 if isep != -1: 

648 # `separator` is in the buffer. `match_start` and 

649 # `match_end` will be used later to retrieve the 

650 # data. 

651 end = isep + len(sep) 

652 if match_end is None or end < match_end: 

653 match_end = end 

654 match_start = isep 

655 if match_end is not None: 

656 break 

657 

658 # See the comment above for an explanation.

659 offset = max(0, buflen + 1 - max_seplen) 

660 if offset > self._limit: 

661 raise exceptions.LimitOverrunError( 

662 'Separator is not found, and chunk exceed the limit', 

663 offset) 

664 

665 # Complete message (with full separator) may be present in buffer 

666 # even when EOF flag is set. This may happen when the last chunk 

667 # adds data which completes the separator. That's why we check for

668 # EOF *after* inspecting the buffer. 

669 if self._eof: 

670 chunk = bytes(self._buffer) 

671 self._buffer.clear() 

672 raise exceptions.IncompleteReadError(chunk, None) 

673 

674 # _wait_for_data() will resume reading if stream was paused. 

675 await self._wait_for_data('readuntil') 

676 

677 if match_start > self._limit: 

678 raise exceptions.LimitOverrunError( 

679 'Separator is found, but chunk is longer than limit', match_start) 

680 

681 chunk = self._buffer[:match_end] 

682 del self._buffer[:match_end] 

683 self._maybe_resume_transport() 

684 return bytes(chunk) 
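
# --- Editor's note: usage sketch, not part of streams.py ----------
# With a tuple of separators, readuntil() returns the shortest chunk
# that ends in any of them; b'\r\n' and b'\n' are illustrative line
# endings.

import asyncio

async def _demo_readuntil(reader):
    try:
        frame = await reader.readuntil((b'\r\n', b'\n'))
    except asyncio.IncompleteReadError as e:
        frame = e.partial                    # EOF before any separator
    except asyncio.LimitOverrunError:
        raise ValueError('frame exceeds the stream limit')
    return frame
# -------------------------------------------------------------------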

685 

686 async def read(self, n=-1): 

687 """Read up to `n` bytes from the stream. 

688 

689 If `n` is not provided or set to -1, 

690 read until EOF, then return all read bytes. 

691 If EOF was received and the internal buffer is empty, 

692 return an empty bytes object. 

693 

694 If `n` is 0, return an empty bytes object immediately. 

695 

696 If `n` is positive, return at most `n` available bytes 

697 as soon as at least 1 byte is available in the internal buffer. 

698 If EOF is received before any byte is read, return an empty 

699 bytes object. 

700 

701 The returned value is not limited by the limit configured at

702 stream creation.

703 

704 If the stream was paused, this function will automatically resume

705 it if needed.

706 """ 

707 

708 if self._exception is not None: 

709 raise self._exception 

710 

711 if n == 0: 

712 return b'' 

713 

714 if n < 0: 

715 # This used to just loop creating a new waiter hoping to 

716 # collect everything in self._buffer, but that would 

717 # deadlock if the subprocess sends more than self.limit 

718 # bytes. So just call self.read(self._limit) until EOF. 

719 blocks = [] 

720 while True: 

721 block = await self.read(self._limit) 

722 if not block: 

723 break 

724 blocks.append(block) 

725 return b''.join(blocks) 

726 

727 if not self._buffer and not self._eof: 

728 await self._wait_for_data('read') 

729 

730 # This works correctly even if the buffer holds fewer than n bytes

731 data = bytes(memoryview(self._buffer)[:n]) 

732 del self._buffer[:n] 

733 

734 self._maybe_resume_transport() 

735 return data 
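
# --- Editor's note: usage sketch, not part of streams.py ----------
# read(-1) drains the stream to EOF in limit-sized blocks, so it is
# safe even when the payload is larger than the configured limit.

async def _demo_read_to_eof(reader):
    payload = await reader.read()            # everything until EOF
    assert await reader.read(4096) == b''    # further reads return b''
    return payload
# -------------------------------------------------------------------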

736 

737 async def readexactly(self, n): 

738 """Read exactly `n` bytes. 

739 

740 Raise an IncompleteReadError if EOF is reached before `n` bytes can be 

741 read. The IncompleteReadError.partial attribute of the exception will 

742 contain the partially read bytes.

743 

744 If n is zero, return an empty bytes object.

745 

746 The returned value is not limited by the limit configured at

747 stream creation.

748 

749 If the stream was paused, this function will automatically resume

750 it if needed.

751 """ 

752 if n < 0: 

753 raise ValueError('readexactly size can not be less than zero') 

754 

755 if self._exception is not None: 

756 raise self._exception 

757 

758 if n == 0: 

759 return b'' 

760 

761 while len(self._buffer) < n: 

762 if self._eof: 

763 incomplete = bytes(self._buffer) 

764 self._buffer.clear() 

765 raise exceptions.IncompleteReadError(incomplete, n) 

766 

767 await self._wait_for_data('readexactly') 

768 

769 if len(self._buffer) == n: 

770 data = bytes(self._buffer) 

771 self._buffer.clear() 

772 else: 

773 data = bytes(memoryview(self._buffer)[:n]) 

774 del self._buffer[:n] 

775 self._maybe_resume_transport() 

776 return data 
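
# --- Editor's note: usage sketch, not part of streams.py ----------
# A length-prefixed frame decoder: readexactly() returns all n bytes
# or raises IncompleteReadError carrying the partial data. The 4-byte
# big-endian length prefix is an illustrative framing choice.

import asyncio

async def _demo_read_frame(reader):
    try:
        header = await reader.readexactly(4)
        size = int.from_bytes(header, 'big')
        return await reader.readexactly(size)
    except asyncio.IncompleteReadError as e:
        raise ConnectionError(
            f'truncated frame: got {len(e.partial)} bytes') from e
# -------------------------------------------------------------------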

777 

778 def __aiter__(self): 

779 return self 

780 

781 async def __anext__(self): 

782 val = await self.readline() 

783 if val == b'': 

784 raise StopAsyncIteration 

785 return val
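
# --- Editor's note: usage sketch, not part of streams.py ----------
# __aiter__/__anext__ make StreamReader usable with `async for`,
# yielding one line at a time and stopping cleanly at EOF.

async def _demo_iterate(reader):
    lines = []
    async for line in reader:    # StopAsyncIteration when readline() -> b''
        lines.append(line)
    return lines
# -------------------------------------------------------------------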