Coverage for Lib/asyncio/sslproto.py: 84%
535 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-15 02:02 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-15 02:02 +0000
1# Contains code from https://github.com/MagicStack/uvloop/tree/v0.16.0
2# SPDX-License-Identifier: PSF-2.0 AND (MIT OR Apache-2.0)
3# SPDX-FileCopyrightText: Copyright (c) 2015-2021 MagicStack Inc. http://magic.io
5import collections
6import enum
7import warnings
8try:
9 import ssl
10except ImportError: # pragma: no cover
11 ssl = None
13from . import constants
14from . import exceptions
15from . import protocols
16from . import transports
17from .log import logger
if ssl is not None:
    # Exceptions that the handshake, shutdown and write paths in this module
    # catch and treat as "cannot make progress yet" rather than as failures.
    SSLAgainErrors = (
        ssl.SSLWantReadError,
        ssl.SSLSyscallError,
    )
class SSLProtocolState(enum.Enum):
    """States of the SSL layer driven by SSLProtocol.

    SSLProtocol._set_state() permits the forward chain

        UNWRAPPED -> DO_HANDSHAKE -> WRAPPED -> FLUSHING -> SHUTDOWN

    and additionally allows a switch to UNWRAPPED from any state.
    """

    # Initial state; also the state every connection ends in.
    UNWRAPPED = "UNWRAPPED"
    # The handshake has been started and has not completed yet.
    DO_HANDSHAKE = "DO_HANDSHAKE"
    # Handshake finished successfully; application data flows.
    WRAPPED = "WRAPPED"
    # Shutdown requested; remaining incoming data is being drained.
    FLUSHING = "FLUSHING"
    # The SSL object is being unwrapped.
    SHUTDOWN = "SHUTDOWN"
class AppProtocolState(enum.Enum):
    """Which callbacks of the application protocol have been invoked so far.

    This tracks the state of app protocol (https://git.io/fj59P):

        INIT -cm-> CON_MADE [-dr*->] [-er-> EOF?] -cl-> CON_LOST

    where
        cm = connection_made()
        dr = data_received()
        er = eof_received()
        cl = connection_lost()
    """

    STATE_INIT = "STATE_INIT"
    STATE_CON_MADE = "STATE_CON_MADE"
    STATE_EOF = "STATE_EOF"
    STATE_CON_LOST = "STATE_CON_LOST"
47def _create_transport_context(server_side, server_hostname):
48 if server_side: 48 ↛ 49line 48 didn't jump to line 49 because the condition on line 48 was never true
49 raise ValueError('Server side SSL needs a valid SSLContext')
51 # Client side may pass ssl=True to use a default
52 # context; in that case the sslcontext passed is None.
53 # The default is secure for client connections.
54 # Python 3.4+: use up-to-date strong settings.
55 sslcontext = ssl.create_default_context()
56 if not server_hostname: 56 ↛ 57line 56 didn't jump to line 57 because the condition on line 56 was never true
57 sslcontext.check_hostname = False
58 return sslcontext
def add_flowcontrol_defaults(high, low, kb):
    """Fill in missing flow-control water marks and validate them.

    *high* and *low* may each be None.  A missing high mark becomes
    ``4 * low`` when *low* is given, otherwise ``kb * 1024``.  A missing low
    mark becomes a quarter of the (possibly defaulted) high mark.

    Returns the pair ``(high, low)``.
    Raises ValueError unless ``high >= low >= 0``.
    """
    if high is not None:
        hi = high
    elif low is not None:
        hi = 4 * low
    else:
        hi = kb * 1024

    lo = hi // 4 if low is None else low

    if not (hi >= lo >= 0):
        raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
                         (hi, lo))

    return hi, lo
82class _SSLProtocolTransport(transports._FlowControlMixin,
83 transports.Transport):
85 _start_tls_compatible = True
86 _sendfile_compatible = constants._SendfileMode.FALLBACK
88 def __init__(self, loop, ssl_protocol):
89 self._loop = loop
90 self._ssl_protocol = ssl_protocol
91 self._closed = False
93 def get_extra_info(self, name, default=None):
94 """Get optional transport information."""
95 return self._ssl_protocol._get_extra_info(name, default)
97 def set_protocol(self, protocol):
98 self._ssl_protocol._set_app_protocol(protocol)
100 def get_protocol(self):
101 return self._ssl_protocol._app_protocol
103 def is_closing(self):
104 return self._closed or self._ssl_protocol._is_transport_closing()
106 def close(self):
107 """Close the transport.
109 Buffered data will be flushed asynchronously. No more data
110 will be received. After all buffered data is flushed, the
111 protocol's connection_lost() method will (eventually) called
112 with None as its argument.
113 """
114 if not self._closed:
115 self._closed = True
116 self._ssl_protocol._start_shutdown()
117 else:
118 self._ssl_protocol = None
120 def __del__(self, _warnings=warnings):
121 if not self._closed: 121 ↛ 122line 121 didn't jump to line 122 because the condition on line 121 was never true
122 self._closed = True
123 _warnings.warn(
124 "unclosed transport <asyncio._SSLProtocolTransport "
125 "object>", ResourceWarning)
127 def is_reading(self):
128 return not self._ssl_protocol._app_reading_paused
130 def pause_reading(self):
131 """Pause the receiving end.
133 No data will be passed to the protocol's data_received()
134 method until resume_reading() is called.
135 """
136 self._ssl_protocol._pause_reading()
138 def resume_reading(self):
139 """Resume the receiving end.
141 Data received will once again be passed to the protocol's
142 data_received() method.
143 """
144 self._ssl_protocol._resume_reading()
146 def set_write_buffer_limits(self, high=None, low=None):
147 """Set the high- and low-water limits for write flow control.
149 These two values control when to call the protocol's
150 pause_writing() and resume_writing() methods. If specified,
151 the low-water limit must be less than or equal to the
152 high-water limit. Neither value can be negative.
154 The defaults are implementation-specific. If only the
155 high-water limit is given, the low-water limit defaults to an
156 implementation-specific value less than or equal to the
157 high-water limit. Setting high to zero forces low to zero as
158 well, and causes pause_writing() to be called whenever the
159 buffer becomes non-empty. Setting low to zero causes
160 resume_writing() to be called only once the buffer is empty.
161 Use of zero for either limit is generally sub-optimal as it
162 reduces opportunities for doing I/O and computation
163 concurrently.
164 """
165 self._ssl_protocol._set_write_buffer_limits(high, low)
166 self._ssl_protocol._control_app_writing()
168 def get_write_buffer_limits(self):
169 return (self._ssl_protocol._outgoing_low_water,
170 self._ssl_protocol._outgoing_high_water)
172 def get_write_buffer_size(self):
173 """Return the current size of the write buffers."""
174 return self._ssl_protocol._get_write_buffer_size()
176 def set_read_buffer_limits(self, high=None, low=None):
177 """Set the high- and low-water limits for read flow control.
179 These two values control when to call the upstream transport's
180 pause_reading() and resume_reading() methods. If specified,
181 the low-water limit must be less than or equal to the
182 high-water limit. Neither value can be negative.
184 The defaults are implementation-specific. If only the
185 high-water limit is given, the low-water limit defaults to an
186 implementation-specific value less than or equal to the
187 high-water limit. Setting high to zero forces low to zero as
188 well, and causes pause_reading() to be called whenever the
189 buffer becomes non-empty. Setting low to zero causes
190 resume_reading() to be called only once the buffer is empty.
191 Use of zero for either limit is generally sub-optimal as it
192 reduces opportunities for doing I/O and computation
193 concurrently.
194 """
195 self._ssl_protocol._set_read_buffer_limits(high, low)
196 self._ssl_protocol._control_ssl_reading()
198 def get_read_buffer_limits(self):
199 return (self._ssl_protocol._incoming_low_water,
200 self._ssl_protocol._incoming_high_water)
202 def get_read_buffer_size(self):
203 """Return the current size of the read buffer."""
204 return self._ssl_protocol._get_read_buffer_size()
206 @property
207 def _protocol_paused(self):
208 # Required for sendfile fallback pause_writing/resume_writing logic
209 return self._ssl_protocol._app_writing_paused
211 def write(self, data):
212 """Write some data bytes to the transport.
214 This does not block; it buffers the data and arranges for it
215 to be sent out asynchronously.
216 """
217 if not isinstance(data, (bytes, bytearray, memoryview)): 217 ↛ 218line 217 didn't jump to line 218 because the condition on line 217 was never true
218 raise TypeError(f"data: expecting a bytes-like instance, "
219 f"got {type(data).__name__}")
220 if not data: 220 ↛ 221line 220 didn't jump to line 221 because the condition on line 220 was never true
221 return
222 self._ssl_protocol._write_appdata((data,))
224 def writelines(self, list_of_data):
225 """Write a list (or any iterable) of data bytes to the transport.
227 The default implementation concatenates the arguments and
228 calls write() on the result.
229 """
230 self._ssl_protocol._write_appdata(list_of_data)
232 def write_eof(self):
233 """Close the write end after flushing buffered data.
235 This raises :exc:`NotImplementedError` right now.
236 """
237 raise NotImplementedError
239 def can_write_eof(self):
240 """Return True if this transport supports write_eof(), False if not."""
241 return False
243 def abort(self):
244 """Close the transport immediately.
246 Buffered data will be lost. No more data will be received.
247 The protocol's connection_lost() method will (eventually) be
248 called with None as its argument.
249 """
250 self._force_close(None)
252 def _force_close(self, exc):
253 self._closed = True
254 if self._ssl_protocol is not None: 254 ↛ exitline 254 didn't return from function '_force_close' because the condition on line 254 was always true
255 self._ssl_protocol._abort(exc)
257 def _test__append_write_backlog(self, data):
258 # for test only
259 self._ssl_protocol._write_backlog.append(data)
260 self._ssl_protocol._write_buffer_size += len(data)
class SSLProtocol(protocols.BufferedProtocol):
    """SSL layer placed between a lower-level transport and an app protocol.

    Encrypted bytes arriving from the lower transport are fed into an
    incoming ssl.MemoryBIO; bytes produced by the SSL object are taken from
    an outgoing ssl.MemoryBIO and written to the lower transport.  The
    application talks to a _SSLProtocolTransport created by this object.
    """

    max_size = 256 * 1024   # Buffer size passed to read()

    _handshake_start_time = None
    _handshake_timeout_handle = None
    _shutdown_timeout_handle = None

    def __init__(self, loop, app_protocol, sslcontext, waiter,
                 server_side=False, server_hostname=None,
                 call_connection_made=True,
                 ssl_handshake_timeout=None,
                 ssl_shutdown_timeout=None):
        """Set up buffers, the SSL object and flow-control state.

        A falsy *sslcontext* is replaced by a default client context.
        *waiter*, if not None, is resolved when the handshake finishes or
        the connection is lost.  When *call_connection_made* is false the
        app protocol is treated as already connected, so its
        connection_made() is not called after the handshake.

        Raises RuntimeError if the ssl module is unavailable and
        ValueError for a non-positive handshake or shutdown timeout.
        """
        if ssl is None:
            raise RuntimeError("stdlib ssl module not available")

        self._ssl_buffer = bytearray(self.max_size)
        self._ssl_buffer_view = memoryview(self._ssl_buffer)

        if ssl_handshake_timeout is None:
            ssl_handshake_timeout = constants.SSL_HANDSHAKE_TIMEOUT
        elif ssl_handshake_timeout <= 0:
            raise ValueError(
                f"ssl_handshake_timeout should be a positive number, "
                f"got {ssl_handshake_timeout}")
        if ssl_shutdown_timeout is None:
            ssl_shutdown_timeout = constants.SSL_SHUTDOWN_TIMEOUT
        elif ssl_shutdown_timeout <= 0:
            raise ValueError(
                f"ssl_shutdown_timeout should be a positive number, "
                f"got {ssl_shutdown_timeout}")

        if not sslcontext:
            sslcontext = _create_transport_context(
                server_side, server_hostname)

        self._server_side = server_side
        if server_hostname and not server_side:
            self._server_hostname = server_hostname
        else:
            self._server_hostname = None
        self._sslcontext = sslcontext
        # SSL-specific extra info. More info are set when the handshake
        # completes.
        self._extra = dict(sslcontext=sslcontext)

        # App data write buffering
        self._write_backlog = collections.deque()
        self._write_buffer_size = 0

        self._waiter = waiter
        self._loop = loop
        self._set_app_protocol(app_protocol)
        self._app_transport = None
        self._app_transport_created = False
        # transport, ex: SelectorSocketTransport
        self._transport = None
        self._ssl_handshake_timeout = ssl_handshake_timeout
        self._ssl_shutdown_timeout = ssl_shutdown_timeout
        # SSL and state machine
        self._incoming = ssl.MemoryBIO()
        self._outgoing = ssl.MemoryBIO()
        self._state = SSLProtocolState.UNWRAPPED
        self._conn_lost = 0  # Set when connection_lost called
        if call_connection_made:
            self._app_state = AppProtocolState.STATE_INIT
        else:
            self._app_state = AppProtocolState.STATE_CON_MADE
        self._sslobj = self._sslcontext.wrap_bio(
            self._incoming, self._outgoing,
            server_side=self._server_side,
            server_hostname=self._server_hostname)

        # Flow Control

        self._ssl_writing_paused = False

        self._app_reading_paused = False

        self._ssl_reading_paused = False
        self._incoming_high_water = 0
        self._incoming_low_water = 0
        self._set_read_buffer_limits()
        self._eof_received = False

        self._app_writing_paused = False
        self._outgoing_high_water = 0
        self._outgoing_low_water = 0
        self._set_write_buffer_limits()
        self._get_app_transport()

    def _set_app_protocol(self, app_protocol):
        """Store *app_protocol* and record whether it is a BufferedProtocol."""
        self._app_protocol = app_protocol
        # Make fast hasattr check first
        if (hasattr(app_protocol, 'get_buffer') and
                isinstance(app_protocol, protocols.BufferedProtocol)):
            self._app_protocol_get_buffer = app_protocol.get_buffer
            self._app_protocol_buffer_updated = app_protocol.buffer_updated
            self._app_protocol_is_buffer = True
        else:
            self._app_protocol_is_buffer = False

    def _wakeup_waiter(self, exc=None):
        """Resolve the waiter (with *exc* if given) and forget it."""
        if self._waiter is None:
            return
        if not self._waiter.cancelled():
            if exc is not None:
                self._waiter.set_exception(exc)
            else:
                self._waiter.set_result(None)
        self._waiter = None

    def _get_app_transport(self):
        """Return the app-facing transport, creating it on first use.

        Raises RuntimeError if the transport was created before and has
        since been dropped.
        """
        if self._app_transport is None:
            if self._app_transport_created:
                raise RuntimeError('Creating _SSLProtocolTransport twice')
            self._app_transport = _SSLProtocolTransport(self._loop, self)
            self._app_transport_created = True
        return self._app_transport

    def _is_transport_closing(self):
        """Return True if a lower transport is attached and is closing."""
        return self._transport is not None and self._transport.is_closing()

    def connection_made(self, transport):
        """Called when the low-level connection is made.

        Start the SSL handshake.
        """
        self._transport = transport
        self._start_handshake()

    def connection_lost(self, exc):
        """Called when the low-level connection is lost or closed.

        The argument is an exception object or None (the latter
        meaning a regular EOF is received or the connection was
        aborted or closed).
        """
        self._write_backlog.clear()
        self._outgoing.read()
        self._conn_lost += 1

        # Just mark the app transport as closed so that its __del__
        # doesn't complain.
        if self._app_transport is not None:
            self._app_transport._closed = True

        if self._state != SSLProtocolState.DO_HANDSHAKE:
            if (
                self._app_state == AppProtocolState.STATE_CON_MADE or
                self._app_state == AppProtocolState.STATE_EOF
            ):
                self._app_state = AppProtocolState.STATE_CON_LOST
                self._loop.call_soon(self._app_protocol.connection_lost, exc)
        self._set_state(SSLProtocolState.UNWRAPPED)
        self._transport = None
        self._app_transport = None
        self._app_protocol = None
        self._wakeup_waiter(exc)

        if self._shutdown_timeout_handle:
            self._shutdown_timeout_handle.cancel()
            self._shutdown_timeout_handle = None
        if self._handshake_timeout_handle:
            self._handshake_timeout_handle.cancel()
            self._handshake_timeout_handle = None

    def get_buffer(self, n):
        """Return the memoryview the lower transport should receive into.

        *n* is clamped to ``max_size``; the backing bytearray is only
        reallocated if it is smaller than the clamped size.
        """
        want = n
        if want <= 0 or want > self.max_size:
            want = self.max_size
        if len(self._ssl_buffer) < want:
            self._ssl_buffer = bytearray(want)
            self._ssl_buffer_view = memoryview(self._ssl_buffer)
        return self._ssl_buffer_view

    def buffer_updated(self, nbytes):
        """Feed *nbytes* received bytes to the SSL object and advance.

        The bytes go into the incoming BIO; which step runs next depends
        on the current SSL state.
        """
        self._incoming.write(self._ssl_buffer_view[:nbytes])

        if self._state == SSLProtocolState.DO_HANDSHAKE:
            self._do_handshake()

        elif self._state == SSLProtocolState.WRAPPED:
            self._do_read()

        elif self._state == SSLProtocolState.FLUSHING:
            self._do_flush()

        elif self._state == SSLProtocolState.SHUTDOWN:
            self._do_shutdown()

    def eof_received(self):
        """Called when the other end of the low-level stream
        is half-closed.

        If this returns a false value (including None), the transport
        will close itself.  If it returns a true value, closing the
        transport is up to the protocol.
        """
        self._eof_received = True
        try:
            if self._loop.get_debug():
                logger.debug("%r received EOF", self)

            if self._state == SSLProtocolState.DO_HANDSHAKE:
                self._on_handshake_complete(ConnectionResetError)

            elif self._state == SSLProtocolState.WRAPPED:
                self._set_state(SSLProtocolState.FLUSHING)
                if self._app_reading_paused:
                    # Keep the transport open; flushing continues when the
                    # application resumes reading.
                    return True
                else:
                    self._do_flush()

            elif self._state == SSLProtocolState.FLUSHING:
                self._do_write()
                self._set_state(SSLProtocolState.SHUTDOWN)
                self._do_shutdown()

            elif self._state == SSLProtocolState.SHUTDOWN:
                self._do_shutdown()

        except Exception:
            self._transport.close()
            raise

    def _get_extra_info(self, name, default=None):
        """Look up *name* in the SSL extras, then in the lower transport."""
        if name in self._extra:
            return self._extra[name]
        elif self._transport is not None:
            return self._transport.get_extra_info(name, default)
        else:
            return default

    def _set_state(self, new_state):
        """Switch the SSL state machine to *new_state*.

        Allowed: any state to UNWRAPPED, plus each single step of
        UNWRAPPED -> DO_HANDSHAKE -> WRAPPED -> FLUSHING -> SHUTDOWN.
        Anything else raises RuntimeError.
        """
        allowed = False

        if new_state == SSLProtocolState.UNWRAPPED:
            allowed = True

        elif (
            self._state == SSLProtocolState.UNWRAPPED and
            new_state == SSLProtocolState.DO_HANDSHAKE
        ):
            allowed = True

        elif (
            self._state == SSLProtocolState.DO_HANDSHAKE and
            new_state == SSLProtocolState.WRAPPED
        ):
            allowed = True

        elif (
            self._state == SSLProtocolState.WRAPPED and
            new_state == SSLProtocolState.FLUSHING
        ):
            allowed = True

        elif (
            self._state == SSLProtocolState.FLUSHING and
            new_state == SSLProtocolState.SHUTDOWN
        ):
            allowed = True

        if allowed:
            self._state = new_state

        else:
            raise RuntimeError(
                'cannot switch state from {} to {}'.format(
                    self._state, new_state))

    # Handshake flow

    def _start_handshake(self):
        """Enter DO_HANDSHAKE, arm the timeout and attempt the handshake."""
        if self._loop.get_debug():
            logger.debug("%r starts SSL handshake", self)
            self._handshake_start_time = self._loop.time()
        else:
            self._handshake_start_time = None

        self._set_state(SSLProtocolState.DO_HANDSHAKE)

        # start handshake timeout count down
        self._handshake_timeout_handle = \
            self._loop.call_later(self._ssl_handshake_timeout,
                                  self._check_handshake_timeout)

        self._do_handshake()

    def _check_handshake_timeout(self):
        """Timer callback: abort if the handshake is still in progress."""
        if self._state == SSLProtocolState.DO_HANDSHAKE:
            msg = (
                f"SSL handshake is taking longer than "
                f"{self._ssl_handshake_timeout} seconds: "
                f"aborting the connection"
            )
            self._fatal_error(ConnectionAbortedError(msg))

    def _do_handshake(self):
        """Advance the handshake by one step."""
        try:
            self._sslobj.do_handshake()
        except SSLAgainErrors:
            # Not finished yet: send whatever the SSL object produced.
            self._process_outgoing()
        except ssl.SSLError as exc:
            self._on_handshake_complete(exc)
        else:
            self._on_handshake_complete(None)

    def _on_handshake_complete(self, handshake_exc):
        """Finish the handshake, successfully or with *handshake_exc*.

        *handshake_exc* is None on success, otherwise an exception (class
        or instance) that is raised and reported as a fatal error.
        """
        if self._handshake_timeout_handle is not None:
            self._handshake_timeout_handle.cancel()
            self._handshake_timeout_handle = None

        sslobj = self._sslobj
        try:
            if handshake_exc is None:
                self._set_state(SSLProtocolState.WRAPPED)
            else:
                raise handshake_exc

            peercert = sslobj.getpeercert()
        except Exception as exc:
            handshake_exc = None
            self._set_state(SSLProtocolState.UNWRAPPED)
            if isinstance(exc, ssl.CertificateError):
                msg = 'SSL handshake failed on verifying the certificate'
            else:
                msg = 'SSL handshake failed'
            self._fatal_error(exc, msg)
            self._wakeup_waiter(exc)
            return

        if self._loop.get_debug():
            dt = self._loop.time() - self._handshake_start_time
            logger.debug("%r: SSL handshake took %.1f ms", self, dt * 1e3)

        # Add extra info that becomes available after handshake.
        self._extra.update(peercert=peercert,
                           cipher=sslobj.cipher(),
                           compression=sslobj.compression(),
                           ssl_object=sslobj)
        if self._app_state == AppProtocolState.STATE_INIT:
            self._app_state = AppProtocolState.STATE_CON_MADE
            self._app_protocol.connection_made(self._get_app_transport())
        self._wakeup_waiter()
        self._do_read()

    # Shutdown flow

    def _start_shutdown(self):
        """Begin closing the SSL layer.

        Does nothing when shutdown is already under way or the layer is
        unwrapped.  During a handshake the connection is aborted instead.
        """
        if (
            self._state in (
                SSLProtocolState.FLUSHING,
                SSLProtocolState.SHUTDOWN,
                SSLProtocolState.UNWRAPPED
            )
        ):
            return
        if self._app_transport is not None:
            self._app_transport._closed = True
        if self._state == SSLProtocolState.DO_HANDSHAKE:
            self._abort(None)
        else:
            self._set_state(SSLProtocolState.FLUSHING)
            self._shutdown_timeout_handle = self._loop.call_later(
                self._ssl_shutdown_timeout,
                self._check_shutdown_timeout
            )
            self._do_flush()

    def _check_shutdown_timeout(self):
        """Timer callback: force-close if shutdown has not finished."""
        if (
            self._state in (
                SSLProtocolState.FLUSHING,
                SSLProtocolState.SHUTDOWN
            )
        ):
            self._transport._force_close(
                exceptions.TimeoutError('SSL shutdown timed out'))

    def _do_flush(self):
        """Drain pending reads, then move on to the SSL shutdown."""
        self._do_read()
        self._set_state(SSLProtocolState.SHUTDOWN)
        self._do_shutdown()

    def _do_shutdown(self):
        """Try to unwrap the SSL object and complete the shutdown."""
        try:
            # unwrap() is skipped once the peer has already sent EOF.
            if not self._eof_received:
                self._sslobj.unwrap()
        except SSLAgainErrors:
            self._process_outgoing()
        except ssl.SSLError as exc:
            self._on_shutdown_complete(exc)
        else:
            self._process_outgoing()
            self._call_eof_received()
            self._on_shutdown_complete(None)

    def _on_shutdown_complete(self, shutdown_exc):
        """Cancel the shutdown timer and close the lower transport.

        A truthy *shutdown_exc* is reported through _fatal_error();
        otherwise the lower transport's close() is scheduled.
        """
        if self._shutdown_timeout_handle is not None:
            self._shutdown_timeout_handle.cancel()
            self._shutdown_timeout_handle = None

        if shutdown_exc:
            self._fatal_error(shutdown_exc)
        else:
            self._loop.call_soon(self._transport.close)

    def _abort(self, exc):
        """Drop to UNWRAPPED and force-close the lower transport, if any."""
        self._set_state(SSLProtocolState.UNWRAPPED)
        if self._transport is not None:
            self._transport._force_close(exc)

    # Outgoing flow

    def _write_appdata(self, list_of_data):
        """Queue application data and, if WRAPPED, try to encrypt it.

        Data written while shutting down or unwrapped is discarded; a
        warning is logged once the number of such writes reaches
        constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES.
        """
        if (
            self._state in (
                SSLProtocolState.FLUSHING,
                SSLProtocolState.SHUTDOWN,
                SSLProtocolState.UNWRAPPED
            )
        ):
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('SSL connection is closed')
            self._conn_lost += 1
            return

        for data in list_of_data:
            self._write_backlog.append(data)
            self._write_buffer_size += len(data)

        try:
            if self._state == SSLProtocolState.WRAPPED:
                self._do_write()

        except Exception as ex:
            self._fatal_error(ex, 'Fatal error on SSL protocol')

    def _do_write(self):
        """Push the write backlog through the SSL object, then send output."""
        try:
            while self._write_backlog:
                data = self._write_backlog[0]
                count = self._sslobj.write(data)
                data_len = len(data)
                if count < data_len:
                    # Partial write: keep the unwritten tail at the front.
                    self._write_backlog[0] = data[count:]
                    self._write_buffer_size -= count
                else:
                    del self._write_backlog[0]
                    self._write_buffer_size -= data_len
        except SSLAgainErrors:
            pass
        self._process_outgoing()

    def _process_outgoing(self):
        """Move bytes from the outgoing BIO to the lower transport.

        Nothing is sent while the lower transport has paused writing.
        App-side write flow control is re-evaluated either way.
        """
        if not self._ssl_writing_paused:
            data = self._outgoing.read()
            if len(data):
                self._transport.write(data)
        self._control_app_writing()

    # Incoming flow

    def _do_read(self):
        """Deliver decrypted data to the app protocol (WRAPPED/FLUSHING only).

        Any exception is routed to _fatal_error().
        """
        if (
            self._state not in (
                SSLProtocolState.WRAPPED,
                SSLProtocolState.FLUSHING,
            )
        ):
            return
        try:
            if not self._app_reading_paused:
                if self._app_protocol_is_buffer:
                    self._do_read__buffered()
                else:
                    self._do_read__copied()
                if self._write_backlog:
                    self._do_write()
                else:
                    self._process_outgoing()
            self._control_ssl_reading()
        except Exception as ex:
            self._fatal_error(ex, 'Fatal error on SSL protocol')

    def _do_read__buffered(self):
        """Read decrypted data straight into the app protocol's buffer."""
        offset = 0
        count = 1

        buf = self._app_protocol_get_buffer(self._get_read_buffer_size())
        wants = len(buf)

        # Read through a memoryview so that slicing yields views of the
        # app's buffer.  Slicing e.g. a bytearray directly would produce a
        # copy, and every read after the first would land in that copy.
        # The view is released before buffer_updated() runs so the app may
        # resize its buffer there.
        with memoryview(buf) as view:
            try:
                count = self._sslobj.read(wants, view)

                if count > 0:
                    offset = count
                    while offset < wants:
                        with view[offset:] as tail:
                            count = self._sslobj.read(wants - offset, tail)
                        if count > 0:
                            offset += count
                        else:
                            break
                    else:
                        # Buffer filled completely; more data may be
                        # pending, so schedule another read.
                        self._loop.call_soon(self._do_read)
            except SSLAgainErrors:
                pass
        if offset > 0:
            self._app_protocol_buffer_updated(offset)
        if not count:
            # close_notify
            self._call_eof_received()
            self._start_shutdown()

    def _do_read__copied(self):
        """Read all available decrypted data and pass it to data_received().

        A single chunk is delivered as-is; several chunks are joined.
        """
        chunk = b'1'
        zero = True
        one = False

        try:
            while True:
                chunk = self._sslobj.read(self.max_size)
                if not chunk:
                    break
                if zero:
                    zero = False
                    one = True
                    first = chunk
                elif one:
                    one = False
                    data = [first, chunk]
                else:
                    data.append(chunk)
        except SSLAgainErrors:
            pass
        if one:
            self._app_protocol.data_received(first)
        elif not zero:
            self._app_protocol.data_received(b''.join(data))
        if not chunk:
            # close_notify
            self._call_eof_received()
            self._start_shutdown()

    def _call_eof_received(self):
        """Call the app protocol's eof_received() at most once.

        Exceptions other than KeyboardInterrupt/SystemExit are reported
        through _fatal_error().
        """
        try:
            if self._app_state == AppProtocolState.STATE_CON_MADE:
                self._app_state = AppProtocolState.STATE_EOF
                keep_open = self._app_protocol.eof_received()
                if keep_open:
                    logger.warning('returning true from eof_received() '
                                   'has no effect when using ssl')
        except (KeyboardInterrupt, SystemExit):
            raise
        except BaseException as ex:
            self._fatal_error(ex, 'Error calling eof_received()')

    # Flow control for writes from APP socket

    def _control_app_writing(self):
        """Pause or resume the app protocol's writing per the water marks.

        Errors raised by pause_writing()/resume_writing() go to the loop's
        exception handler (KeyboardInterrupt/SystemExit propagate).
        """
        size = self._get_write_buffer_size()
        if size >= self._outgoing_high_water and not self._app_writing_paused:
            self._app_writing_paused = True
            try:
                self._app_protocol.pause_writing()
            except (KeyboardInterrupt, SystemExit):
                raise
            except BaseException as exc:
                self._loop.call_exception_handler({
                    'message': 'protocol.pause_writing() failed',
                    'exception': exc,
                    'transport': self._app_transport,
                    'protocol': self,
                })
        elif size <= self._outgoing_low_water and self._app_writing_paused:
            self._app_writing_paused = False
            try:
                self._app_protocol.resume_writing()
            except (KeyboardInterrupt, SystemExit):
                raise
            except BaseException as exc:
                self._loop.call_exception_handler({
                    'message': 'protocol.resume_writing() failed',
                    'exception': exc,
                    'transport': self._app_transport,
                    'protocol': self,
                })

    def _get_write_buffer_size(self):
        """Return bytes pending in the outgoing BIO plus the write backlog."""
        return self._outgoing.pending + self._write_buffer_size

    def _set_write_buffer_limits(self, high=None, low=None):
        """Set the outgoing water marks, filling in defaults."""
        high, low = add_flowcontrol_defaults(
            high, low, constants.FLOW_CONTROL_HIGH_WATER_SSL_WRITE)
        self._outgoing_high_water = high
        self._outgoing_low_water = low

    # Flow control for reads to APP socket

    def _pause_reading(self):
        """Stop delivering decrypted data to the app protocol."""
        self._app_reading_paused = True

    def _resume_reading(self):
        """Resume delivery to the app protocol on the next loop iteration."""
        if self._app_reading_paused:
            self._app_reading_paused = False

            def resume():
                # The state is checked when the callback runs, not when it
                # is scheduled.
                if self._state == SSLProtocolState.WRAPPED:
                    self._do_read()
                elif self._state == SSLProtocolState.FLUSHING:
                    self._do_flush()
                elif self._state == SSLProtocolState.SHUTDOWN:
                    self._do_shutdown()
            self._loop.call_soon(resume)

    # Flow control for reads from SSL socket

    def _control_ssl_reading(self):
        """Pause or resume the lower transport's reading per the water marks."""
        size = self._get_read_buffer_size()
        if size >= self._incoming_high_water and not self._ssl_reading_paused:
            self._ssl_reading_paused = True
            self._transport.pause_reading()
        elif size <= self._incoming_low_water and self._ssl_reading_paused:
            self._ssl_reading_paused = False
            self._transport.resume_reading()

    def _set_read_buffer_limits(self, high=None, low=None):
        """Set the incoming water marks, filling in defaults."""
        high, low = add_flowcontrol_defaults(
            high, low, constants.FLOW_CONTROL_HIGH_WATER_SSL_READ)
        self._incoming_high_water = high
        self._incoming_low_water = low

    def _get_read_buffer_size(self):
        """Return the number of bytes pending in the incoming BIO."""
        return self._incoming.pending

    # Flow control for writes to SSL socket

    def pause_writing(self):
        """Called when the low-level transport's buffer goes over
        the high-water mark.
        """
        assert not self._ssl_writing_paused
        self._ssl_writing_paused = True

    def resume_writing(self):
        """Called when the low-level transport's buffer drains below
        the low-water mark.
        """
        assert self._ssl_writing_paused
        self._ssl_writing_paused = False
        self._process_outgoing()

    def _fatal_error(self, exc, message='Fatal error on transport'):
        """Force-close the lower transport and report *exc*.

        OSError is only logged, and only in debug mode; CancelledError is
        not reported; anything else goes to the loop's exception handler.
        """
        if self._transport:
            self._transport._force_close(exc)

        if isinstance(exc, OSError):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        elif not isinstance(exc, exceptions.CancelledError):
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self._transport,
                'protocol': self,
            })