Coverage for Lib/asyncio/streams.py: 93%

413 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-23 01:21 +0000

1__all__ = ( 

2 'StreamReader', 'StreamWriter', 'StreamReaderProtocol', 

3 'open_connection', 'start_server') 

4 

5import collections 

6import socket 

7import sys 

8import warnings 

9import weakref 

10 

11if hasattr(socket, 'AF_UNIX'): 11 ↛ 14line 11 didn't jump to line 14 because the condition on line 11 was always true

12 __all__ += ('open_unix_connection', 'start_unix_server') 

13 

14from . import coroutines 

15from . import events 

16from . import exceptions 

17from . import format_helpers 

18from . import protocols 

19from .log import logger 

20from .tasks import sleep 

21 

22 

23_DEFAULT_LIMIT = 2 ** 16 # 64 KiB 

24 

25 

async def open_connection(host=None, port=None, *,
                          limit=_DEFAULT_LIMIT, **kwds):
    """A wrapper for create_connection() returning a (reader, writer) pair.

    The reader returned is a StreamReader instance; the writer is a
    StreamWriter instance.

    The arguments are all the usual arguments to create_connection()
    except protocol_factory; most common are positional host and port,
    with various optional keyword arguments following.

    The only additional keyword argument handled here is limit (to set
    the buffer limit passed to the StreamReader).  The running event
    loop is always used, and every other keyword argument is forwarded
    unchanged to loop.create_connection().

    (If you want to customize the StreamReader and/or
    StreamReaderProtocol classes, just copy the code -- there's
    really nothing special here except some convenience.)
    """
    loop = events.get_running_loop()
    reader = StreamReader(limit=limit, loop=loop)
    protocol = StreamReaderProtocol(reader, loop=loop)
    # The factory always hands back the protocol built above, so the
    # protocol returned by create_connection() can be ignored.
    transport, _ = await loop.create_connection(
        lambda: protocol, host, port, **kwds)
    writer = StreamWriter(transport, protocol, reader, loop)
    return reader, writer

52 

53 

async def start_server(client_connected_cb, host=None, port=None, *,
                       limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server and invoke a callback per connected client.

    `client_connected_cb` is called with two arguments, a StreamReader
    (client_reader) and a StreamWriter (client_writer).  It may be a
    plain callable or a coroutine function; when the call produces a
    coroutine it is automatically wrapped in a Task.

    All remaining arguments are the usual loop.create_server()
    arguments except protocol_factory; typically host and port are
    given positionally, followed by optional keyword arguments.

    The extra keyword argument `limit` sets the buffer limit handed to
    each StreamReader.

    The return value is whatever loop.create_server() returns, i.e. a
    Server object which can be used to stop the service.
    """
    loop = events.get_running_loop()

    def factory():
        # Each accepted connection gets its own reader/protocol pair.
        return StreamReaderProtocol(
            StreamReader(limit=limit, loop=loop),
            client_connected_cb,
            loop=loop)

    server = await loop.create_server(factory, host, port, **kwds)
    return server

85 

86 

if hasattr(socket, 'AF_UNIX'):
    # The UNIX-socket helpers are only defined where the platform's
    # socket module exposes AF_UNIX.

    async def open_unix_connection(path=None, *,
                                   limit=_DEFAULT_LIMIT, **kwds):
        """UNIX Domain Socket counterpart of `open_connection`.

        Connects through loop.create_unix_connection() and returns a
        (StreamReader, StreamWriter) pair.
        """
        loop = events.get_running_loop()
        reader = StreamReader(limit=limit, loop=loop)
        protocol = StreamReaderProtocol(reader, loop=loop)

        def factory():
            # Always hand back the protocol prepared above.
            return protocol

        transport, _ = await loop.create_unix_connection(
            factory, path, **kwds)
        return reader, StreamWriter(transport, protocol, reader, loop)

    async def start_unix_server(client_connected_cb, path=None, *,
                                limit=_DEFAULT_LIMIT, **kwds):
        """UNIX Domain Socket counterpart of `start_server`.

        Returns whatever loop.create_unix_server() returns.
        """
        loop = events.get_running_loop()

        def factory():
            # Each accepted connection gets its own reader/protocol pair.
            return StreamReaderProtocol(
                StreamReader(limit=limit, loop=loop),
                client_connected_cb,
                loop=loop)

        server = await loop.create_unix_server(factory, path, **kwds)
        return server

114 

115 

class FlowControlMixin(protocols.Protocol):
    """Write-side flow control shared with StreamWriter.drain().

    Provides the protocol callbacks pause_writing(), resume_writing()
    and connection_lost().  Subclasses that override any of them must
    call the super methods.

    StreamWriter.drain() is expected to await _drain_helper().
    """

    def __init__(self, loop=None):
        self._loop = events.get_event_loop() if loop is None else loop
        self._paused = False
        # Futures of the _drain_helper() calls currently blocked.
        self._drain_waiters = collections.deque()
        self._connection_lost = False

    def pause_writing(self):
        """Mark writing as paused; later _drain_helper() calls will block."""
        assert not self._paused
        self._paused = True
        if self._loop.get_debug():
            logger.debug("%r pauses writing", self)

    def resume_writing(self):
        """Clear the paused flag and release every blocked drain waiter."""
        assert self._paused
        self._paused = False
        if self._loop.get_debug():
            logger.debug("%r resumes writing", self)

        still_waiting = (
            fut for fut in self._drain_waiters if not fut.done())
        for fut in still_waiting:
            fut.set_result(None)

    def connection_lost(self, exc):
        """Record the loss and, if paused, wake the blocked drain waiters.

        Waiters complete normally when `exc` is None and fail with
        `exc` otherwise.
        """
        self._connection_lost = True
        # Waiters can only be blocked while writing is paused.
        if self._paused:
            for fut in self._drain_waiters:
                if fut.done():
                    continue
                if exc is None:
                    fut.set_result(None)
                else:
                    fut.set_exception(exc)

    async def _drain_helper(self):
        """Block while writing is paused.

        Raises ConnectionResetError if connection_lost() was already
        called; returns immediately when writing is not paused.
        """
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if self._paused:
            fut = self._loop.create_future()
            self._drain_waiters.append(fut)
            try:
                await fut
            finally:
                # Always deregister, including on cancellation or error.
                self._drain_waiters.remove(fut)

    def _get_close_waiter(self, stream):
        """Hook for subclasses; not implemented by the mixin itself."""
        raise NotImplementedError

178 

179 

class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and StreamReader.

    (This is a helper class instead of making StreamReader itself a
    Protocol subclass, because the StreamReader has other potential
    uses, and to prevent the user of the StreamReader to accidentally
    call inappropriate methods of the protocol.)
    """

    _source_traceback = None

    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
        """Bind the protocol to `stream_reader`.

        `client_connected_cb`, when given, is called from
        connection_made() with (reader, writer).  `loop` is passed to
        FlowControlMixin.
        """
        super().__init__(loop=loop)
        if stream_reader is not None:
            # Only a weak reference is kept to the reader.
            self._stream_reader_wr = weakref.ref(stream_reader)
            self._source_traceback = stream_reader._source_traceback
        else:
            self._stream_reader_wr = None
        if client_connected_cb is not None:
            # This is a stream created by the `create_server()` function.
            # Keep a strong reference to the reader until a connection
            # is established.
            self._strong_reader = stream_reader
        self._reject_connection = False
        self._task = None
        self._transport = None
        self._client_connected_cb = client_connected_cb
        self._over_ssl = False
        # Completed (result or exception) by connection_lost().
        self._closed = self._loop.create_future()

    @property
    def _stream_reader(self):
        """The bound StreamReader, or None if unset or no longer alive."""
        if self._stream_reader_wr is None:
            return None
        return self._stream_reader_wr()

    def _replace_transport(self, transport):
        """Adopt `transport` and refresh the over-SSL flag from it."""
        self._transport = transport
        self._over_ssl = transport.get_extra_info('sslcontext') is not None

    def connection_made(self, transport):
        """Attach `transport` and, for servers, run client_connected_cb."""
        if self._reject_connection:
            context = {
                'message': ('An open stream was garbage collected prior to '
                            'establishing network connection; '
                            'call "stream.close()" explicitly.')
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
            transport.abort()
            return
        self._transport = transport
        reader = self._stream_reader
        if reader is not None:
            reader.set_transport(transport)
        self._over_ssl = transport.get_extra_info('sslcontext') is not None
        if self._client_connected_cb is not None:
            writer = StreamWriter(transport, self, reader, self._loop)
            res = self._client_connected_cb(reader, writer)
            if coroutines.iscoroutine(res):
                def callback(task):
                    # Close the transport if the handler task was
                    # cancelled or failed; failures are also reported
                    # through the loop's exception handler.
                    if task.cancelled():
                        transport.close()
                        return
                    exc = task.exception()
                    if exc is not None:
                        self._loop.call_exception_handler({
                            'message': 'Unhandled exception in client_connected_cb',
                            'exception': exc,
                            'transport': transport,
                        })
                        transport.close()

                self._task = self._loop.create_task(res)
                self._task.add_done_callback(callback)

            # The callback has received the reader; drop the strong
            # reference taken in __init__.
            self._strong_reader = None

    def connection_lost(self, exc):
        """Propagate the close (or error) to the reader and close waiter."""
        reader = self._stream_reader
        if reader is not None:
            if exc is None:
                reader.feed_eof()
            else:
                reader.set_exception(exc)
        if not self._closed.done():
            if exc is None:
                self._closed.set_result(None)
            else:
                self._closed.set_exception(exc)
        super().connection_lost(exc)
        self._stream_reader_wr = None
        self._task = None
        self._transport = None

    def data_received(self, data):
        """Feed received bytes to the reader, if there still is one."""
        reader = self._stream_reader
        if reader is not None:
            reader.feed_data(data)

    def eof_received(self):
        """Feed EOF to the reader; return False over SSL, True otherwise."""
        reader = self._stream_reader
        if reader is not None:
            reader.feed_eof()
        if self._over_ssl:
            # Prevent a warning in SSLProtocol.eof_received:
            # "returning true from eof_received()
            # has no effect when using ssl"
            return False
        return True

    def _get_close_waiter(self, stream):
        """Return the future completed by connection_lost()."""
        return self._closed

    def __del__(self):
        # Prevent reports about unhandled exceptions.
        # Better than self._closed._log_traceback = False hack
        try:
            closed = self._closed
        except AttributeError:
            pass  # failed constructor
        else:
            if closed.done() and not closed.cancelled():
                closed.exception()

306 

307 

class StreamWriter:
    """Thin wrapper around a Transport.

    Forwards write(), writelines(), [can_]write_eof(), get_extra_info(),
    close() and is_closing() to the transport.  On top of that it offers
    the drain() coroutine for flow control, wait_closed(), start_tls(),
    and a `transport` property exposing the wrapped Transport.
    """

    def __init__(self, transport, protocol, reader, loop):
        self._transport = transport
        self._protocol = protocol
        # drain() expects that the reader has an exception() method
        assert reader is None or isinstance(reader, StreamReader)
        self._reader = reader
        self._loop = loop
        # A future that is already completed with None.
        done = self._loop.create_future()
        self._complete_fut = done
        done.set_result(None)

    def __repr__(self):
        parts = [self.__class__.__name__, f'transport={self._transport!r}']
        if self._reader is not None:
            parts.append(f'reader={self._reader!r}')
        return f"<{' '.join(parts)}>"

    @property
    def transport(self):
        """The transport currently wrapped by this writer."""
        return self._transport

    def write(self, data):
        """Forward `data` to the transport's write()."""
        self._transport.write(data)

    def writelines(self, data):
        """Forward `data` to the transport's writelines()."""
        self._transport.writelines(data)

    def write_eof(self):
        """Forward to the transport's write_eof() and return its result."""
        return self._transport.write_eof()

    def can_write_eof(self):
        """Return the transport's can_write_eof() answer."""
        return self._transport.can_write_eof()

    def close(self):
        """Forward to the transport's close() and return its result."""
        return self._transport.close()

    def is_closing(self):
        """Return the transport's is_closing() answer."""
        return self._transport.is_closing()

    async def wait_closed(self):
        """Await the close waiter supplied by the protocol."""
        await self._protocol._get_close_waiter(self)

    def get_extra_info(self, name, default=None):
        """Look up `name` through the transport's get_extra_info()."""
        return self._transport.get_extra_info(name, default)

    async def drain(self):
        """Flush the write buffer.

        The intended use is to write

          w.write(data)
          await w.drain()
        """
        reader = self._reader
        if reader is not None:
            # Re-raise an exception already recorded on the reader.
            pending_error = reader.exception()
            if pending_error is not None:
                raise pending_error
        if self._transport.is_closing():
            # Yield to the event loop once so that
            # protocol.connection_lost() gets a chance to run.
            # Otherwise _drain_helper() would return immediately and a
            # loop of `write(...); await drain()` would never see
            # connection_lost(), hence never notice that the socket
            # was closed.  After connection_lost(), _drain_helper()
            # raises ConnectionResetError.
            await sleep(0)
        await self._protocol._drain_helper()

    async def start_tls(self, sslcontext, *,
                        server_hostname=None,
                        ssl_handshake_timeout=None,
                        ssl_shutdown_timeout=None):
        """Upgrade an existing stream-based connection to TLS."""
        protocol = self._protocol
        # A protocol holding a client_connected_cb is treated as the
        # server side of the connection.
        server_side = protocol._client_connected_cb is not None
        await self.drain()
        tls_transport = await self._loop.start_tls(  # type: ignore
            self._transport, protocol, sslcontext,
            server_side=server_side, server_hostname=server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout,
            ssl_shutdown_timeout=ssl_shutdown_timeout)
        self._transport = tls_transport
        protocol._replace_transport(tls_transport)

    def __del__(self, warnings=warnings):
        if self._transport.is_closing():
            return
        if self._loop.is_closed():
            warnings.warn("loop is closed", ResourceWarning)
        else:
            self.close()
            warnings.warn(f"unclosed {self!r}", ResourceWarning)

410 

class StreamReader:
    """Byte buffer that is filled via feed_data()/feed_eof() and drained
    through the read(), readline(), readuntil() and readexactly()
    coroutines.  It can also be iterated line by line with `async for`.
    """

    _source_traceback = None

    def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
        # The line length limit is a security feature;
        # it also doubles as half the buffer limit.
        if limit <= 0:
            raise ValueError('Limit cannot be <= 0')

        self._limit = limit
        self._loop = events.get_event_loop() if loop is None else loop
        self._buffer = bytearray()
        self._eof = False    # Whether we're done.
        self._waiter = None  # A future used by _wait_for_data()
        self._exception = None
        self._transport = None
        self._paused = False
        if self._loop.get_debug():
            self._source_traceback = format_helpers.extract_stack(
                sys._getframe(1))

    def __repr__(self):
        parts = ['StreamReader']
        add = parts.append
        if self._buffer:
            add(f'{len(self._buffer)} bytes')
        if self._eof:
            add('eof')
        if self._limit != _DEFAULT_LIMIT:
            add(f'limit={self._limit}')
        if self._waiter:
            add(f'waiter={self._waiter!r}')
        if self._exception:
            add(f'exception={self._exception!r}')
        if self._transport:
            add(f'transport={self._transport!r}')
        if self._paused:
            add('paused')
        return f"<{' '.join(parts)}>"

    def exception(self):
        """Return the exception stored by set_exception(), or None."""
        return self._exception

    def set_exception(self, exc):
        """Store `exc` and fail the pending waiter with it, if any."""
        self._exception = exc

        waiter, self._waiter = self._waiter, None
        if waiter is not None and not waiter.cancelled():
            waiter.set_exception(exc)

    def _wakeup_waiter(self):
        """Wakeup read*() functions waiting for data or EOF."""
        waiter, self._waiter = self._waiter, None
        if waiter is not None and not waiter.cancelled():
            waiter.set_result(None)

    def set_transport(self, transport):
        """Attach `transport`; only allowed once."""
        assert self._transport is None, 'Transport already set'
        self._transport = transport

    def _maybe_resume_transport(self):
        """Resume a paused transport once the buffer is within the limit."""
        if not self._paused:
            return
        if len(self._buffer) <= self._limit:
            self._paused = False
            self._transport.resume_reading()

    def feed_eof(self):
        """Mark the stream as finished and wake a pending reader."""
        self._eof = True
        self._wakeup_waiter()

    def at_eof(self):
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    def feed_data(self, data):
        """Append `data` to the buffer and wake a pending reader.

        If a transport is attached and the buffer grows beyond twice
        the limit, the transport is asked to pause reading.
        """
        assert not self._eof, 'feed_data after feed_eof'

        if not data:
            return

        self._buffer.extend(data)
        self._wakeup_waiter()

        if (self._transport is None
                or self._paused
                or len(self._buffer) <= 2 * self._limit):
            return

        try:
            self._transport.pause_reading()
        except NotImplementedError:
            # The transport can't be paused.
            # We'll just have to buffer all data.
            # Forget the transport so we don't keep trying.
            self._transport = None
        else:
            self._paused = True

    async def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.

        If stream was paused, automatically resume it.
        """
        # A single future links the feeding side to one read coroutine.
        # With two read coroutines waiting at once it would not be
        # possible to know which of them gets the next data, so a
        # second concurrent waiter is rejected.
        if self._waiter is not None:
            raise RuntimeError(
                f'{func_name}() called while another coroutine is '
                f'already waiting for incoming data')

        assert not self._eof, '_wait_for_data after EOF'

        # Waiting for data while paused would deadlock, so resume first.
        # This is essential for readexactly(n) when n > self._limit.
        if self._paused:
            self._paused = False
            self._transport.resume_reading()

        self._waiter = self._loop.create_future()
        try:
            await self._waiter
        finally:
            self._waiter = None

    async def readline(self):
        """Read one line, i.e. data up to and including a newline (LF) byte.

        On success the returned chunk ends with the newline.  If EOF
        cuts the line short, the incomplete line is returned without a
        terminating newline; at EOF with nothing buffered an empty
        bytes object is returned.

        When the limit is exceeded, ValueError is raised.  In that case
        the offending line including its newline is removed from the
        internal buffer if the newline was found; otherwise the whole
        buffer is cleared.  The limit is compared against the line
        without its newline.

        If stream was paused, this function will automatically resume
        it if needed.
        """
        newline = b'\n'
        try:
            return await self.readuntil(newline)
        except exceptions.IncompleteReadError as err:
            return err.partial
        except exceptions.LimitOverrunError as err:
            if self._buffer.startswith(newline, err.consumed):
                del self._buffer[:err.consumed + len(newline)]
            else:
                self._buffer.clear()
            self._maybe_resume_transport()
            raise ValueError(err.args[0])

    async def readuntil(self, separator=b'\n'):
        """Read data from the stream until ``separator`` is found.

        On success, the data and separator are removed from the
        internal buffer (consumed).  The returned data includes the
        separator at the end.

        The configured stream limit bounds the length of the data that
        can be returned, not counting the separator.

        If EOF occurs before a complete separator is found, an
        IncompleteReadError is raised and the internal buffer is reset.
        The IncompleteReadError.partial attribute may contain part of
        the separator.

        If the data cannot be read because it is over the limit, a
        LimitOverrunError is raised and the data is left in the
        internal buffer, so it can be read again.

        ``separator`` may also be a tuple of separators.  In that case
        the return value is the shortest possible one that has any of
        the separators as its suffix.  For the purposes of
        LimitOverrunError, the shortest possible separator is
        considered to be the one that matched.
        """
        if isinstance(separator, tuple):
            # Sort by length so the shortest separator comes first.
            candidates = sorted(separator, key=len)
        else:
            candidates = [separator]
        if not candidates:
            raise ValueError('Separator should contain at least one element')
        shortest = len(candidates[0])
        longest = len(candidates[-1])
        if shortest == 0:
            raise ValueError('Separator should be at least one-byte string')

        if self._exception is not None:
            raise self._exception

        # After an unsuccessful scan, everything except the last
        # (longest - 1) bytes of the buffer is known to contain no
        # separator start that could still complete, so the next scan
        # resumes there.  Example with the longest separator being
        # 'SEPARATOR':
        # * buffer='some textSEPARATO' -- the separator is almost
        #   complete, so exactly longest - 1 bytes must be rescanned.
        # * buffer='abcdefghijklmnopqrS' -- only the final byte could
        #   belong to a separator, but telling that apart would need
        #   partial-match analysis (slow and/or an FSM).  Instead the
        #   same longest - 1 tail is rescanned, which is not optimal but
        #   separators are short enough in practice for it not to matter.

        # `scan_from` is the number of leading buffer bytes known to
        # contain no occurrence of any separator.
        scan_from = 0

        # Loop until a separator shows up in the buffer, the limit is
        # exceeded, or EOF has happened.
        while True:
            buffered = len(self._buffer)

            # Only scan once the shortest separator could fit.
            if buffered - scan_from >= shortest:
                found_at = None
                found_end = None
                for candidate in candidates:
                    index = self._buffer.find(candidate, scan_from)
                    if index == -1:
                        continue
                    # Keep the match that ends earliest; it is used
                    # below to slice the result out of the buffer.
                    stop = index + len(candidate)
                    if found_end is None or stop < found_end:
                        found_end = stop
                        found_at = index
                if found_end is not None:
                    break

                # See the explanation above the loop.
                scan_from = max(0, buffered + 1 - longest)
                if scan_from > self._limit:
                    raise exceptions.LimitOverrunError(
                        'Separator is not found, and chunk exceed the limit',
                        scan_from)

            # A complete message (with full separator) may be present in
            # the buffer even when the EOF flag is set, e.g. when the
            # last chunk completed the separator.  Hence EOF is checked
            # only *after* inspecting the buffer.
            if self._eof:
                leftover = bytes(self._buffer)
                self._buffer.clear()
                raise exceptions.IncompleteReadError(leftover, None)

            # _wait_for_data() will resume reading if stream was paused.
            await self._wait_for_data('readuntil')

        if found_at > self._limit:
            raise exceptions.LimitOverrunError(
                'Separator is found, but chunk is longer than limit', found_at)

        chunk = self._buffer[:found_end]
        del self._buffer[:found_end]
        self._maybe_resume_transport()
        return bytes(chunk)

    async def read(self, n=-1):
        """Read up to `n` bytes from the stream.

        If `n` is not provided or is negative, read until EOF and
        return everything that was read.  If EOF was received and the
        internal buffer is empty, return an empty bytes object.

        If `n` is 0, return an empty bytes object immediately.

        If `n` is positive, return at most `n` bytes as soon as at
        least one byte is available in the internal buffer.  If EOF is
        received before any byte is read, return an empty bytes object.

        The returned value is not bounded by the limit configured at
        stream creation.

        If stream was paused, this function will automatically resume
        it if needed.
        """
        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        if n < 0:
            # Waiting for the whole payload to pile up in self._buffer
            # would deadlock once the peer sends more than the limit,
            # so collect it in blocks of at most self._limit bytes
            # until an empty block signals EOF.
            pieces = []
            piece = await self.read(self._limit)
            while piece:
                pieces.append(piece)
                piece = await self.read(self._limit)
            return b''.join(pieces)

        if not self._buffer and not self._eof:
            await self._wait_for_data('read')

        # This will work right even if buffer is less than n bytes
        data = bytes(memoryview(self._buffer)[:n])
        del self._buffer[:n]

        self._maybe_resume_transport()
        return data

    async def readexactly(self, n):
        """Read exactly `n` bytes.

        Raise IncompleteReadError if EOF is reached before `n` bytes
        can be read; its `partial` attribute holds the bytes that were
        read.  Raise ValueError if `n` is negative.

        If `n` is zero, return an empty bytes object.

        The returned value is not bounded by the limit configured at
        stream creation.

        If stream was paused, this function will automatically resume
        it if needed.
        """
        if n < 0:
            raise ValueError('readexactly size can not be less than zero')

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        while len(self._buffer) < n:
            if self._eof:
                partial = bytes(self._buffer)
                self._buffer.clear()
                raise exceptions.IncompleteReadError(partial, n)

            await self._wait_for_data('readexactly')

        if len(self._buffer) == n:
            # The whole buffer is the answer: take it and clear.
            data = bytes(self._buffer)
            self._buffer.clear()
        else:
            data = bytes(memoryview(self._buffer)[:n])
            del self._buffer[:n]
        self._maybe_resume_transport()
        return data

    def __aiter__(self):
        return self

    async def __anext__(self):
        line = await self.readline()
        if line != b'':
            return line
        raise StopAsyncIteration