Coverage for Lib/asyncio/futures.py: 96%

244 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-15 02:02 +0000

1"""A Future class similar to the one in PEP 3148.""" 

2 

3__all__ = ( 

4 'Future', 'wrap_future', 'isfuture', 

5 'future_add_to_awaited_by', 'future_discard_from_awaited_by', 

6) 

7 

8import concurrent.futures 

9import contextvars 

10import logging 

11import sys 

12from types import GenericAlias 

13 

14from . import base_futures 

15from . import events 

16from . import exceptions 

17from . import format_helpers 

18 

19 

20isfuture = base_futures.isfuture 

21 

22 

23_PENDING = base_futures._PENDING 

24_CANCELLED = base_futures._CANCELLED 

25_FINISHED = base_futures._FINISHED 

26 

27 

28STACK_DEBUG = logging.DEBUG - 1 # heavy-duty debugging 

29 

30 

31class Future: 

32 """This class is *almost* compatible with concurrent.futures.Future. 

33 

34 Differences: 

35 

36 - This class is not thread-safe. 

37 

38 - result() and exception() do not take a timeout argument and 

39 raise an exception when the future isn't done yet. 

40 

41 - Callbacks registered with add_done_callback() are always called 

42 via the event loop's call_soon(). 

43 

44 - This class is not compatible with the wait() and as_completed() 

45 methods in the concurrent.futures package. 

46 

47 """ 

48 

49 # Class variables serving as defaults for instance variables. 

50 _state = _PENDING 

51 _result = None 

52 _exception = None 

53 _loop = None 

54 _source_traceback = None 

55 _cancel_message = None 

56 # A saved CancelledError for later chaining as an exception context. 

57 _cancelled_exc = None 

58 

59 # This field is used for a dual purpose: 

60 # - Its presence is a marker to declare that a class implements 

61 # the Future protocol (i.e. is intended to be duck-type compatible). 

62 # The value must also be not-None, to enable a subclass to declare 

63 # that it is not compatible by setting this to None. 

64 # - It is set by __iter__() below so that Task.__step() can tell 

65 # the difference between 

66 # `await Future()` or `yield from Future()` (correct) vs. 

67 # `yield Future()` (incorrect). 

68 _asyncio_future_blocking = False 

69 

70 # Used by the capture_call_stack() API. 

71 __asyncio_awaited_by = None 

72 

73 __log_traceback = False 

74 

75 def __init__(self, *, loop=None): 

76 """Initialize the future. 

77 

78 The optional event_loop argument allows explicitly setting the event 

79 loop object used by the future. If it's not provided, the future uses 

80 the default event loop. 

81 """ 

82 if loop is None: 

83 self._loop = events.get_event_loop() 

84 else: 

85 self._loop = loop 

86 self._callbacks = [] 

87 if self._loop.get_debug(): 

88 self._source_traceback = format_helpers.extract_stack( 

89 sys._getframe(1)) 

90 

91 def __repr__(self): 

92 return base_futures._future_repr(self) 

93 

94 def __del__(self): 

95 if not self.__log_traceback: 

96 # set_exception() was not called, or result() or exception() 

97 # has consumed the exception 

98 return 

99 exc = self._exception 

100 context = { 

101 'message': 

102 f'{self.__class__.__name__} exception was never retrieved', 

103 'exception': exc, 

104 'future': self, 

105 } 

106 if self._source_traceback: 

107 context['source_traceback'] = self._source_traceback 

108 self._loop.call_exception_handler(context) 

109 

110 __class_getitem__ = classmethod(GenericAlias) 

111 

112 @property 

113 def _log_traceback(self): 

114 return self.__log_traceback 

115 

116 @_log_traceback.setter 

117 def _log_traceback(self, val): 

118 if val: 

119 raise ValueError('_log_traceback can only be set to False') 

120 self.__log_traceback = False 

121 

122 @property 

123 def _asyncio_awaited_by(self): 

124 if self.__asyncio_awaited_by is None: 

125 return None 

126 return frozenset(self.__asyncio_awaited_by) 

127 

128 def get_loop(self): 

129 """Return the event loop the Future is bound to.""" 

130 loop = self._loop 

131 if loop is None: 

132 raise RuntimeError("Future object is not initialized.") 

133 return loop 

134 

135 def _make_cancelled_error(self): 

136 """Create the CancelledError to raise if the Future is cancelled. 

137 

138 This should only be called once when handling a cancellation since 

139 it erases the saved context exception value. 

140 """ 

141 if self._cancelled_exc is not None: 

142 exc = self._cancelled_exc 

143 self._cancelled_exc = None 

144 return exc 

145 

146 if self._cancel_message is None: 

147 exc = exceptions.CancelledError() 

148 else: 

149 exc = exceptions.CancelledError(self._cancel_message) 

150 return exc 

151 

152 def cancel(self, msg=None): 

153 """Cancel the future and schedule callbacks. 

154 

155 If the future is already done or cancelled, return False. Otherwise, 

156 change the future's state to cancelled, schedule the callbacks and 

157 return True. 

158 """ 

159 self.__log_traceback = False 

160 if self._state != _PENDING: 

161 return False 

162 self._state = _CANCELLED 

163 self._cancel_message = msg 

164 self.__schedule_callbacks() 

165 return True 

166 

167 def __schedule_callbacks(self): 

168 """Internal: Ask the event loop to call all callbacks. 

169 

170 The callbacks are scheduled to be called as soon as possible. Also 

171 clears the callback list. 

172 """ 

173 callbacks = self._callbacks[:] 

174 if not callbacks: 

175 return 

176 

177 self._callbacks[:] = [] 

178 for callback, ctx in callbacks: 

179 self._loop.call_soon(callback, self, context=ctx) 

180 

181 def cancelled(self): 

182 """Return True if the future was cancelled.""" 

183 return self._state == _CANCELLED 

184 

185 # Don't implement running(); see http://bugs.python.org/issue18699 

186 

187 def done(self): 

188 """Return True if the future is done. 

189 

190 Done means either that a result / exception are available, or that the 

191 future was cancelled. 

192 """ 

193 return self._state != _PENDING 

194 

195 def result(self): 

196 """Return the result this future represents. 

197 

198 If the future has been cancelled, raises CancelledError. If the 

199 future's result isn't yet available, raises InvalidStateError. If 

200 the future is done and has an exception set, this exception is raised. 

201 """ 

202 if self._state == _CANCELLED: 

203 raise self._make_cancelled_error() 

204 if self._state != _FINISHED: 

205 raise exceptions.InvalidStateError('Result is not ready.') 

206 self.__log_traceback = False 

207 if self._exception is not None: 

208 raise self._exception.with_traceback(self._exception_tb) 

209 return self._result 

210 

211 def exception(self): 

212 """Return the exception that was set on this future. 

213 

214 The exception (or None if no exception was set) is returned only if 

215 the future is done. If the future has been cancelled, raises 

216 CancelledError. If the future isn't done yet, raises 

217 InvalidStateError. 

218 """ 

219 if self._state == _CANCELLED: 

220 raise self._make_cancelled_error() 

221 if self._state != _FINISHED: 

222 raise exceptions.InvalidStateError('Exception is not set.') 

223 self.__log_traceback = False 

224 return self._exception 

225 

226 def add_done_callback(self, fn, *, context=None): 

227 """Add a callback to be run when the future becomes done. 

228 

229 The callback is called with a single argument - the future object. If 

230 the future is already done when this is called, the callback is 

231 scheduled with call_soon. 

232 """ 

233 if self._state != _PENDING: 

234 self._loop.call_soon(fn, self, context=context) 

235 else: 

236 if context is None: 

237 context = contextvars.copy_context() 

238 self._callbacks.append((fn, context)) 

239 

240 # New method not in PEP 3148. 

241 

242 def remove_done_callback(self, fn): 

243 """Remove all instances of a callback from the "call when done" list. 

244 

245 Returns the number of callbacks removed. 

246 """ 

247 filtered_callbacks = [(f, ctx) 

248 for (f, ctx) in self._callbacks 

249 if f != fn] 

250 removed_count = len(self._callbacks) - len(filtered_callbacks) 

251 if removed_count: 

252 self._callbacks[:] = filtered_callbacks 

253 return removed_count 

254 

255 # So-called internal methods (note: no set_running_or_notify_cancel()). 

256 

257 def set_result(self, result): 

258 """Mark the future done and set its result. 

259 

260 If the future is already done when this method is called, raises 

261 InvalidStateError. 

262 """ 

263 if self._state != _PENDING: 

264 raise exceptions.InvalidStateError(f'{self._state}: {self!r}') 

265 self._result = result 

266 self._state = _FINISHED 

267 self.__schedule_callbacks() 

268 

269 def set_exception(self, exception): 

270 """Mark the future done and set an exception. 

271 

272 If the future is already done when this method is called, raises 

273 InvalidStateError. 

274 """ 

275 if self._state != _PENDING: 

276 raise exceptions.InvalidStateError(f'{self._state}: {self!r}') 

277 if isinstance(exception, type): 

278 exception = exception() 

279 if isinstance(exception, StopIteration): 

280 new_exc = RuntimeError("StopIteration interacts badly with " 

281 "generators and cannot be raised into a " 

282 "Future") 

283 new_exc.__cause__ = exception 

284 new_exc.__context__ = exception 

285 exception = new_exc 

286 self._exception = exception 

287 self._exception_tb = exception.__traceback__ 

288 self._state = _FINISHED 

289 self.__schedule_callbacks() 

290 self.__log_traceback = True 

291 

292 def __await__(self): 

293 if not self.done(): 

294 self._asyncio_future_blocking = True 

295 yield self # This tells Task to wait for completion. 

296 if not self.done(): 

297 raise RuntimeError("await wasn't used with future") 

298 return self.result() # May raise too. 

299 

300 __iter__ = __await__ # make compatible with 'yield from'. 

301 

302 

303# Needed for testing purposes. 

304_PyFuture = Future 

305 

306 

307def _get_loop(fut): 

308 # Tries to call Future.get_loop() if it's available. 

309 # Otherwise fallbacks to using the old '_loop' property. 

310 try: 

311 get_loop = fut.get_loop 

312 except AttributeError: 

313 pass 

314 else: 

315 return get_loop() 

316 return fut._loop 

317 

318 

319def _set_result_unless_cancelled(fut, result): 

320 """Helper setting the result only if the future was not cancelled.""" 

321 if fut.cancelled(): 

322 return 

323 fut.set_result(result) 

324 

325 

326def _convert_future_exc(exc): 

327 exc_class = type(exc) 

328 if exc_class is concurrent.futures.CancelledError: 

329 return exceptions.CancelledError(*exc.args).with_traceback(exc.__traceback__) 

330 elif exc_class is concurrent.futures.InvalidStateError: 

331 return exceptions.InvalidStateError(*exc.args).with_traceback(exc.__traceback__) 

332 else: 

333 return exc 

334 

335 

336def _set_concurrent_future_state(concurrent, source): 

337 """Copy state from a future to a concurrent.futures.Future.""" 

338 assert source.done() 

339 if source.cancelled(): 

340 concurrent.cancel() 

341 if not concurrent.set_running_or_notify_cancel(): 

342 return 

343 exception = source.exception() 

344 if exception is not None: 

345 concurrent.set_exception(_convert_future_exc(exception)) 

346 else: 

347 result = source.result() 

348 concurrent.set_result(result) 

349 

350 

351def _copy_future_state(source, dest): 

352 """Internal helper to copy state from another Future. 

353 

354 The other Future may be a concurrent.futures.Future. 

355 """ 

356 assert source.done() 

357 if dest.cancelled(): 

358 return 

359 assert not dest.done() 

360 if source.cancelled(): 

361 dest.cancel() 

362 else: 

363 exception = source.exception() 

364 if exception is not None: 

365 dest.set_exception(_convert_future_exc(exception)) 

366 else: 

367 result = source.result() 

368 dest.set_result(result) 

369 

370 

371def _chain_future(source, destination): 

372 """Chain two futures so that when one completes, so does the other. 

373 

374 The result (or exception) of source will be copied to destination. 

375 If destination is cancelled, source gets cancelled too. 

376 Compatible with both asyncio.Future and concurrent.futures.Future. 

377 """ 

378 if not isfuture(source) and not isinstance(source, 378 ↛ 380line 378 didn't jump to line 380 because the condition on line 378 was never true

379 concurrent.futures.Future): 

380 raise TypeError('A future is required for source argument') 

381 if not isfuture(destination) and not isinstance(destination, 381 ↛ 383line 381 didn't jump to line 383 because the condition on line 381 was never true

382 concurrent.futures.Future): 

383 raise TypeError('A future is required for destination argument') 

384 source_loop = _get_loop(source) if isfuture(source) else None 

385 dest_loop = _get_loop(destination) if isfuture(destination) else None 

386 

387 def _set_state(future, other): 

388 if isfuture(future): 

389 _copy_future_state(other, future) 

390 else: 

391 _set_concurrent_future_state(future, other) 

392 

393 def _call_check_cancel(destination): 

394 if destination.cancelled(): 

395 if source_loop is None or source_loop is dest_loop: 

396 source.cancel() 

397 else: 

398 source_loop.call_soon_threadsafe(source.cancel) 

399 

400 def _call_set_state(source): 

401 if (destination.cancelled() and 401 ↛ 403line 401 didn't jump to line 403 because the condition on line 401 was never true

402 dest_loop is not None and dest_loop.is_closed()): 

403 return 

404 if dest_loop is None or dest_loop is source_loop: 

405 _set_state(destination, source) 

406 else: 

407 if dest_loop.is_closed(): 407 ↛ 408line 407 didn't jump to line 408 because the condition on line 407 was never true

408 return 

409 dest_loop.call_soon_threadsafe(_set_state, destination, source) 

410 

411 destination.add_done_callback(_call_check_cancel) 

412 source.add_done_callback(_call_set_state) 

413 

414 

415def wrap_future(future, *, loop=None): 

416 """Wrap concurrent.futures.Future object.""" 

417 if isfuture(future): 

418 return future 

419 assert isinstance(future, concurrent.futures.Future), \ 

420 f'concurrent.futures.Future is expected, got {future!r}' 

421 if loop is None: 

422 loop = events.get_event_loop() 

423 new_future = loop.create_future() 

424 _chain_future(future, new_future) 

425 return new_future 

426 

427 

428def future_add_to_awaited_by(fut, waiter, /): 

429 """Record that `fut` is awaited on by `waiter`.""" 

430 # For the sake of keeping the implementation minimal and assuming 

431 # that most of asyncio users use the built-in Futures and Tasks 

432 # (or their subclasses), we only support native Future objects 

433 # and their subclasses. 

434 # 

435 # Longer version: tracking requires storing the caller-callee 

436 # dependency somewhere. One obvious choice is to store that 

437 # information right in the future itself in a dedicated attribute. 

438 # This means that we'd have to require all duck-type compatible 

439 # futures to implement a specific attribute used by asyncio for 

440 # the book keeping. Another solution would be to store that in 

441 # a global dictionary. The downside here is that that would create 

442 # strong references and any scenario where the "add" call isn't 

443 # followed by a "discard" call would lead to a memory leak. 

444 # Using WeakDict would resolve that issue, but would complicate 

445 # the C code (_asynciomodule.c). The bottom line here is that 

446 # it's not clear that all this work would be worth the effort. 

447 # 

448 # Note that there's an accelerated version of this function 

449 # shadowing this implementation later in this file. 

450 if isinstance(fut, _PyFuture) and isinstance(waiter, _PyFuture): 

451 if fut._Future__asyncio_awaited_by is None: 

452 fut._Future__asyncio_awaited_by = set() 

453 fut._Future__asyncio_awaited_by.add(waiter) 

454 

455 

456def future_discard_from_awaited_by(fut, waiter, /): 

457 """Record that `fut` is no longer awaited on by `waiter`.""" 

458 # See the comment in "future_add_to_awaited_by()" body for 

459 # details on implementation. 

460 # 

461 # Note that there's an accelerated version of this function 

462 # shadowing this implementation later in this file. 

463 if isinstance(fut, _PyFuture) and isinstance(waiter, _PyFuture): 

464 if fut._Future__asyncio_awaited_by is not None: 464 ↛ exitline 464 didn't return from function 'future_discard_from_awaited_by' because the condition on line 464 was always true

465 fut._Future__asyncio_awaited_by.discard(waiter) 

466 

467 

468_py_future_add_to_awaited_by = future_add_to_awaited_by 

469_py_future_discard_from_awaited_by = future_discard_from_awaited_by 

470 

471try: 

472 import _asyncio 

473except ImportError: 

474 pass 

475else: 

476 # _CFuture is needed for tests. 

477 Future = _CFuture = _asyncio.Future 

478 future_add_to_awaited_by = _asyncio.future_add_to_awaited_by 

479 future_discard_from_awaited_by = _asyncio.future_discard_from_awaited_by 

480 _c_future_add_to_awaited_by = future_add_to_awaited_by 

481 _c_future_discard_from_awaited_by = future_discard_from_awaited_by