Coverage for Lib/asyncio/streams.py: 93%
413 statements
coverage.py v7.10.4, created at 2025-08-23 01:21 +0000
__all__ = (
    'StreamReader', 'StreamWriter', 'StreamReaderProtocol',
    'open_connection', 'start_server')

import collections
import socket
import sys
import warnings
import weakref

if hasattr(socket, 'AF_UNIX'):
    __all__ += ('open_unix_connection', 'start_unix_server')

from . import coroutines
from . import events
from . import exceptions
from . import format_helpers
from . import protocols
from .log import logger
from .tasks import sleep

_DEFAULT_LIMIT = 2 ** 16  # 64 KiB
async def open_connection(host=None, port=None, *,
                          limit=_DEFAULT_LIMIT, **kwds):
    """A wrapper for create_connection() returning a (reader, writer) pair.

    The reader returned is a StreamReader instance; the writer is a
    StreamWriter instance.

    The arguments are all the usual arguments to create_connection()
    except protocol_factory; most common are positional host and port,
    with various optional keyword arguments following.

    The only additional optional keyword argument is limit (to set the
    buffer limit passed to the StreamReader).

    (If you want to customize the StreamReader and/or
    StreamReaderProtocol classes, just copy the code -- there's
    really nothing special here except some convenience.)
    """
    loop = events.get_running_loop()
    reader = StreamReader(limit=limit, loop=loop)
    protocol = StreamReaderProtocol(reader, loop=loop)
    transport, _ = await loop.create_connection(
        lambda: protocol, host, port, **kwds)
    writer = StreamWriter(transport, protocol, reader, loop)
    return reader, writer
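
# A minimal usage sketch (illustrative, not part of this module): fetching
# one response line over the (reader, writer) pair that open_connection()
# returns. The host, port and payload are assumptions made up for the
# example.
async def _example_open_connection():
    reader, writer = await open_connection('example.com', 80)
    writer.write(b'HEAD / HTTP/1.0\r\nHost: example.com\r\n\r\n')
    await writer.drain()
    status = await reader.readline()
    writer.close()
    await writer.wait_closed()
    return status
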
async def start_server(client_connected_cb, host=None, port=None, *,
                       limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server, calling back for each connected client.

    The first parameter, `client_connected_cb`, takes two parameters:
    client_reader, client_writer.  client_reader is a StreamReader
    object, while client_writer is a StreamWriter object.  This
    parameter can either be a plain callback function or a coroutine
    function; if it is a coroutine function, the coroutine it returns
    will be automatically scheduled as a Task.

    The rest of the arguments are all the usual arguments to
    loop.create_server() except protocol_factory; most common are
    positional host and port, with various optional keyword arguments
    following.

    The only additional optional keyword argument is limit (to set the
    buffer limit passed to the StreamReader).

    The return value is the same as loop.create_server(), i.e. a
    Server object which can be used to stop the service.
    """
    loop = events.get_running_loop()

    def factory():
        reader = StreamReader(limit=limit, loop=loop)
        protocol = StreamReaderProtocol(reader, client_connected_cb,
                                        loop=loop)
        return protocol

    return await loop.create_server(factory, host, port, **kwds)
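
# A minimal usage sketch (illustrative, not part of this module): an echo
# server built on start_server(). The callback receives one
# (reader, writer) pair per connection; the handler name, address and port
# are assumptions.
async def _example_echo_server():
    async def handle(reader, writer):
        data = await reader.read(100)
        writer.write(data)
        await writer.drain()
        writer.close()
        await writer.wait_closed()

    server = await start_server(handle, '127.0.0.1', 8888)
    async with server:
        await server.serve_forever()
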
if hasattr(socket, 'AF_UNIX'):
    # UNIX Domain Sockets are supported on this platform

    async def open_unix_connection(path=None, *,
                                   limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `open_connection` but works with UNIX Domain Sockets."""
        loop = events.get_running_loop()

        reader = StreamReader(limit=limit, loop=loop)
        protocol = StreamReaderProtocol(reader, loop=loop)
        transport, _ = await loop.create_unix_connection(
            lambda: protocol, path, **kwds)
        writer = StreamWriter(transport, protocol, reader, loop)
        return reader, writer

    async def start_unix_server(client_connected_cb, path=None, *,
                                limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `start_server` but works with UNIX Domain Sockets."""
        loop = events.get_running_loop()

        def factory():
            reader = StreamReader(limit=limit, loop=loop)
            protocol = StreamReaderProtocol(reader, client_connected_cb,
                                            loop=loop)
            return protocol

        return await loop.create_unix_server(factory, path, **kwds)
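
    # A minimal usage sketch (illustrative, not part of this module): the
    # UNIX variants mirror their TCP counterparts, swapping host/port for a
    # filesystem path. The socket path is an assumption.
    async def _example_unix_roundtrip():
        reader, writer = await open_unix_connection('/tmp/example.sock')
        writer.write(b'ping\n')
        await writer.drain()
        reply = await reader.readline()
        writer.close()
        await writer.wait_closed()
        return reply
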
class FlowControlMixin(protocols.Protocol):
    """Reusable flow control logic for StreamWriter.drain().

    This implements the protocol methods pause_writing(),
    resume_writing() and connection_lost().  If the subclass overrides
    these it must call the super methods.

    StreamWriter.drain() must await the _drain_helper() coroutine.
    """

    def __init__(self, loop=None):
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._paused = False
        self._drain_waiters = collections.deque()
        self._connection_lost = False

    def pause_writing(self):
        assert not self._paused
        self._paused = True
        if self._loop.get_debug():
            logger.debug("%r pauses writing", self)

    def resume_writing(self):
        assert self._paused
        self._paused = False
        if self._loop.get_debug():
            logger.debug("%r resumes writing", self)

        for waiter in self._drain_waiters:
            if not waiter.done():
                waiter.set_result(None)

    def connection_lost(self, exc):
        self._connection_lost = True
        # Wake up the writer(s) if currently paused.
        if not self._paused:
            return

        for waiter in self._drain_waiters:
            if not waiter.done():
                if exc is None:
                    waiter.set_result(None)
                else:
                    waiter.set_exception(exc)

    async def _drain_helper(self):
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if not self._paused:
            return
        waiter = self._loop.create_future()
        self._drain_waiters.append(waiter)
        try:
            await waiter
        finally:
            self._drain_waiters.remove(waiter)

    def _get_close_waiter(self, stream):
        raise NotImplementedError
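
# A minimal sketch (illustrative, not part of this module) of the contract
# stated in the FlowControlMixin docstring: a subclass that overrides
# connection_lost() must chain to the super method, otherwise paused
# drain() callers are never woken up. The class and attribute names here
# are assumptions.
class _ExampleFlowControlProtocol(FlowControlMixin):
    def __init__(self, loop=None):
        super().__init__(loop=loop)
        self._closed = self._loop.create_future()

    def connection_lost(self, exc):
        super().connection_lost(exc)  # required: wakes _drain_waiters
        if not self._closed.done():
            self._closed.set_result(None)

    def _get_close_waiter(self, stream):
        return self._closed
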
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and StreamReader.

    (This is a helper class instead of making StreamReader itself a
    Protocol subclass, because the StreamReader has other potential
    uses, and to prevent the user of the StreamReader from accidentally
    calling inappropriate methods of the protocol.)
    """

    _source_traceback = None

    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
        super().__init__(loop=loop)
        if stream_reader is not None:
            self._stream_reader_wr = weakref.ref(stream_reader)
            self._source_traceback = stream_reader._source_traceback
        else:
            self._stream_reader_wr = None
        if client_connected_cb is not None:
            # This is a stream created by the `create_server()` function.
            # Keep a strong reference to the reader until a connection
            # is established.
            self._strong_reader = stream_reader
        self._reject_connection = False
        self._task = None
        self._transport = None
        self._client_connected_cb = client_connected_cb
        self._over_ssl = False
        self._closed = self._loop.create_future()

    @property
    def _stream_reader(self):
        if self._stream_reader_wr is None:
            return None
        return self._stream_reader_wr()

    def _replace_transport(self, transport):
        loop = self._loop
        self._transport = transport
        self._over_ssl = transport.get_extra_info('sslcontext') is not None

    def connection_made(self, transport):
        if self._reject_connection:
            context = {
                'message': ('An open stream was garbage collected prior to '
                            'establishing network connection; '
                            'call "stream.close()" explicitly.')
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
            transport.abort()
            return
        self._transport = transport
        reader = self._stream_reader
        if reader is not None:
            reader.set_transport(transport)
        self._over_ssl = transport.get_extra_info('sslcontext') is not None
        if self._client_connected_cb is not None:
            writer = StreamWriter(transport, self, reader, self._loop)
            res = self._client_connected_cb(reader, writer)
            if coroutines.iscoroutine(res):
                def callback(task):
                    if task.cancelled():
                        transport.close()
                        return
                    exc = task.exception()
                    if exc is not None:
                        self._loop.call_exception_handler({
                            'message': 'Unhandled exception in client_connected_cb',
                            'exception': exc,
                            'transport': transport,
                        })
                        transport.close()

                self._task = self._loop.create_task(res)
                self._task.add_done_callback(callback)

            self._strong_reader = None

    def connection_lost(self, exc):
        reader = self._stream_reader
        if reader is not None:
            if exc is None:
                reader.feed_eof()
            else:
                reader.set_exception(exc)
        if not self._closed.done():
            if exc is None:
                self._closed.set_result(None)
            else:
                self._closed.set_exception(exc)
        super().connection_lost(exc)
        self._stream_reader_wr = None
        self._task = None
        self._transport = None

    def data_received(self, data):
        reader = self._stream_reader
        if reader is not None:
            reader.feed_data(data)

    def eof_received(self):
        reader = self._stream_reader
        if reader is not None:
            reader.feed_eof()
        if self._over_ssl:
            # Prevent a warning in SSLProtocol.eof_received:
            # "returning true from eof_received()
            # has no effect when using ssl"
            return False
        return True

    def _get_close_waiter(self, stream):
        return self._closed

    def __del__(self):
        # Prevent reports about unhandled exceptions.
        # Better than the self._closed._log_traceback = False hack
        try:
            closed = self._closed
        except AttributeError:
            pass  # failed constructor
        else:
            if closed.done() and not closed.cancelled():
                closed.exception()
class StreamWriter:
    """Wraps a Transport.

    This exposes write(), writelines(), [can_]write_eof(),
    get_extra_info() and close().  It adds the drain() coroutine,
    which can be awaited to respect flow control.  It also adds a
    transport property which references the Transport directly.
    """

    def __init__(self, transport, protocol, reader, loop):
        self._transport = transport
        self._protocol = protocol
        # drain() expects that the reader has an exception() method
        assert reader is None or isinstance(reader, StreamReader)
        self._reader = reader
        self._loop = loop
        self._complete_fut = self._loop.create_future()
        self._complete_fut.set_result(None)

    def __repr__(self):
        info = [self.__class__.__name__, f'transport={self._transport!r}']
        if self._reader is not None:
            info.append(f'reader={self._reader!r}')
        return '<{}>'.format(' '.join(info))

    @property
    def transport(self):
        return self._transport

    def write(self, data):
        self._transport.write(data)

    def writelines(self, data):
        self._transport.writelines(data)

    def write_eof(self):
        return self._transport.write_eof()

    def can_write_eof(self):
        return self._transport.can_write_eof()

    def close(self):
        return self._transport.close()

    def is_closing(self):
        return self._transport.is_closing()

    async def wait_closed(self):
        await self._protocol._get_close_waiter(self)

    def get_extra_info(self, name, default=None):
        return self._transport.get_extra_info(name, default)

    async def drain(self):
        """Flush the write buffer.

        The intended use is to write

          w.write(data)
          await w.drain()
        """
        if self._reader is not None:
            exc = self._reader.exception()
            if exc is not None:
                raise exc
        if self._transport.is_closing():
            # Wait for protocol.connection_lost() to be called: raise the
            # connection-closing error if any, ConnectionResetError
            # otherwise.
            # Yield to the event loop so connection_lost() may be
            # called.  Without this, _drain_helper() would return
            # immediately, and code that calls
            #     write(...); await drain()
            # in a loop would never call connection_lost(), so it
            # would not see an error when the socket is closed.
            await sleep(0)
        await self._protocol._drain_helper()

    async def start_tls(self, sslcontext, *,
                        server_hostname=None,
                        ssl_handshake_timeout=None,
                        ssl_shutdown_timeout=None):
        """Upgrade an existing stream-based connection to TLS."""
        server_side = self._protocol._client_connected_cb is not None
        protocol = self._protocol
        await self.drain()
        new_transport = await self._loop.start_tls(  # type: ignore
            self._transport, protocol, sslcontext,
            server_side=server_side, server_hostname=server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout,
            ssl_shutdown_timeout=ssl_shutdown_timeout)
        self._transport = new_transport
        protocol._replace_transport(new_transport)
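
    # `warnings` is bound as a default argument below so the module stays
    # reachable even if it has been cleared from globals during interpreter
    # shutdown, when __del__ may still run.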
    def __del__(self, warnings=warnings):
        if not self._transport.is_closing():
            if self._loop.is_closed():
                warnings.warn("loop is closed", ResourceWarning)
            else:
                self.close()
                warnings.warn(f"unclosed {self!r}", ResourceWarning)
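
# A minimal usage sketch (illustrative, not part of this module): upgrading
# an established plaintext connection to TLS with StreamWriter.start_tls().
# The host and context setup are assumptions.
async def _example_start_tls():
    import ssl
    reader, writer = await open_connection('example.com', 443)
    context = ssl.create_default_context()
    await writer.start_tls(context, server_hostname='example.com')
    # From here on, reads and writes go over the encrypted transport.
    writer.close()
    await writer.wait_closed()
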
class StreamReader:

    _source_traceback = None

    def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
        # The line length limit is a security feature;
        # it also doubles as half the buffer limit.

        if limit <= 0:
            raise ValueError('Limit cannot be <= 0')

        self._limit = limit
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._buffer = bytearray()
        self._eof = False    # Whether we're done.
        self._waiter = None  # A future used by _wait_for_data()
        self._exception = None
        self._transport = None
        self._paused = False
        if self._loop.get_debug():
            self._source_traceback = format_helpers.extract_stack(
                sys._getframe(1))

    def __repr__(self):
        info = ['StreamReader']
        if self._buffer:
            info.append(f'{len(self._buffer)} bytes')
        if self._eof:
            info.append('eof')
        if self._limit != _DEFAULT_LIMIT:
            info.append(f'limit={self._limit}')
        if self._waiter:
            info.append(f'waiter={self._waiter!r}')
        if self._exception:
            info.append(f'exception={self._exception!r}')
        if self._transport:
            info.append(f'transport={self._transport!r}')
        if self._paused:
            info.append('paused')
        return '<{}>'.format(' '.join(info))

    def exception(self):
        return self._exception

    def set_exception(self, exc):
        self._exception = exc

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_exception(exc)

    def _wakeup_waiter(self):
        """Wake up read*() functions waiting for data or EOF."""
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_result(None)

    def set_transport(self, transport):
        assert self._transport is None, 'Transport already set'
        self._transport = transport

    def _maybe_resume_transport(self):
        if self._paused and len(self._buffer) <= self._limit:
            self._paused = False
            self._transport.resume_reading()

    def feed_eof(self):
        self._eof = True
        self._wakeup_waiter()

    def at_eof(self):
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    def feed_data(self, data):
        assert not self._eof, 'feed_data after feed_eof'

        if not data:
            return

        self._buffer.extend(data)
        self._wakeup_waiter()

        if (self._transport is not None and
                not self._paused and
                len(self._buffer) > 2 * self._limit):
            try:
                self._transport.pause_reading()
            except NotImplementedError:
                # The transport can't be paused.
                # We'll just have to buffer all data.
                # Forget the transport so we don't keep trying.
                self._transport = None
            else:
                self._paused = True

    async def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.

        If the stream was paused, automatically resume it.
        """
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine.  Running two read coroutines at the same time
        # would have unexpected behaviour: it would not be possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                f'{func_name}() called while another coroutine is '
                f'already waiting for incoming data')

        assert not self._eof, '_wait_for_data after EOF'

        # Waiting for data while paused would deadlock, so prevent it.
        # This is essential for readexactly(n) when n > self._limit.
        if self._paused:
            self._paused = False
            self._transport.resume_reading()

        self._waiter = self._loop.create_future()
        try:
            await self._waiter
        finally:
            self._waiter = None

    async def readline(self):
        """Read a chunk of data from the stream until a newline (b'\n') is found.

        On success, return the chunk ending with the newline.  If only a
        partial line can be read due to EOF, return the incomplete line
        without the terminating newline.  When EOF is reached and no bytes
        were read, return an empty bytes object.

        If the limit is reached, ValueError will be raised.  In that case,
        if the newline was found, the complete line including the newline
        will be removed from the internal buffer.  Otherwise, the internal
        buffer will be cleared.  The limit is compared against the part of
        the line without the newline.

        If the stream was paused, this function will automatically resume
        it if needed.
        """
        sep = b'\n'
        seplen = len(sep)
        try:
            line = await self.readuntil(sep)
        except exceptions.IncompleteReadError as e:
            return e.partial
        except exceptions.LimitOverrunError as e:
            if self._buffer.startswith(sep, e.consumed):
                del self._buffer[:e.consumed + seplen]
            else:
                self._buffer.clear()
            self._maybe_resume_transport()
            raise ValueError(e.args[0])
        return line
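
    # Usage sketch (illustrative, not part of this module): readline() in a
    # line-oriented consumer loop; `reader` is assumed to come from
    # open_connection().
    #
    #   async def consume_lines(reader):
    #       while True:
    #           line = await reader.readline()
    #           if not line:          # b'' means EOF
    #               break
    #           print(line.decode().rstrip())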

    async def readuntil(self, separator=b'\n'):
        """Read data from the stream until ``separator`` is found.

        On success, the data and separator will be removed from the
        internal buffer (consumed).  Returned data will include the
        separator at the end.

        The configured stream limit is used to check the result.  The
        limit sets the maximal length of data that can be returned, not
        counting the separator.

        If an EOF occurs and the complete separator is still not found,
        an IncompleteReadError exception will be raised, and the internal
        buffer will be reset.  The IncompleteReadError.partial attribute
        may contain a portion of the separator.

        If the data cannot be read because the limit was exceeded, a
        LimitOverrunError exception will be raised, and the data
        will be left in the internal buffer, so it can be read again.

        The ``separator`` may also be a tuple of separators.  In this
        case the return value will be the shortest possible match that
        ends with any of the separators.  For the purposes of
        LimitOverrunError, the shortest possible separator is considered
        to be the one that matched.
        """
        if isinstance(separator, tuple):
            # Make sure the shortest match wins.
            separator = sorted(separator, key=len)
        else:
            separator = [separator]
        if not separator:
            raise ValueError('Separator should contain at least one element')
        min_seplen = len(separator[0])
        max_seplen = len(separator[-1])
        if min_seplen == 0:
            raise ValueError('Separator should be at least one-byte string')

        if self._exception is not None:
            raise self._exception

        # Consume the whole buffer except for the last bytes, whose length
        # is one less than max_seplen.  Let's check the corner cases with
        # separator[-1]='SEPARATOR':
        # * we have received an almost complete separator (without its
        #   last byte), i.e. buffer='some textSEPARATO'.  In this case we
        #   can safely consume max_seplen - 1 bytes.
        # * the last byte of the buffer is the first byte of the
        #   separator, i.e. buffer='abcdefghijklmnopqrS'.  We may safely
        #   consume everything except that last byte, but this requires
        #   analyzing the bytes of the buffer that match a partial
        #   separator.  This is slow and/or requires an FSM.  For this
        #   case our implementation is not optimal, since it requires
        #   rescanning data that is known not to belong to the separator.
        #   In the real world, separators will not be long enough for the
        #   performance problems to be noticeable.  Even when reading
        #   MIME-encoded messages :)

        # `offset` is the number of bytes from the beginning of the buffer
        # where there is no occurrence of any `separator`.
        offset = 0

        # Loop until we find a `separator` in the buffer, exceed the
        # buffer size, or an EOF has happened.
        while True:
            buflen = len(self._buffer)

            # Check if we now have enough data in the buffer for the
            # shortest separator to fit.
            if buflen - offset >= min_seplen:
                match_start = None
                match_end = None
                for sep in separator:
                    isep = self._buffer.find(sep, offset)

                    if isep != -1:
                        # `separator` is in the buffer.  `match_start` and
                        # `match_end` will be used later to retrieve the
                        # data.
                        end = isep + len(sep)
                        if match_end is None or end < match_end:
                            match_end = end
                            match_start = isep
                if match_end is not None:
                    break

                # see the comment above for an explanation
                offset = max(0, buflen + 1 - max_seplen)
                if offset > self._limit:
                    raise exceptions.LimitOverrunError(
                        'Separator is not found, and chunk exceed the limit',
                        offset)

            # A complete message (with a full separator) may be present in
            # the buffer even when the EOF flag is set.  This may happen
            # when the last chunk adds data which makes the separator be
            # found.  That's why we check for EOF *after* inspecting the
            # buffer.
            if self._eof:
                chunk = bytes(self._buffer)
                self._buffer.clear()
                raise exceptions.IncompleteReadError(chunk, None)

            # _wait_for_data() will resume reading if the stream was paused.
            await self._wait_for_data('readuntil')

        if match_start > self._limit:
            raise exceptions.LimitOverrunError(
                'Separator is found, but chunk is longer than limit',
                match_start)

        chunk = self._buffer[:match_end]
        del self._buffer[:match_end]
        self._maybe_resume_transport()
        return bytes(chunk)
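
    # Usage sketch (illustrative, not part of this module): readuntil()
    # with a tuple of separators returns the shortest match; the payloads
    # here are made up for the example.
    #
    #   # If the peer sends b'HEADER\r\n\r\nbody', either blank-line style
    #   # terminates the header block:
    #   header = await reader.readuntil((b'\r\n\r\n', b'\n\n'))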

    async def read(self, n=-1):
        """Read up to `n` bytes from the stream.

        If `n` is not provided or set to -1,
        read until EOF, then return all read bytes.
        If EOF was received and the internal buffer is empty,
        return an empty bytes object.

        If `n` is 0, return an empty bytes object immediately.

        If `n` is positive, return at most `n` available bytes
        as soon as at least 1 byte is available in the internal buffer.
        If EOF is received before any byte is read, return an empty
        bytes object.

        The returned value is not limited by the buffer limit
        configured at stream creation.

        If the stream was paused, this function will automatically resume
        it if needed.
        """

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes.  So just call self.read(self._limit) until EOF.
            blocks = []
            while True:
                block = await self.read(self._limit)
                if not block:
                    break
                blocks.append(block)
            return b''.join(blocks)

        if not self._buffer and not self._eof:
            await self._wait_for_data('read')

        # This will work right even if the buffer is less than n bytes
        data = bytes(memoryview(self._buffer)[:n])
        del self._buffer[:n]

        self._maybe_resume_transport()
        return data
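
    # Usage sketch (illustrative, not part of this module): the three `n`
    # regimes of read().
    #
    #   everything = await reader.read()    # n=-1: read until EOF
    #   nothing = await reader.read(0)      # n=0: always b'', immediately
    #   chunk = await reader.read(4096)     # n>0: up to 4096 bytes, at
    #                                       # least 1 unless EOF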

    async def readexactly(self, n):
        """Read exactly `n` bytes.

        Raise an IncompleteReadError if EOF is reached before `n` bytes
        can be read.  The IncompleteReadError.partial attribute of the
        exception will contain the partially read bytes.

        If n is zero, return an empty bytes object.

        The returned value is not limited by the buffer limit
        configured at stream creation.

        If the stream was paused, this function will automatically resume
        it if needed.
        """
        if n < 0:
            raise ValueError('readexactly size can not be less than zero')

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        while len(self._buffer) < n:
            if self._eof:
                incomplete = bytes(self._buffer)
                self._buffer.clear()
                raise exceptions.IncompleteReadError(incomplete, n)

            await self._wait_for_data('readexactly')

        if len(self._buffer) == n:
            data = bytes(self._buffer)
            self._buffer.clear()
        else:
            data = bytes(memoryview(self._buffer)[:n])
            del self._buffer[:n]
        self._maybe_resume_transport()
        return data
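
    # Usage sketch (illustrative, not part of this module): a common
    # length-prefixed framing pattern built on readexactly(); the 4-byte
    # big-endian prefix is an assumption.
    #
    #   import struct
    #   header = await reader.readexactly(4)
    #   (length,) = struct.unpack('!I', header)
    #   payload = await reader.readexactly(length)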

    def __aiter__(self):
        return self

    async def __anext__(self):
        val = await self.readline()
        if val == b'':
            raise StopAsyncIteration
        return val
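
# A minimal usage sketch (illustrative, not part of this module): because
# StreamReader implements __aiter__/__anext__, a reader can be consumed
# with `async for`, one line per iteration, stopping at EOF.
async def _example_async_iteration(reader):
    lines = []
    async for line in reader:   # calls readline() until it returns b''
        lines.append(line)
    return lines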