Coverage for Lib/asyncio/futures.py: 96%

243 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-23 01:21 +0000

1"""A Future class similar to the one in PEP 3148.""" 

2 

3__all__ = ( 

4 'Future', 'wrap_future', 'isfuture', 

5 'future_add_to_awaited_by', 'future_discard_from_awaited_by', 

6) 

7 

8import concurrent.futures 

9import contextvars 

10import logging 

11import sys 

12from types import GenericAlias 

13 

14from . import base_futures 

15from . import events 

16from . import exceptions 

17from . import format_helpers 

18 

19 

20isfuture = base_futures.isfuture 

21 

22 

23_PENDING = base_futures._PENDING 

24_CANCELLED = base_futures._CANCELLED 

25_FINISHED = base_futures._FINISHED 

26 

27 

28STACK_DEBUG = logging.DEBUG - 1 # heavy-duty debugging 

29 

30 

31class Future: 

32 """This class is *almost* compatible with concurrent.futures.Future. 

33 

34 Differences: 

35 

36 - This class is not thread-safe. 

37 

38 - result() and exception() do not take a timeout argument and 

39 raise an exception when the future isn't done yet. 

40 

41 - Callbacks registered with add_done_callback() are always called 

42 via the event loop's call_soon(). 

43 

44 - This class is not compatible with the wait() and as_completed() 

45 methods in the concurrent.futures package. 

46 

47 """ 

48 

49 # Class variables serving as defaults for instance variables. 

50 _state = _PENDING 

51 _result = None 

52 _exception = None 

53 _loop = None 

54 _source_traceback = None 

55 _cancel_message = None 

56 # A saved CancelledError for later chaining as an exception context. 

57 _cancelled_exc = None 

58 

59 # This field is used for a dual purpose: 

60 # - Its presence is a marker to declare that a class implements 

61 # the Future protocol (i.e. is intended to be duck-type compatible). 

62 # The value must also be not-None, to enable a subclass to declare 

63 # that it is not compatible by setting this to None. 

64 # - It is set by __iter__() below so that Task.__step() can tell 

65 # the difference between 

66 # `await Future()` or `yield from Future()` (correct) vs. 

67 # `yield Future()` (incorrect). 

68 _asyncio_future_blocking = False 

69 

70 # Used by the capture_call_stack() API. 

71 __asyncio_awaited_by = None 

72 

73 __log_traceback = False 

74 

75 def __init__(self, *, loop=None): 

76 """Initialize the future. 

77 

78 The optional event_loop argument allows explicitly setting the event 

79 loop object used by the future. If it's not provided, the future uses 

80 the default event loop. 

81 """ 

82 if loop is None: 

83 self._loop = events.get_event_loop() 

84 else: 

85 self._loop = loop 

86 self._callbacks = [] 

87 if self._loop.get_debug(): 

88 self._source_traceback = format_helpers.extract_stack( 

89 sys._getframe(1)) 

90 

91 def __repr__(self): 

92 return base_futures._future_repr(self) 

93 

94 def __del__(self): 

95 if not self.__log_traceback: 

96 # set_exception() was not called, or result() or exception() 

97 # has consumed the exception 

98 return 

99 exc = self._exception 

100 context = { 

101 'message': 

102 f'{self.__class__.__name__} exception was never retrieved', 

103 'exception': exc, 

104 'future': self, 

105 } 

106 if self._source_traceback: 

107 context['source_traceback'] = self._source_traceback 

108 self._loop.call_exception_handler(context) 

109 

110 __class_getitem__ = classmethod(GenericAlias) 

111 

112 @property 

113 def _log_traceback(self): 

114 return self.__log_traceback 

115 

116 @_log_traceback.setter 

117 def _log_traceback(self, val): 

118 if val: 

119 raise ValueError('_log_traceback can only be set to False') 

120 self.__log_traceback = False 

121 

122 @property 

123 def _asyncio_awaited_by(self): 

124 if self.__asyncio_awaited_by is None: 

125 return None 

126 return frozenset(self.__asyncio_awaited_by) 

127 

128 def get_loop(self): 

129 """Return the event loop the Future is bound to.""" 

130 loop = self._loop 

131 if loop is None: 

132 raise RuntimeError("Future object is not initialized.") 

133 return loop 

134 

135 def _make_cancelled_error(self): 

136 """Create the CancelledError to raise if the Future is cancelled. 

137 

138 This should only be called once when handling a cancellation since 

139 it erases the saved context exception value. 

140 """ 

141 if self._cancelled_exc is not None: 

142 exc = self._cancelled_exc 

143 self._cancelled_exc = None 

144 return exc 

145 

146 if self._cancel_message is None: 

147 exc = exceptions.CancelledError() 

148 else: 

149 exc = exceptions.CancelledError(self._cancel_message) 

150 return exc 

151 

152 def cancel(self, msg=None): 

153 """Cancel the future and schedule callbacks. 

154 

155 If the future is already done or cancelled, return False. Otherwise, 

156 change the future's state to cancelled, schedule the callbacks and 

157 return True. 

158 """ 

159 self.__log_traceback = False 

160 if self._state != _PENDING: 

161 return False 

162 self._state = _CANCELLED 

163 self._cancel_message = msg 

164 self.__schedule_callbacks() 

165 return True 

166 

167 def __schedule_callbacks(self): 

168 """Internal: Ask the event loop to call all callbacks. 

169 

170 The callbacks are scheduled to be called as soon as possible. Also 

171 clears the callback list. 

172 """ 

173 callbacks = self._callbacks[:] 

174 if not callbacks: 

175 return 

176 

177 self._callbacks[:] = [] 

178 for callback, ctx in callbacks: 

179 self._loop.call_soon(callback, self, context=ctx) 

180 

181 def cancelled(self): 

182 """Return True if the future was cancelled.""" 

183 return self._state == _CANCELLED 

184 

185 # Don't implement running(); see http://bugs.python.org/issue18699 

186 

187 def done(self): 

188 """Return True if the future is done. 

189 

190 Done means either that a result / exception are available, or that the 

191 future was cancelled. 

192 """ 

193 return self._state != _PENDING 

194 

195 def result(self): 

196 """Return the result this future represents. 

197 

198 If the future has been cancelled, raises CancelledError. If the 

199 future's result isn't yet available, raises InvalidStateError. If 

200 the future is done and has an exception set, this exception is raised. 

201 """ 

202 if self._state == _CANCELLED: 

203 raise self._make_cancelled_error() 

204 if self._state != _FINISHED: 

205 raise exceptions.InvalidStateError('Result is not ready.') 

206 self.__log_traceback = False 

207 if self._exception is not None: 

208 raise self._exception.with_traceback(self._exception_tb) 

209 return self._result 

210 

211 def exception(self): 

212 """Return the exception that was set on this future. 

213 

214 The exception (or None if no exception was set) is returned only if 

215 the future is done. If the future has been cancelled, raises 

216 CancelledError. If the future isn't done yet, raises 

217 InvalidStateError. 

218 """ 

219 if self._state == _CANCELLED: 

220 raise self._make_cancelled_error() 

221 if self._state != _FINISHED: 

222 raise exceptions.InvalidStateError('Exception is not set.') 

223 self.__log_traceback = False 

224 return self._exception 

225 

226 def add_done_callback(self, fn, *, context=None): 

227 """Add a callback to be run when the future becomes done. 

228 

229 The callback is called with a single argument - the future object. If 

230 the future is already done when this is called, the callback is 

231 scheduled with call_soon. 

232 """ 

233 if self._state != _PENDING: 

234 self._loop.call_soon(fn, self, context=context) 

235 else: 

236 if context is None: 

237 context = contextvars.copy_context() 

238 self._callbacks.append((fn, context)) 

239 

240 # New method not in PEP 3148. 

241 

242 def remove_done_callback(self, fn): 

243 """Remove all instances of a callback from the "call when done" list. 

244 

245 Returns the number of callbacks removed. 

246 """ 

247 filtered_callbacks = [(f, ctx) 

248 for (f, ctx) in self._callbacks 

249 if f != fn] 

250 removed_count = len(self._callbacks) - len(filtered_callbacks) 

251 if removed_count: 

252 self._callbacks[:] = filtered_callbacks 

253 return removed_count 

254 

255 # So-called internal methods (note: no set_running_or_notify_cancel()). 

256 

257 def set_result(self, result): 

258 """Mark the future done and set its result. 

259 

260 If the future is already done when this method is called, raises 

261 InvalidStateError. 

262 """ 

263 if self._state != _PENDING: 

264 raise exceptions.InvalidStateError(f'{self._state}: {self!r}') 

265 self._result = result 

266 self._state = _FINISHED 

267 self.__schedule_callbacks() 

268 

269 def set_exception(self, exception): 

270 """Mark the future done and set an exception. 

271 

272 If the future is already done when this method is called, raises 

273 InvalidStateError. 

274 """ 

275 if self._state != _PENDING: 

276 raise exceptions.InvalidStateError(f'{self._state}: {self!r}') 

277 if isinstance(exception, type): 

278 exception = exception() 

279 if isinstance(exception, StopIteration): 

280 new_exc = RuntimeError("StopIteration interacts badly with " 

281 "generators and cannot be raised into a " 

282 "Future") 

283 new_exc.__cause__ = exception 

284 new_exc.__context__ = exception 

285 exception = new_exc 

286 self._exception = exception 

287 self._exception_tb = exception.__traceback__ 

288 self._state = _FINISHED 

289 self.__schedule_callbacks() 

290 self.__log_traceback = True 

291 

292 def __await__(self): 

293 if not self.done(): 

294 self._asyncio_future_blocking = True 

295 yield self # This tells Task to wait for completion. 

296 if not self.done(): 

297 raise RuntimeError("await wasn't used with future") 

298 return self.result() # May raise too. 

299 

300 __iter__ = __await__ # make compatible with 'yield from'. 

301 

302 

303# Needed for testing purposes. 

304_PyFuture = Future 

305 

306 

307def _get_loop(fut): 

308 # Tries to call Future.get_loop() if it's available. 

309 # Otherwise fallbacks to using the old '_loop' property. 

310 try: 

311 get_loop = fut.get_loop 

312 except AttributeError: 

313 pass 

314 else: 

315 return get_loop() 

316 return fut._loop 

317 

318 

319def _set_result_unless_cancelled(fut, result): 

320 """Helper setting the result only if the future was not cancelled.""" 

321 if fut.cancelled(): 

322 return 

323 fut.set_result(result) 

324 

325 

326def _convert_future_exc(exc): 

327 exc_class = type(exc) 

328 if exc_class is concurrent.futures.CancelledError: 

329 return exceptions.CancelledError(*exc.args).with_traceback(exc.__traceback__) 

330 elif exc_class is concurrent.futures.InvalidStateError: 

331 return exceptions.InvalidStateError(*exc.args).with_traceback(exc.__traceback__) 

332 else: 

333 return exc 

334 

335 

336def _set_concurrent_future_state(concurrent, source): 

337 """Copy state from a future to a concurrent.futures.Future.""" 

338 assert source.done() 

339 if source.cancelled(): 

340 concurrent.cancel() 

341 if not concurrent.set_running_or_notify_cancel(): 

342 return 

343 exception = source.exception() 

344 if exception is not None: 

345 concurrent.set_exception(_convert_future_exc(exception)) 

346 else: 

347 result = source.result() 

348 concurrent.set_result(result) 

349 

350 

351def _copy_future_state(source, dest): 

352 """Internal helper to copy state from another Future. 

353 

354 The other Future must be a concurrent.futures.Future. 

355 """ 

356 if dest.cancelled(): 

357 return 

358 assert not dest.done() 

359 done, cancelled, result, exception = source._get_snapshot() 

360 assert done 

361 if cancelled: 

362 dest.cancel() 

363 elif exception is not None: 

364 dest.set_exception(_convert_future_exc(exception)) 

365 else: 

366 dest.set_result(result) 

367 

368def _chain_future(source, destination): 

369 """Chain two futures so that when one completes, so does the other. 

370 

371 The result (or exception) of source will be copied to destination. 

372 If destination is cancelled, source gets cancelled too. 

373 Compatible with both asyncio.Future and concurrent.futures.Future. 

374 """ 

375 if not isfuture(source) and not isinstance(source, 375 ↛ 377line 375 didn't jump to line 377 because the condition on line 375 was never true

376 concurrent.futures.Future): 

377 raise TypeError('A future is required for source argument') 

378 if not isfuture(destination) and not isinstance(destination, 378 ↛ 380line 378 didn't jump to line 380 because the condition on line 378 was never true

379 concurrent.futures.Future): 

380 raise TypeError('A future is required for destination argument') 

381 source_loop = _get_loop(source) if isfuture(source) else None 

382 dest_loop = _get_loop(destination) if isfuture(destination) else None 

383 

384 def _set_state(future, other): 

385 if isfuture(future): 

386 _copy_future_state(other, future) 

387 else: 

388 _set_concurrent_future_state(future, other) 

389 

390 def _call_check_cancel(destination): 

391 if destination.cancelled(): 

392 if source_loop is None or source_loop is dest_loop: 

393 source.cancel() 

394 else: 

395 source_loop.call_soon_threadsafe(source.cancel) 

396 

397 def _call_set_state(source): 

398 if (destination.cancelled() and 398 ↛ 400line 398 didn't jump to line 400 because the condition on line 398 was never true

399 dest_loop is not None and dest_loop.is_closed()): 

400 return 

401 if dest_loop is None or dest_loop is source_loop: 

402 _set_state(destination, source) 

403 else: 

404 if dest_loop.is_closed(): 404 ↛ 405line 404 didn't jump to line 405 because the condition on line 404 was never true

405 return 

406 dest_loop.call_soon_threadsafe(_set_state, destination, source) 

407 

408 destination.add_done_callback(_call_check_cancel) 

409 source.add_done_callback(_call_set_state) 

410 

411 

412def wrap_future(future, *, loop=None): 

413 """Wrap concurrent.futures.Future object.""" 

414 if isfuture(future): 

415 return future 

416 assert isinstance(future, concurrent.futures.Future), \ 

417 f'concurrent.futures.Future is expected, got {future!r}' 

418 if loop is None: 

419 loop = events.get_event_loop() 

420 new_future = loop.create_future() 

421 _chain_future(future, new_future) 

422 return new_future 

423 

424 

425def future_add_to_awaited_by(fut, waiter, /): 

426 """Record that `fut` is awaited on by `waiter`.""" 

427 # For the sake of keeping the implementation minimal and assuming 

428 # that most of asyncio users use the built-in Futures and Tasks 

429 # (or their subclasses), we only support native Future objects 

430 # and their subclasses. 

431 # 

432 # Longer version: tracking requires storing the caller-callee 

433 # dependency somewhere. One obvious choice is to store that 

434 # information right in the future itself in a dedicated attribute. 

435 # This means that we'd have to require all duck-type compatible 

436 # futures to implement a specific attribute used by asyncio for 

437 # the book keeping. Another solution would be to store that in 

438 # a global dictionary. The downside here is that that would create 

439 # strong references and any scenario where the "add" call isn't 

440 # followed by a "discard" call would lead to a memory leak. 

441 # Using WeakDict would resolve that issue, but would complicate 

442 # the C code (_asynciomodule.c). The bottom line here is that 

443 # it's not clear that all this work would be worth the effort. 

444 # 

445 # Note that there's an accelerated version of this function 

446 # shadowing this implementation later in this file. 

447 if isinstance(fut, _PyFuture) and isinstance(waiter, _PyFuture): 

448 if fut._Future__asyncio_awaited_by is None: 

449 fut._Future__asyncio_awaited_by = set() 

450 fut._Future__asyncio_awaited_by.add(waiter) 

451 

452 

453def future_discard_from_awaited_by(fut, waiter, /): 

454 """Record that `fut` is no longer awaited on by `waiter`.""" 

455 # See the comment in "future_add_to_awaited_by()" body for 

456 # details on implementation. 

457 # 

458 # Note that there's an accelerated version of this function 

459 # shadowing this implementation later in this file. 

460 if isinstance(fut, _PyFuture) and isinstance(waiter, _PyFuture): 

461 if fut._Future__asyncio_awaited_by is not None: 461 ↛ exitline 461 didn't return from function 'future_discard_from_awaited_by' because the condition on line 461 was always true

462 fut._Future__asyncio_awaited_by.discard(waiter) 

463 

464 

465_py_future_add_to_awaited_by = future_add_to_awaited_by 

466_py_future_discard_from_awaited_by = future_discard_from_awaited_by 

467 

468try: 

469 import _asyncio 

470except ImportError: 

471 pass 

472else: 

473 # _CFuture is needed for tests. 

474 Future = _CFuture = _asyncio.Future 

475 future_add_to_awaited_by = _asyncio.future_add_to_awaited_by 

476 future_discard_from_awaited_by = _asyncio.future_discard_from_awaited_by 

477 _c_future_add_to_awaited_by = future_add_to_awaited_by 

478 _c_future_discard_from_awaited_by = future_discard_from_awaited_by