Coverage for Lib/asyncio/sslproto.py: 84%

535 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-15 02:02 +0000

1# Contains code from https://github.com/MagicStack/uvloop/tree/v0.16.0 

2# SPDX-License-Identifier: PSF-2.0 AND (MIT OR Apache-2.0) 

3# SPDX-FileCopyrightText: Copyright (c) 2015-2021 MagicStack Inc. http://magic.io 

4 

5import collections 

6import enum 

7import warnings 

8try: 

9 import ssl 

10except ImportError: # pragma: no cover 

11 ssl = None 

12 

13from . import constants 

14from . import exceptions 

15from . import protocols 

16from . import transports 

17from .log import logger 

18 

if ssl is not None:
    # Exceptions raised by the SSL object that this module treats as
    # "cannot make progress right now" rather than as failures; they are
    # caught around the do_handshake()/unwrap()/read()/write() calls below.
    SSLAgainErrors = ssl.SSLWantReadError, ssl.SSLSyscallError

21 

22 

class SSLProtocolState(enum.Enum):
    """States of the SSL layer state machine driven by SSLProtocol.

    SSLProtocol._set_state() only permits the forward chain
    UNWRAPPED -> DO_HANDSHAKE -> WRAPPED -> FLUSHING -> SHUTDOWN,
    plus a switch to UNWRAPPED from any state.
    """
    # Initial state; also re-entered on abort and on connection loss.
    UNWRAPPED = "UNWRAPPED"
    # Entered by SSLProtocol._start_handshake().
    DO_HANDSHAKE = "DO_HANDSHAKE"
    # Entered once the handshake completed without error.
    WRAPPED = "WRAPPED"
    # Entered when shutdown starts or EOF arrives while WRAPPED.
    FLUSHING = "FLUSHING"
    # Entered by SSLProtocol._do_flush() just before _do_shutdown().
    SHUTDOWN = "SHUTDOWN"

29 

30 

class AppProtocolState(enum.Enum):
    """Tracks which callbacks of the application protocol have run."""
    # This tracks the state of app protocol (https://git.io/fj59P):
    #
    #     INIT -cm-> CON_MADE [-dr*->] [-er-> EOF?] -cl-> CON_LOST
    #
    # * cm: connection_made()
    # * dr: data_received()
    # * er: eof_received()
    # * cl: connection_lost()

    # connection_made() has not been delivered yet.
    STATE_INIT = "STATE_INIT"
    # connection_made() was delivered (or the caller asked to skip it).
    STATE_CON_MADE = "STATE_CON_MADE"
    # eof_received() was delivered.
    STATE_EOF = "STATE_EOF"
    # connection_lost() was scheduled.
    STATE_CON_LOST = "STATE_CON_LOST"

45 

46 

def _create_transport_context(server_side, server_hostname):
    """Return a default SSL context for a client-side connection.

    Called when no SSLContext was supplied.  Server-side use is rejected
    with ValueError because a server needs an explicitly configured
    context.  Hostname checking is switched off when *server_hostname*
    is falsy.
    """
    if server_side:
        raise ValueError('Server side SSL needs a valid SSLContext')

    # A client may request SSL without providing a context (ssl=True);
    # the context argument is then None and the library default, which
    # is secure for client connections, is used instead.
    default_context = ssl.create_default_context()
    if not server_hostname:
        default_context.check_hostname = False
    return default_context

59 

60 

def add_flowcontrol_defaults(high, low, kb):
    """Fill in missing flow-control water marks and validate them.

    * high given: it is used as is.
    * high missing, low given: high becomes four times low.
    * both missing: high becomes ``kb * 1024``.
    * low missing: low becomes a quarter of high (integer division).

    Returns the ``(high, low)`` pair.  Raises ValueError unless
    ``high >= low >= 0``.
    """
    if high is not None:
        hi = high
    elif low is not None:
        hi = 4 * low
    else:
        hi = kb * 1024

    lo = hi // 4 if low is None else low

    if not hi >= lo >= 0:
        raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
                         (hi, lo))

    return hi, lo

80 

81 

class _SSLProtocolTransport(transports._FlowControlMixin,
                            transports.Transport):
    """Transport object handed to the application protocol.

    Almost every method forwards to the SSLProtocol passed to the
    constructor; the only state kept here is the ``_closed`` flag.
    """

    _start_tls_compatible = True
    _sendfile_compatible = constants._SendfileMode.FALLBACK

    def __init__(self, loop, ssl_protocol):
        self._loop = loop
        self._ssl_protocol = ssl_protocol
        self._closed = False

    def get_extra_info(self, name, default=None):
        """Look up optional transport information via the SSL protocol."""
        ssl_protocol = self._ssl_protocol
        return ssl_protocol._get_extra_info(name, default)

    def set_protocol(self, protocol):
        """Install *protocol* as the new application protocol."""
        self._ssl_protocol._set_app_protocol(protocol)

    def get_protocol(self):
        """Return the current application protocol."""
        ssl_protocol = self._ssl_protocol
        return ssl_protocol._app_protocol

    def is_closing(self):
        """Tell whether this transport or the underlying one is closing."""
        if self._closed:
            return self._closed
        return self._ssl_protocol._is_transport_closing()

    def close(self):
        """Close the transport.

        The first call marks the transport closed and asks the SSL
        protocol to start its shutdown sequence.  Any later call only
        drops the reference to the SSL protocol.
        """
        if self._closed:
            self._ssl_protocol = None
            return
        self._closed = True
        self._ssl_protocol._start_shutdown()

    def __del__(self, _warnings=warnings):
        # Warn about transports that are collected without being closed.
        if self._closed:
            return
        self._closed = True
        _warnings.warn(
            "unclosed transport <asyncio._SSLProtocolTransport "
            "object>", ResourceWarning)

    def is_reading(self):
        """Return True unless the application paused reading."""
        reading_paused = self._ssl_protocol._app_reading_paused
        return not reading_paused

    def pause_reading(self):
        """Pause the receiving end.

        Forwards to the SSL protocol, which stops handing decrypted data
        to the application protocol until resume_reading() is called.
        """
        self._ssl_protocol._pause_reading()

    def resume_reading(self):
        """Resume the receiving end after a pause_reading() call."""
        self._ssl_protocol._resume_reading()

    def set_write_buffer_limits(self, high=None, low=None):
        """Set the high- and low-water limits for write flow control.

        The limits decide when the application protocol's
        pause_writing() and resume_writing() are called.  Missing values
        get defaults; the final values must satisfy
        ``high >= low >= 0`` or ValueError is raised.  After storing the
        limits, write flow control is re-evaluated immediately.
        """
        ssl_protocol = self._ssl_protocol
        ssl_protocol._set_write_buffer_limits(high, low)
        ssl_protocol._control_app_writing()

    def get_write_buffer_limits(self):
        """Return the write limits as a ``(low, high)`` tuple."""
        ssl_protocol = self._ssl_protocol
        low = ssl_protocol._outgoing_low_water
        high = ssl_protocol._outgoing_high_water
        return (low, high)

    def get_write_buffer_size(self):
        """Return the current size of the write buffers."""
        ssl_protocol = self._ssl_protocol
        return ssl_protocol._get_write_buffer_size()

    def set_read_buffer_limits(self, high=None, low=None):
        """Set the high- and low-water limits for read flow control.

        The limits decide when the underlying transport's
        pause_reading() and resume_reading() are called.  Missing values
        get defaults; the final values must satisfy
        ``high >= low >= 0`` or ValueError is raised.  After storing the
        limits, read flow control is re-evaluated immediately.
        """
        ssl_protocol = self._ssl_protocol
        ssl_protocol._set_read_buffer_limits(high, low)
        ssl_protocol._control_ssl_reading()

    def get_read_buffer_limits(self):
        """Return the read limits as a ``(low, high)`` tuple."""
        ssl_protocol = self._ssl_protocol
        low = ssl_protocol._incoming_low_water
        high = ssl_protocol._incoming_high_water
        return (low, high)

    def get_read_buffer_size(self):
        """Return the current size of the read buffer."""
        ssl_protocol = self._ssl_protocol
        return ssl_protocol._get_read_buffer_size()

    @property
    def _protocol_paused(self):
        # Required for sendfile fallback pause_writing/resume_writing logic
        ssl_protocol = self._ssl_protocol
        return ssl_protocol._app_writing_paused

    def write(self, data):
        """Queue *data* for sending; never blocks.

        Raises TypeError unless *data* is bytes, bytearray or memoryview.
        Empty data is ignored.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError(f"data: expecting a bytes-like instance, "
                            f"got {type(data).__name__}")
        if data:
            self._ssl_protocol._write_appdata((data,))

    def writelines(self, list_of_data):
        """Queue every item of the iterable *list_of_data* for sending.

        The iterable is passed to the SSL protocol as is.
        """
        self._ssl_protocol._write_appdata(list_of_data)

    def write_eof(self):
        """Close the write end after flushing buffered data.

        This raises :exc:`NotImplementedError` right now.
        """
        raise NotImplementedError

    def can_write_eof(self):
        """Return False: write_eof() is not supported by this transport."""
        return False

    def abort(self):
        """Close the transport immediately.

        Marks the transport closed and tells the SSL protocol to abort
        with no exception.
        """
        self._force_close(None)

    def _force_close(self, exc):
        self._closed = True
        ssl_protocol = self._ssl_protocol
        # The protocol reference is dropped by a repeated close().
        if ssl_protocol is None:
            return
        ssl_protocol._abort(exc)

    def _test__append_write_backlog(self, data):
        # for test only
        ssl_protocol = self._ssl_protocol
        ssl_protocol._write_backlog.append(data)
        ssl_protocol._write_buffer_size += len(data)

261 

262 

263class SSLProtocol(protocols.BufferedProtocol): 

264 max_size = 256 * 1024 # Buffer size passed to read() 

265 

266 _handshake_start_time = None 

267 _handshake_timeout_handle = None 

268 _shutdown_timeout_handle = None 

269 

270 def __init__(self, loop, app_protocol, sslcontext, waiter, 

271 server_side=False, server_hostname=None, 

272 call_connection_made=True, 

273 ssl_handshake_timeout=None, 

274 ssl_shutdown_timeout=None): 

275 if ssl is None: 

276 raise RuntimeError("stdlib ssl module not available") 

277 

278 self._ssl_buffer = bytearray(self.max_size) 

279 self._ssl_buffer_view = memoryview(self._ssl_buffer) 

280 

281 if ssl_handshake_timeout is None: 

282 ssl_handshake_timeout = constants.SSL_HANDSHAKE_TIMEOUT 

283 elif ssl_handshake_timeout <= 0: 

284 raise ValueError( 

285 f"ssl_handshake_timeout should be a positive number, " 

286 f"got {ssl_handshake_timeout}") 

287 if ssl_shutdown_timeout is None: 287 ↛ 289line 287 didn't jump to line 289 because the condition on line 287 was always true

288 ssl_shutdown_timeout = constants.SSL_SHUTDOWN_TIMEOUT 

289 elif ssl_shutdown_timeout <= 0: 

290 raise ValueError( 

291 f"ssl_shutdown_timeout should be a positive number, " 

292 f"got {ssl_shutdown_timeout}") 

293 

294 if not sslcontext: 

295 sslcontext = _create_transport_context( 

296 server_side, server_hostname) 

297 

298 self._server_side = server_side 

299 if server_hostname and not server_side: 

300 self._server_hostname = server_hostname 

301 else: 

302 self._server_hostname = None 

303 self._sslcontext = sslcontext 

304 # SSL-specific extra info. More info are set when the handshake 

305 # completes. 

306 self._extra = dict(sslcontext=sslcontext) 

307 

308 # App data write buffering 

309 self._write_backlog = collections.deque() 

310 self._write_buffer_size = 0 

311 

312 self._waiter = waiter 

313 self._loop = loop 

314 self._set_app_protocol(app_protocol) 

315 self._app_transport = None 

316 self._app_transport_created = False 

317 # transport, ex: SelectorSocketTransport 

318 self._transport = None 

319 self._ssl_handshake_timeout = ssl_handshake_timeout 

320 self._ssl_shutdown_timeout = ssl_shutdown_timeout 

321 # SSL and state machine 

322 self._incoming = ssl.MemoryBIO() 

323 self._outgoing = ssl.MemoryBIO() 

324 self._state = SSLProtocolState.UNWRAPPED 

325 self._conn_lost = 0 # Set when connection_lost called 

326 if call_connection_made: 

327 self._app_state = AppProtocolState.STATE_INIT 

328 else: 

329 self._app_state = AppProtocolState.STATE_CON_MADE 

330 self._sslobj = self._sslcontext.wrap_bio( 

331 self._incoming, self._outgoing, 

332 server_side=self._server_side, 

333 server_hostname=self._server_hostname) 

334 

335 # Flow Control 

336 

337 self._ssl_writing_paused = False 

338 

339 self._app_reading_paused = False 

340 

341 self._ssl_reading_paused = False 

342 self._incoming_high_water = 0 

343 self._incoming_low_water = 0 

344 self._set_read_buffer_limits() 

345 self._eof_received = False 

346 

347 self._app_writing_paused = False 

348 self._outgoing_high_water = 0 

349 self._outgoing_low_water = 0 

350 self._set_write_buffer_limits() 

351 self._get_app_transport() 

352 

353 def _set_app_protocol(self, app_protocol): 

354 self._app_protocol = app_protocol 

355 # Make fast hasattr check first 

356 if (hasattr(app_protocol, 'get_buffer') and 

357 isinstance(app_protocol, protocols.BufferedProtocol)): 

358 self._app_protocol_get_buffer = app_protocol.get_buffer 

359 self._app_protocol_buffer_updated = app_protocol.buffer_updated 

360 self._app_protocol_is_buffer = True 

361 else: 

362 self._app_protocol_is_buffer = False 

363 

364 def _wakeup_waiter(self, exc=None): 

365 if self._waiter is None: 

366 return 

367 if not self._waiter.cancelled(): 

368 if exc is not None: 

369 self._waiter.set_exception(exc) 

370 else: 

371 self._waiter.set_result(None) 

372 self._waiter = None 

373 

374 def _get_app_transport(self): 

375 if self._app_transport is None: 

376 if self._app_transport_created: 376 ↛ 377line 376 didn't jump to line 377 because the condition on line 376 was never true

377 raise RuntimeError('Creating _SSLProtocolTransport twice') 

378 self._app_transport = _SSLProtocolTransport(self._loop, self) 

379 self._app_transport_created = True 

380 return self._app_transport 

381 

382 def _is_transport_closing(self): 

383 return self._transport is not None and self._transport.is_closing() 

384 

385 def connection_made(self, transport): 

386 """Called when the low-level connection is made. 

387 

388 Start the SSL handshake. 

389 """ 

390 self._transport = transport 

391 self._start_handshake() 

392 

393 def connection_lost(self, exc): 

394 """Called when the low-level connection is lost or closed. 

395 

396 The argument is an exception object or None (the latter 

397 meaning a regular EOF is received or the connection was 

398 aborted or closed). 

399 """ 

400 self._write_backlog.clear() 

401 self._outgoing.read() 

402 self._conn_lost += 1 

403 

404 # Just mark the app transport as closed so that its __dealloc__ 

405 # doesn't complain. 

406 if self._app_transport is not None: 406 ↛ 409line 406 didn't jump to line 409 because the condition on line 406 was always true

407 self._app_transport._closed = True 

408 

409 if self._state != SSLProtocolState.DO_HANDSHAKE: 

410 if ( 

411 self._app_state == AppProtocolState.STATE_CON_MADE or 

412 self._app_state == AppProtocolState.STATE_EOF 

413 ): 

414 self._app_state = AppProtocolState.STATE_CON_LOST 

415 self._loop.call_soon(self._app_protocol.connection_lost, exc) 

416 self._set_state(SSLProtocolState.UNWRAPPED) 

417 self._transport = None 

418 self._app_transport = None 

419 self._app_protocol = None 

420 self._wakeup_waiter(exc) 

421 

422 if self._shutdown_timeout_handle: 

423 self._shutdown_timeout_handle.cancel() 

424 self._shutdown_timeout_handle = None 

425 if self._handshake_timeout_handle: 

426 self._handshake_timeout_handle.cancel() 

427 self._handshake_timeout_handle = None 

428 

429 def get_buffer(self, n): 

430 want = n 

431 if want <= 0 or want > self.max_size: 

432 want = self.max_size 

433 if len(self._ssl_buffer) < want: 433 ↛ 434line 433 didn't jump to line 434 because the condition on line 433 was never true

434 self._ssl_buffer = bytearray(want) 

435 self._ssl_buffer_view = memoryview(self._ssl_buffer) 

436 return self._ssl_buffer_view 

437 

438 def buffer_updated(self, nbytes): 

439 self._incoming.write(self._ssl_buffer_view[:nbytes]) 

440 

441 if self._state == SSLProtocolState.DO_HANDSHAKE: 

442 self._do_handshake() 

443 

444 elif self._state == SSLProtocolState.WRAPPED: 

445 self._do_read() 

446 

447 elif self._state == SSLProtocolState.FLUSHING: 447 ↛ 448line 447 didn't jump to line 448 because the condition on line 447 was never true

448 self._do_flush() 

449 

450 elif self._state == SSLProtocolState.SHUTDOWN: 450 ↛ exitline 450 didn't return from function 'buffer_updated' because the condition on line 450 was always true

451 self._do_shutdown() 

452 

453 def eof_received(self): 

454 """Called when the other end of the low-level stream 

455 is half-closed. 

456 

457 If this returns a false value (including None), the transport 

458 will close itself. If it returns a true value, closing the 

459 transport is up to the protocol. 

460 """ 

461 self._eof_received = True 

462 try: 

463 if self._loop.get_debug(): 463 ↛ 464line 463 didn't jump to line 464 because the condition on line 463 was never true

464 logger.debug("%r received EOF", self) 

465 

466 if self._state == SSLProtocolState.DO_HANDSHAKE: 

467 self._on_handshake_complete(ConnectionResetError) 

468 

469 elif self._state == SSLProtocolState.WRAPPED: 

470 self._set_state(SSLProtocolState.FLUSHING) 

471 if self._app_reading_paused: 

472 return True 

473 else: 

474 self._do_flush() 

475 

476 elif self._state == SSLProtocolState.FLUSHING: 476 ↛ 477line 476 didn't jump to line 477 because the condition on line 476 was never true

477 self._do_write() 

478 self._set_state(SSLProtocolState.SHUTDOWN) 

479 self._do_shutdown() 

480 

481 elif self._state == SSLProtocolState.SHUTDOWN: 481 ↛ exitline 481 didn't return from function 'eof_received' because the condition on line 481 was always true

482 self._do_shutdown() 

483 

484 except Exception: 

485 self._transport.close() 

486 raise 

487 

488 def _get_extra_info(self, name, default=None): 

489 if name in self._extra: 

490 return self._extra[name] 

491 elif self._transport is not None: 

492 return self._transport.get_extra_info(name, default) 

493 else: 

494 return default 

495 

496 def _set_state(self, new_state): 

497 allowed = False 

498 

499 if new_state == SSLProtocolState.UNWRAPPED: 

500 allowed = True 

501 

502 elif ( 

503 self._state == SSLProtocolState.UNWRAPPED and 

504 new_state == SSLProtocolState.DO_HANDSHAKE 

505 ): 

506 allowed = True 

507 

508 elif ( 

509 self._state == SSLProtocolState.DO_HANDSHAKE and 

510 new_state == SSLProtocolState.WRAPPED 

511 ): 

512 allowed = True 

513 

514 elif ( 

515 self._state == SSLProtocolState.WRAPPED and 

516 new_state == SSLProtocolState.FLUSHING 

517 ): 

518 allowed = True 

519 

520 elif ( 520 ↛ 526line 520 didn't jump to line 526 because the condition on line 520 was always true

521 self._state == SSLProtocolState.FLUSHING and 

522 new_state == SSLProtocolState.SHUTDOWN 

523 ): 

524 allowed = True 

525 

526 if allowed: 526 ↛ 530line 526 didn't jump to line 530 because the condition on line 526 was always true

527 self._state = new_state 

528 

529 else: 

530 raise RuntimeError( 

531 'cannot switch state from {} to {}'.format( 

532 self._state, new_state)) 

533 

534 # Handshake flow 

535 

536 def _start_handshake(self): 

537 if self._loop.get_debug(): 537 ↛ 538line 537 didn't jump to line 538 because the condition on line 537 was never true

538 logger.debug("%r starts SSL handshake", self) 

539 self._handshake_start_time = self._loop.time() 

540 else: 

541 self._handshake_start_time = None 

542 

543 self._set_state(SSLProtocolState.DO_HANDSHAKE) 

544 

545 # start handshake timeout count down 

546 self._handshake_timeout_handle = \ 

547 self._loop.call_later(self._ssl_handshake_timeout, 

548 self._check_handshake_timeout) 

549 

550 self._do_handshake() 

551 

552 def _check_handshake_timeout(self): 

553 if self._state == SSLProtocolState.DO_HANDSHAKE: 553 ↛ exitline 553 didn't return from function '_check_handshake_timeout' because the condition on line 553 was always true

554 msg = ( 

555 f"SSL handshake is taking longer than " 

556 f"{self._ssl_handshake_timeout} seconds: " 

557 f"aborting the connection" 

558 ) 

559 self._fatal_error(ConnectionAbortedError(msg)) 

560 

561 def _do_handshake(self): 

562 try: 

563 self._sslobj.do_handshake() 

564 except SSLAgainErrors: 

565 self._process_outgoing() 

566 except ssl.SSLError as exc: 

567 self._on_handshake_complete(exc) 

568 else: 

569 self._on_handshake_complete(None) 

570 

571 def _on_handshake_complete(self, handshake_exc): 

572 if self._handshake_timeout_handle is not None: 572 ↛ 576line 572 didn't jump to line 576 because the condition on line 572 was always true

573 self._handshake_timeout_handle.cancel() 

574 self._handshake_timeout_handle = None 

575 

576 sslobj = self._sslobj 

577 try: 

578 if handshake_exc is None: 

579 self._set_state(SSLProtocolState.WRAPPED) 

580 else: 

581 raise handshake_exc 

582 

583 peercert = sslobj.getpeercert() 

584 except Exception as exc: 

585 handshake_exc = None 

586 self._set_state(SSLProtocolState.UNWRAPPED) 

587 if isinstance(exc, ssl.CertificateError): 

588 msg = 'SSL handshake failed on verifying the certificate' 

589 else: 

590 msg = 'SSL handshake failed' 

591 self._fatal_error(exc, msg) 

592 self._wakeup_waiter(exc) 

593 return 

594 

595 if self._loop.get_debug(): 595 ↛ 596line 595 didn't jump to line 596 because the condition on line 595 was never true

596 dt = self._loop.time() - self._handshake_start_time 

597 logger.debug("%r: SSL handshake took %.1f ms", self, dt * 1e3) 

598 

599 # Add extra info that becomes available after handshake. 

600 self._extra.update(peercert=peercert, 

601 cipher=sslobj.cipher(), 

602 compression=sslobj.compression(), 

603 ssl_object=sslobj) 

604 if self._app_state == AppProtocolState.STATE_INIT: 

605 self._app_state = AppProtocolState.STATE_CON_MADE 

606 self._app_protocol.connection_made(self._get_app_transport()) 

607 self._wakeup_waiter() 

608 self._do_read() 

609 

610 # Shutdown flow 

611 

612 def _start_shutdown(self): 

613 if ( 

614 self._state in ( 

615 SSLProtocolState.FLUSHING, 

616 SSLProtocolState.SHUTDOWN, 

617 SSLProtocolState.UNWRAPPED 

618 ) 

619 ): 

620 return 

621 if self._app_transport is not None: 621 ↛ 623line 621 didn't jump to line 623 because the condition on line 621 was always true

622 self._app_transport._closed = True 

623 if self._state == SSLProtocolState.DO_HANDSHAKE: 

624 self._abort(None) 

625 else: 

626 self._set_state(SSLProtocolState.FLUSHING) 

627 self._shutdown_timeout_handle = self._loop.call_later( 

628 self._ssl_shutdown_timeout, 

629 self._check_shutdown_timeout 

630 ) 

631 self._do_flush() 

632 

633 def _check_shutdown_timeout(self): 

634 if ( 

635 self._state in ( 

636 SSLProtocolState.FLUSHING, 

637 SSLProtocolState.SHUTDOWN 

638 ) 

639 ): 

640 self._transport._force_close( 

641 exceptions.TimeoutError('SSL shutdown timed out')) 

642 

643 def _do_flush(self): 

644 self._do_read() 

645 self._set_state(SSLProtocolState.SHUTDOWN) 

646 self._do_shutdown() 

647 

648 def _do_shutdown(self): 

649 try: 

650 if not self._eof_received: 

651 self._sslobj.unwrap() 

652 except SSLAgainErrors: 

653 self._process_outgoing() 

654 except ssl.SSLError as exc: 

655 self._on_shutdown_complete(exc) 

656 else: 

657 self._process_outgoing() 

658 self._call_eof_received() 

659 self._on_shutdown_complete(None) 

660 

661 def _on_shutdown_complete(self, shutdown_exc): 

662 if self._shutdown_timeout_handle is not None: 

663 self._shutdown_timeout_handle.cancel() 

664 self._shutdown_timeout_handle = None 

665 

666 if shutdown_exc: 666 ↛ 667line 666 didn't jump to line 667 because the condition on line 666 was never true

667 self._fatal_error(shutdown_exc) 

668 else: 

669 self._loop.call_soon(self._transport.close) 

670 

671 def _abort(self, exc): 

672 self._set_state(SSLProtocolState.UNWRAPPED) 

673 if self._transport is not None: 673 ↛ exitline 673 didn't return from function '_abort' because the condition on line 673 was always true

674 self._transport._force_close(exc) 

675 

676 # Outgoing flow 

677 

678 def _write_appdata(self, list_of_data): 

679 if ( 

680 self._state in ( 

681 SSLProtocolState.FLUSHING, 

682 SSLProtocolState.SHUTDOWN, 

683 SSLProtocolState.UNWRAPPED 

684 ) 

685 ): 

686 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: 686 ↛ 687line 686 didn't jump to line 687 because the condition on line 686 was never true

687 logger.warning('SSL connection is closed') 

688 self._conn_lost += 1 

689 return 

690 

691 for data in list_of_data: 

692 self._write_backlog.append(data) 

693 self._write_buffer_size += len(data) 

694 

695 try: 

696 if self._state == SSLProtocolState.WRAPPED: 696 ↛ exitline 696 didn't return from function '_write_appdata' because the condition on line 696 was always true

697 self._do_write() 

698 

699 except Exception as ex: 

700 self._fatal_error(ex, 'Fatal error on SSL protocol') 

701 

702 def _do_write(self): 

703 try: 

704 while self._write_backlog: 

705 data = self._write_backlog[0] 

706 count = self._sslobj.write(data) 

707 data_len = len(data) 

708 if count < data_len: 708 ↛ 709line 708 didn't jump to line 709 because the condition on line 708 was never true

709 self._write_backlog[0] = data[count:] 

710 self._write_buffer_size -= count 

711 else: 

712 del self._write_backlog[0] 

713 self._write_buffer_size -= data_len 

714 except SSLAgainErrors: 

715 pass 

716 self._process_outgoing() 

717 

718 def _process_outgoing(self): 

719 if not self._ssl_writing_paused: 

720 data = self._outgoing.read() 

721 if len(data): 

722 self._transport.write(data) 

723 self._control_app_writing() 

724 

725 # Incoming flow 

726 

727 def _do_read(self): 

728 if ( 728 ↛ 734line 728 didn't jump to line 734 because the condition on line 728 was never true

729 self._state not in ( 

730 SSLProtocolState.WRAPPED, 

731 SSLProtocolState.FLUSHING, 

732 ) 

733 ): 

734 return 

735 try: 

736 if not self._app_reading_paused: 

737 if self._app_protocol_is_buffer: 

738 self._do_read__buffered() 

739 else: 

740 self._do_read__copied() 

741 if self._write_backlog: 

742 self._do_write() 

743 else: 

744 self._process_outgoing() 

745 self._control_ssl_reading() 

746 except Exception as ex: 

747 self._fatal_error(ex, 'Fatal error on SSL protocol') 

748 

749 def _do_read__buffered(self): 

750 offset = 0 

751 count = 1 

752 

753 buf = self._app_protocol_get_buffer(self._get_read_buffer_size()) 

754 wants = len(buf) 

755 

756 try: 

757 count = self._sslobj.read(wants, buf) 

758 

759 if count > 0: 759 ↛ 771line 759 didn't jump to line 771 because the condition on line 759 was always true

760 offset = count 

761 while offset < wants: 

762 count = self._sslobj.read(wants - offset, buf[offset:]) 

763 if count > 0: 763 ↛ 766line 763 didn't jump to line 766 because the condition on line 763 was always true

764 offset += count 

765 else: 

766 break 

767 else: 

768 self._loop.call_soon(self._do_read) 

769 except SSLAgainErrors: 

770 pass 

771 if offset > 0: 

772 self._app_protocol_buffer_updated(offset) 

773 if not count: 773 ↛ 775line 773 didn't jump to line 775 because the condition on line 773 was never true

774 # close_notify 

775 self._call_eof_received() 

776 self._start_shutdown() 

777 

778 def _do_read__copied(self): 

779 chunk = b'1' 

780 zero = True 

781 one = False 

782 

783 try: 

784 while True: 

785 chunk = self._sslobj.read(self.max_size) 

786 if not chunk: 

787 break 

788 if zero: 

789 zero = False 

790 one = True 

791 first = chunk 

792 elif one: 

793 one = False 

794 data = [first, chunk] 

795 else: 

796 data.append(chunk) 

797 except SSLAgainErrors: 

798 pass 

799 if one: 

800 self._app_protocol.data_received(first) 

801 elif not zero: 

802 self._app_protocol.data_received(b''.join(data)) 

803 if not chunk: 

804 # close_notify 

805 self._call_eof_received() 

806 self._start_shutdown() 

807 

808 def _call_eof_received(self): 

809 try: 

810 if self._app_state == AppProtocolState.STATE_CON_MADE: 

811 self._app_state = AppProtocolState.STATE_EOF 

812 keep_open = self._app_protocol.eof_received() 

813 if keep_open: 813 ↛ 814line 813 didn't jump to line 814 because the condition on line 813 was never true

814 logger.warning('returning true from eof_received() ' 

815 'has no effect when using ssl') 

816 except (KeyboardInterrupt, SystemExit): 

817 raise 

818 except BaseException as ex: 

819 self._fatal_error(ex, 'Error calling eof_received()') 

820 

821 # Flow control for writes from APP socket 

822 

823 def _control_app_writing(self): 

824 size = self._get_write_buffer_size() 

825 if size >= self._outgoing_high_water and not self._app_writing_paused: 

826 self._app_writing_paused = True 

827 try: 

828 self._app_protocol.pause_writing() 

829 except (KeyboardInterrupt, SystemExit): 

830 raise 

831 except BaseException as exc: 

832 self._loop.call_exception_handler({ 

833 'message': 'protocol.pause_writing() failed', 

834 'exception': exc, 

835 'transport': self._app_transport, 

836 'protocol': self, 

837 }) 

838 elif size <= self._outgoing_low_water and self._app_writing_paused: 

839 self._app_writing_paused = False 

840 try: 

841 self._app_protocol.resume_writing() 

842 except (KeyboardInterrupt, SystemExit): 

843 raise 

844 except BaseException as exc: 

845 self._loop.call_exception_handler({ 

846 'message': 'protocol.resume_writing() failed', 

847 'exception': exc, 

848 'transport': self._app_transport, 

849 'protocol': self, 

850 }) 

851 

852 def _get_write_buffer_size(self): 

853 return self._outgoing.pending + self._write_buffer_size 

854 

855 def _set_write_buffer_limits(self, high=None, low=None): 

856 high, low = add_flowcontrol_defaults( 

857 high, low, constants.FLOW_CONTROL_HIGH_WATER_SSL_WRITE) 

858 self._outgoing_high_water = high 

859 self._outgoing_low_water = low 

860 

861 # Flow control for reads to APP socket 

862 

863 def _pause_reading(self): 

864 self._app_reading_paused = True 

865 

866 def _resume_reading(self): 

867 if self._app_reading_paused: 867 ↛ exitline 867 didn't return from function '_resume_reading' because the condition on line 867 was always true

868 self._app_reading_paused = False 

869 

870 def resume(): 

871 if self._state == SSLProtocolState.WRAPPED: 

872 self._do_read() 

873 elif self._state == SSLProtocolState.FLUSHING: 

874 self._do_flush() 

875 elif self._state == SSLProtocolState.SHUTDOWN: 875 ↛ exitline 875 didn't return from function 'resume' because the condition on line 875 was always true

876 self._do_shutdown() 

877 self._loop.call_soon(resume) 

878 

879 # Flow control for reads from SSL socket 

880 

881 def _control_ssl_reading(self): 

882 size = self._get_read_buffer_size() 

883 if size >= self._incoming_high_water and not self._ssl_reading_paused: 883 ↛ 884line 883 didn't jump to line 884 because the condition on line 883 was never true

884 self._ssl_reading_paused = True 

885 self._transport.pause_reading() 

886 elif size <= self._incoming_low_water and self._ssl_reading_paused: 886 ↛ 887line 886 didn't jump to line 887 because the condition on line 886 was never true

887 self._ssl_reading_paused = False 

888 self._transport.resume_reading() 

889 

890 def _set_read_buffer_limits(self, high=None, low=None): 

891 high, low = add_flowcontrol_defaults( 

892 high, low, constants.FLOW_CONTROL_HIGH_WATER_SSL_READ) 

893 self._incoming_high_water = high 

894 self._incoming_low_water = low 

895 

896 def _get_read_buffer_size(self): 

897 return self._incoming.pending 

898 

899 # Flow control for writes to SSL socket 

900 

901 def pause_writing(self): 

902 """Called when the low-level transport's buffer goes over 

903 the high-water mark. 

904 """ 

905 assert not self._ssl_writing_paused 

906 self._ssl_writing_paused = True 

907 

908 def resume_writing(self): 

909 """Called when the low-level transport's buffer drains below 

910 the low-water mark. 

911 """ 

912 assert self._ssl_writing_paused 

913 self._ssl_writing_paused = False 

914 self._process_outgoing() 

915 

916 def _fatal_error(self, exc, message='Fatal error on transport'): 

917 if self._transport: 

918 self._transport._force_close(exc) 

919 

920 if isinstance(exc, OSError): 

921 if self._loop.get_debug(): 921 ↛ 922line 921 didn't jump to line 922 because the condition on line 921 was never true

922 logger.debug("%r: %s", self, message, exc_info=True) 

923 elif not isinstance(exc, exceptions.CancelledError): 923 ↛ exitline 923 didn't return from function '_fatal_error' because the condition on line 923 was always true

924 self._loop.call_exception_handler({ 

925 'message': message, 

926 'exception': exc, 

927 'transport': self._transport, 

928 'protocol': self, 

929 })