Coverage for Lib/asyncio/streams.py: 93%
412 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-26 01:27 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-26 01:27 +0000
__all__ = (
    'StreamReader', 'StreamWriter', 'StreamReaderProtocol',
    'open_connection', 'start_server')

import collections
import socket
import sys
import warnings
import weakref

# The UNIX domain socket helpers are only exported on platforms whose
# socket module provides AF_UNIX.
if hasattr(socket, 'AF_UNIX'):
    __all__ += ('open_unix_connection', 'start_unix_server')

from . import coroutines
from . import events
from . import exceptions
from . import format_helpers
from . import protocols
from .log import logger
from .tasks import sleep


# Default value of the ``limit`` argument accepted by the stream helpers
# and by StreamReader.
_DEFAULT_LIMIT = 2 ** 16  # 64 KiB
async def open_connection(host=None, port=None, *,
                          limit=_DEFAULT_LIMIT, **kwds):
    """A wrapper for create_connection() returning a (reader, writer) pair.

    The reader returned is a StreamReader instance; the writer is a
    StreamWriter instance.

    The arguments are all the usual arguments to create_connection()
    except protocol_factory; most common are positional host and port,
    with various optional keyword arguments following.

    The additional optional keyword argument is limit (to set the
    buffer limit passed to the StreamReader).  Every other keyword
    argument is forwarded unchanged to create_connection() of the
    running event loop.

    (If you want to customize the StreamReader and/or
    StreamReaderProtocol classes, just copy the code -- there's
    really nothing special here except some convenience.)
    """
    loop = events.get_running_loop()
    reader = StreamReader(limit=limit, loop=loop)
    protocol = StreamReaderProtocol(reader, loop=loop)
    # The factory always hands back the one protocol built above, so the
    # protocol object returned by create_connection() is not needed.
    transport, _ = await loop.create_connection(
        lambda: protocol, host, port, **kwds)
    writer = StreamWriter(transport, protocol, reader, loop)
    return reader, writer
async def start_server(client_connected_cb, host=None, port=None, *,
                       limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server, call back for each client connected.

    The first parameter, `client_connected_cb`, takes two parameters:
    client_reader, client_writer.  client_reader is a StreamReader
    object, while client_writer is a StreamWriter object.  This
    parameter can either be a plain callback function or a coroutine;
    if it is a coroutine, it will be automatically converted into a
    Task.

    The rest of the arguments are all the usual arguments to
    loop.create_server() except protocol_factory; most common are
    positional host and port, with various optional keyword arguments
    following.

    Additional optional keyword argument is limit (to set the buffer
    limit passed to the StreamReader).

    The return value is the same as loop.create_server(), i.e. a
    Server object which can be used to stop the service.
    """
    loop = events.get_running_loop()

    def factory():
        # Called once per accepted connection: every client gets its own
        # reader/protocol pair.
        reader = StreamReader(limit=limit, loop=loop)
        protocol = StreamReaderProtocol(reader, client_connected_cb,
                                        loop=loop)
        return protocol

    return await loop.create_server(factory, host, port, **kwds)
if hasattr(socket, 'AF_UNIX'):
    # UNIX Domain Sockets are supported on this platform

    async def open_unix_connection(path=None, *,
                                   limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `open_connection` but works with UNIX Domain Sockets.

        `path` and the remaining keyword arguments are forwarded to
        create_unix_connection() of the running event loop; `limit` is
        the buffer limit passed to the StreamReader.  Returns a
        (reader, writer) pair.
        """
        loop = events.get_running_loop()

        reader = StreamReader(limit=limit, loop=loop)
        protocol = StreamReaderProtocol(reader, loop=loop)
        transport, _ = await loop.create_unix_connection(
            lambda: protocol, path, **kwds)
        writer = StreamWriter(transport, protocol, reader, loop)
        return reader, writer

    async def start_unix_server(client_connected_cb, path=None, *,
                                limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `start_server` but works with UNIX Domain Sockets.

        `path` and the remaining keyword arguments are forwarded to
        create_unix_server() of the running event loop, whose result is
        returned; `limit` is the buffer limit passed to each
        StreamReader.
        """
        loop = events.get_running_loop()

        def factory():
            # Called once per accepted connection.
            reader = StreamReader(limit=limit, loop=loop)
            protocol = StreamReaderProtocol(reader, client_connected_cb,
                                            loop=loop)
            return protocol

        return await loop.create_unix_server(factory, path, **kwds)
class FlowControlMixin(protocols.Protocol):
    """Reusable flow control logic for StreamWriter.drain().

    This implements the protocol methods pause_writing(),
    resume_writing() and connection_lost().  If the subclass overrides
    these it must call the super methods.

    StreamWriter.drain() must wait for _drain_helper() coroutine.
    """

    def __init__(self, loop=None):
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._paused = False
        # Futures of the _drain_helper() calls currently waiting for
        # writing to be resumed.
        self._drain_waiters = collections.deque()
        self._connection_lost = False

    def pause_writing(self):
        """Record that writing is paused; later drains will wait."""
        assert not self._paused
        self._paused = True
        if self._loop.get_debug():
            logger.debug("%r pauses writing", self)

    def resume_writing(self):
        """Record that writing resumed and wake every pending drain."""
        assert self._paused
        self._paused = False
        if self._loop.get_debug():
            logger.debug("%r resumes writing", self)

        for waiter in self._drain_waiters:
            if not waiter.done():
                waiter.set_result(None)

    def connection_lost(self, exc):
        """Mark the connection as lost and release pending drains.

        Waiters are completed with `exc` when it is not None, otherwise
        with a plain None result.
        """
        self._connection_lost = True
        # Wake up the writer(s) if currently paused.
        if not self._paused:
            return

        for waiter in self._drain_waiters:
            if not waiter.done():
                if exc is None:
                    waiter.set_result(None)
                else:
                    waiter.set_exception(exc)

    async def _drain_helper(self):
        """Wait until writing is resumed.

        Returns immediately when writing is not paused.  Raises
        ConnectionResetError if the connection has already been lost.
        """
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if not self._paused:
            return
        waiter = self._loop.create_future()
        self._drain_waiters.append(waiter)
        try:
            await waiter
        finally:
            # Always unregister, also when the wait is cancelled or fails.
            self._drain_waiters.remove(waiter)

    def _get_close_waiter(self, stream):
        """Return an awaitable that completes once the connection is
        closed; must be provided by subclasses."""
        raise NotImplementedError
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and StreamReader.

    (This is a helper class instead of making StreamReader itself a
    Protocol subclass, because the StreamReader has other potential
    uses, and to prevent the user of the StreamReader to accidentally
    call inappropriate methods of the protocol.)
    """

    _source_traceback = None

    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
        super().__init__(loop=loop)
        if stream_reader is not None:
            # Only a weak reference is kept to the reader.
            self._stream_reader_wr = weakref.ref(stream_reader)
            self._source_traceback = stream_reader._source_traceback
        else:
            self._stream_reader_wr = None
        if client_connected_cb is not None:
            # This is a stream created by the `create_server()` function.
            # Keep a strong reference to the reader until a connection
            # is established.
            self._strong_reader = stream_reader
        self._reject_connection = False
        self._task = None
        self._transport = None
        self._client_connected_cb = client_connected_cb
        self._over_ssl = False
        # Completed by connection_lost(); awaited by wait_closed().
        self._closed = self._loop.create_future()

    @property
    def _stream_reader(self):
        # The reader, or None when there is none or it has been collected.
        if self._stream_reader_wr is None:
            return None
        return self._stream_reader_wr()

    def _replace_transport(self, transport):
        """Switch to `transport` (used after a TLS upgrade)."""
        self._transport = transport
        self._over_ssl = transport.get_extra_info('sslcontext') is not None

    def connection_made(self, transport):
        """Attach `transport` and, for server-side streams, invoke the
        client_connected_cb callback with a (reader, writer) pair."""
        if self._reject_connection:
            context = {
                'message': ('An open stream was garbage collected prior to '
                            'establishing network connection; '
                            'call "stream.close()" explicitly.')
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
            transport.abort()
            return
        self._transport = transport
        reader = self._stream_reader
        if reader is not None:
            reader.set_transport(transport)
        self._over_ssl = transport.get_extra_info('sslcontext') is not None
        if self._client_connected_cb is not None:
            writer = StreamWriter(transport, self, reader, self._loop)
            res = self._client_connected_cb(reader, writer)
            if coroutines.iscoroutine(res):
                def callback(task):
                    # Close the transport when the handler task was
                    # cancelled or failed; failures are also reported
                    # to the loop's exception handler.
                    if task.cancelled():
                        transport.close()
                        return
                    exc = task.exception()
                    if exc is not None:
                        self._loop.call_exception_handler({
                            'message': 'Unhandled exception in client_connected_cb',
                            'exception': exc,
                            'transport': transport,
                        })
                        transport.close()

                self._task = self._loop.create_task(res)
                self._task.add_done_callback(callback)

            # The callback has received the reader; drop the strong
            # reference held since construction.
            self._strong_reader = None

    def connection_lost(self, exc):
        """Propagate EOF or `exc` to the reader and the close waiter,
        then drop the references held by this protocol."""
        reader = self._stream_reader
        if reader is not None:
            if exc is None:
                reader.feed_eof()
            else:
                reader.set_exception(exc)
        if not self._closed.done():
            if exc is None:
                self._closed.set_result(None)
            else:
                self._closed.set_exception(exc)
        super().connection_lost(exc)
        self._stream_reader_wr = None
        self._task = None
        self._transport = None

    def data_received(self, data):
        """Forward received bytes to the reader, if it still exists."""
        reader = self._stream_reader
        if reader is not None:
            reader.feed_data(data)

    def eof_received(self):
        """Forward EOF to the reader.

        Returns False over SSL and True otherwise.
        """
        reader = self._stream_reader
        if reader is not None:
            reader.feed_eof()
        if self._over_ssl:
            # Prevent a warning in SSLProtocol.eof_received:
            # "returning true from eof_received()
            # has no effect when using ssl"
            return False
        return True

    def _get_close_waiter(self, stream):
        return self._closed

    def __del__(self):
        # Prevent reports about unhandled exceptions.
        # Better than self._closed._log_traceback = False hack
        try:
            closed = self._closed
        except AttributeError:
            pass  # failed constructor
        else:
            if closed.done() and not closed.cancelled():
                closed.exception()
class StreamWriter:
    """Wraps a Transport.

    This exposes write(), writelines(), [can_]write_eof(),
    get_extra_info(), close() and is_closing().  It adds the
    coroutines drain(), to wait for flow control, wait_closed() and
    start_tls().  It also adds a transport property which references
    the Transport directly.
    """

    def __init__(self, transport, protocol, reader, loop):
        self._transport = transport
        self._protocol = protocol
        # drain() expects that the reader has an exception() method
        assert reader is None or isinstance(reader, StreamReader)
        self._reader = reader
        self._loop = loop
        self._complete_fut = self._loop.create_future()
        self._complete_fut.set_result(None)

    def __repr__(self):
        info = [self.__class__.__name__, f'transport={self._transport!r}']
        if self._reader is not None:
            info.append(f'reader={self._reader!r}')
        return '<{}>'.format(' '.join(info))

    @property
    def transport(self):
        return self._transport

    def write(self, data):
        self._transport.write(data)

    def writelines(self, data):
        self._transport.writelines(data)

    def write_eof(self):
        return self._transport.write_eof()

    def can_write_eof(self):
        return self._transport.can_write_eof()

    def close(self):
        return self._transport.close()

    def is_closing(self):
        return self._transport.is_closing()

    async def wait_closed(self):
        """Wait on the close waiter provided by the protocol."""
        await self._protocol._get_close_waiter(self)

    def get_extra_info(self, name, default=None):
        return self._transport.get_extra_info(name, default)

    async def drain(self):
        """Flush the write buffer.

        The intended use is to write

          w.write(data)
          await w.drain()

        An exception stored on the associated reader, if any, is
        raised before waiting.
        """
        if self._reader is not None:
            exc = self._reader.exception()
            if exc is not None:
                raise exc
        if self._transport.is_closing():
            # Wait for protocol.connection_lost() call
            # Raise connection closing error if any,
            # ConnectionResetError otherwise
            # Yield to the event loop so connection_lost() may be
            # called.  Without this, _drain_helper() would return
            # immediately, and code that calls
            #     write(...); await drain()
            # in a loop would never call connection_lost(), so it
            # would not see an error when the socket is closed.
            await sleep(0)
        await self._protocol._drain_helper()

    async def start_tls(self, sslcontext, *,
                        server_hostname=None,
                        ssl_handshake_timeout=None,
                        ssl_shutdown_timeout=None):
        """Upgrade an existing stream-based connection to TLS."""
        # A protocol that was given a client_connected_cb is a
        # server-side one.
        server_side = self._protocol._client_connected_cb is not None
        protocol = self._protocol
        await self.drain()
        new_transport = await self._loop.start_tls(  # type: ignore
            self._transport, protocol, sslcontext,
            server_side=server_side, server_hostname=server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout,
            ssl_shutdown_timeout=ssl_shutdown_timeout)
        self._transport = new_transport
        protocol._replace_transport(new_transport)

    def __del__(self, warnings=warnings):
        # `warnings` is bound as a default argument so the name is taken
        # from the function's defaults rather than looked up as a
        # global when the finalizer runs.
        if not self._transport.is_closing():
            if self._loop.is_closed():
                warnings.warn("loop is closed", ResourceWarning)
            else:
                self.close()
                warnings.warn(f"unclosed {self!r}", ResourceWarning)
class StreamReader:
    """Buffered reader fed through feed_data() and feed_eof().

    Incoming bytes are appended to an internal buffer and consumed by
    the read(), readline(), readuntil() and readexactly() coroutines.
    `limit` bounds the length of a line/chunk returned by readline()
    and readuntil(); once the buffer grows beyond twice the limit the
    transport (if one is set) is asked to pause reading.

    Instances support ``async for``, yielding lines until EOF.
    """

    _source_traceback = None

    def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
        # The line length limit is a security feature;
        # it also doubles as half the buffer limit.

        if limit <= 0:
            raise ValueError('Limit cannot be <= 0')

        self._limit = limit
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._buffer = bytearray()
        self._eof = False    # Whether we're done.
        self._waiter = None  # A future used by _wait_for_data()
        self._exception = None
        self._transport = None
        self._paused = False
        if self._loop.get_debug():
            self._source_traceback = format_helpers.extract_stack(
                sys._getframe(1))

    def __repr__(self):
        info = ['StreamReader']
        if self._buffer:
            info.append(f'{len(self._buffer)} bytes')
        if self._eof:
            info.append('eof')
        if self._limit != _DEFAULT_LIMIT:
            info.append(f'limit={self._limit}')
        if self._waiter:
            info.append(f'waiter={self._waiter!r}')
        if self._exception:
            info.append(f'exception={self._exception!r}')
        if self._transport:
            info.append(f'transport={self._transport!r}')
        if self._paused:
            info.append('paused')
        return '<{}>'.format(' '.join(info))

    def exception(self):
        """Return the exception set with set_exception(), or None."""
        return self._exception

    def set_exception(self, exc):
        """Store `exc` and fail the pending read waiter with it."""
        self._exception = exc

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_exception(exc)

    def _wakeup_waiter(self):
        """Wakeup read*() functions waiting for data or EOF."""
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_result(None)

    def set_transport(self, transport):
        assert self._transport is None, 'Transport already set'
        self._transport = transport

    def _maybe_resume_transport(self):
        # Resume a paused transport once the buffer is back within limit.
        if self._paused and len(self._buffer) <= self._limit:
            self._paused = False
            self._transport.resume_reading()

    def feed_eof(self):
        """Mark the stream as finished and wake the pending reader."""
        self._eof = True
        self._wakeup_waiter()

    def at_eof(self):
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    def feed_data(self, data):
        """Append `data` to the buffer and wake the pending reader.

        Empty data is ignored.  When a transport is set and the buffer
        exceeds twice the limit, the transport is asked to pause.
        """
        assert not self._eof, 'feed_data after feed_eof'

        if not data:
            return

        self._buffer.extend(data)
        self._wakeup_waiter()

        if (self._transport is not None and
                not self._paused and
                len(self._buffer) > 2 * self._limit):
            try:
                self._transport.pause_reading()
            except NotImplementedError:
                # The transport can't be paused.
                # We'll just have to buffer all data.
                # Forget the transport so we don't keep trying.
                self._transport = None
            else:
                self._paused = True

    async def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.

        If stream was paused, automatically resume it.
        """
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have unexpected behaviour: it would not be possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                f'{func_name}() called while another coroutine is '
                f'already waiting for incoming data')

        assert not self._eof, '_wait_for_data after EOF'

        # Waiting for data while paused will make deadlock, so prevent it.
        # This is essential for readexactly(n) for case when n > self._limit.
        if self._paused:
            self._paused = False
            self._transport.resume_reading()

        self._waiter = self._loop.create_future()
        try:
            await self._waiter
        finally:
            self._waiter = None

    async def readline(self):
        """Read chunk of data from the stream until newline (b'\n') is found.

        On success, return chunk that ends with newline. If only partial
        line can be read due to EOF, return incomplete line without
        terminating newline. When EOF was reached while no bytes read, empty
        bytes object is returned.

        If limit is reached, ValueError will be raised. In that case, if
        newline was found, complete line including newline will be removed
        from internal buffer. Else, internal buffer will be cleared. Limit is
        compared against part of the line without newline.

        If stream was paused, this function will automatically resume it if
        needed.
        """
        sep = b'\n'
        seplen = len(sep)
        try:
            line = await self.readuntil(sep)
        except exceptions.IncompleteReadError as e:
            return e.partial
        except exceptions.LimitOverrunError as e:
            # Discard the over-long line (or everything buffered when
            # the newline is not at the reported position).
            if self._buffer.startswith(sep, e.consumed):
                del self._buffer[:e.consumed + seplen]
            else:
                self._buffer.clear()
            self._maybe_resume_transport()
            raise ValueError(e.args[0])
        return line

    async def readuntil(self, separator=b'\n'):
        """Read data from the stream until ``separator`` is found.

        On success, the data and separator will be removed from the
        internal buffer (consumed). Returned data will include the
        separator at the end.

        Configured stream limit is used to check result. Limit sets the
        maximal length of data that can be returned, not counting the
        separator.

        If an EOF occurs and the complete separator is still not found,
        an IncompleteReadError exception will be raised, and the internal
        buffer will be reset.  The IncompleteReadError.partial attribute
        may contain the separator partially.

        If the data cannot be read because of over limit, a
        LimitOverrunError exception  will be raised, and the data
        will be left in the internal buffer, so it can be read again.

        The ``separator`` may also be a tuple of separators. In this
        case the return value will be the shortest possible that has any
        separator as the suffix. For the purposes of LimitOverrunError,
        the shortest possible separator is considered to be the one that
        matched.
        """
        if isinstance(separator, tuple):
            # Makes sure shortest matches wins
            separator = sorted(separator, key=len)
        else:
            separator = [separator]
        if not separator:
            raise ValueError('Separator should contain at least one element')
        min_seplen = len(separator[0])
        max_seplen = len(separator[-1])
        if min_seplen == 0:
            raise ValueError('Separator should be at least one-byte string')

        if self._exception is not None:
            raise self._exception

        # Consume whole buffer except last bytes, which length is
        # one less than max_seplen. Let's check corner cases with
        # separator[-1]='SEPARATOR':
        # * we have received almost complete separator (without last
        #   byte). i.e buffer='some textSEPARATO'. In this case we
        #   can safely consume max_seplen - 1 bytes.
        # * last byte of buffer is first byte of separator, i.e.
        #   buffer='abcdefghijklmnopqrS'. We may safely consume
        #   everything except that last byte, but this require to
        #   analyze bytes of buffer that match partial separator.
        #   This is slow and/or require FSM. For this case our
        #   implementation is not optimal, since require rescanning
        #   of data that is known to not belong to separator. In
        #   real world, separator will not be so long to notice
        #   performance problems. Even when reading MIME-encoded
        #   messages :)

        # `offset` is the number of bytes from the beginning of the buffer
        # where there is no occurrence of any `separator`.
        offset = 0

        # Loop until we find a `separator` in the buffer, exceed the buffer size,
        # or an EOF has happened.
        while True:
            buflen = len(self._buffer)

            # Check if we now have enough data in the buffer for shortest
            # separator to fit.
            if buflen - offset >= min_seplen:
                match_start = None
                match_end = None
                for sep in separator:
                    isep = self._buffer.find(sep, offset)

                    if isep != -1:
                        # `separator` is in the buffer. `match_start` and
                        # `match_end` will be used later to retrieve the
                        # data.
                        end = isep + len(sep)
                        if match_end is None or end < match_end:
                            match_end = end
                            match_start = isep
                if match_end is not None:
                    break

                # see upper comment for explanation.
                offset = max(0, buflen + 1 - max_seplen)
                if offset > self._limit:
                    raise exceptions.LimitOverrunError(
                        'Separator is not found, and chunk exceed the limit',
                        offset)

            # Complete message (with full separator) may be present in buffer
            # even when EOF flag is set. This may happen when the last chunk
            # adds data which makes separator be found. That's why we check for
            # EOF *after* inspecting the buffer.
            if self._eof:
                chunk = bytes(self._buffer)
                self._buffer.clear()
                raise exceptions.IncompleteReadError(chunk, None)

            # _wait_for_data() will resume reading if stream was paused.
            await self._wait_for_data('readuntil')

        if match_start > self._limit:
            raise exceptions.LimitOverrunError(
                'Separator is found, but chunk is longer than limit', match_start)

        chunk = self._buffer[:match_end]
        del self._buffer[:match_end]
        self._maybe_resume_transport()
        return bytes(chunk)

    async def read(self, n=-1):
        """Read up to `n` bytes from the stream.

        If `n` is not provided or set to -1,
        read until EOF, then return all read bytes.
        If EOF was received and the internal buffer is empty,
        return an empty bytes object.

        If `n` is 0, return an empty bytes object immediately.

        If `n` is positive, return at most `n` available bytes
        as soon as at least 1 byte is available in the internal buffer.
        If EOF is received before any byte is read, return an empty
        bytes object.

        Returned value is not limited with limit, configured at stream
        creation.

        If stream was paused, this function will automatically resume it if
        needed.
        """

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes.  So just call self.read(self._limit) until EOF.
            blocks = []
            while True:
                block = await self.read(self._limit)
                if not block:
                    break
                blocks.append(block)
            return b''.join(blocks)

        if not self._buffer and not self._eof:
            await self._wait_for_data('read')

        # This will work right even if buffer is less than n bytes
        data = bytes(memoryview(self._buffer)[:n])
        del self._buffer[:n]

        self._maybe_resume_transport()
        return data

    async def readexactly(self, n):
        """Read exactly `n` bytes.

        Raise an IncompleteReadError if EOF is reached before `n` bytes can be
        read. The IncompleteReadError.partial attribute of the exception will
        contain the partial read bytes.

        if n is zero, return empty bytes object.

        Returned value is not limited with limit, configured at stream
        creation.

        If stream was paused, this function will automatically resume it if
        needed.
        """
        if n < 0:
            raise ValueError('readexactly size can not be less than zero')

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        while len(self._buffer) < n:
            if self._eof:
                incomplete = bytes(self._buffer)
                self._buffer.clear()
                raise exceptions.IncompleteReadError(incomplete, n)

            await self._wait_for_data('readexactly')

        if len(self._buffer) == n:
            data = bytes(self._buffer)
            self._buffer.clear()
        else:
            data = bytes(memoryview(self._buffer)[:n])
            del self._buffer[:n]
        self._maybe_resume_transport()
        return data

    def __aiter__(self):
        return self

    async def __anext__(self):
        # readline() returns b'' only at EOF, which ends the iteration.
        val = await self.readline()
        if val == b'':
            raise StopAsyncIteration
        return val