Coverage for Lib / asyncio / futures.py: 96%

245 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-18 01:49 +0000

1"""A Future class similar to the one in PEP 3148.""" 

2 

3__all__ = ( 

4 'Future', 'wrap_future', 'isfuture', 

5 'future_add_to_awaited_by', 'future_discard_from_awaited_by', 

6) 

7 

8import concurrent.futures 

9import contextvars 

10import logging 

11import sys 

12from types import GenericAlias 

13 

14from . import base_futures 

15from . import events 

16from . import exceptions 

17from . import format_helpers 

18 

19 

# Re-export the future-detection predicate under this module's namespace.
isfuture = base_futures.isfuture


# State markers, taken from base_futures so both modules compare the
# very same objects.
_PENDING, _CANCELLED, _FINISHED = (
    base_futures._PENDING,
    base_futures._CANCELLED,
    base_futures._FINISHED,
)


# One notch below logging.DEBUG.
STACK_DEBUG = logging.DEBUG - 1  # heavy-duty debugging

29 

30 

class Future:
    """This class is *almost* compatible with concurrent.futures.Future.

    Differences:

    - This class is not thread-safe.

    - result() and exception() do not take a timeout argument and
      raise an exception when the future isn't done yet.

    - Callbacks registered with add_done_callback() are always called
      via the event loop's call_soon().

    - This class is not compatible with the wait() and as_completed()
      methods in the concurrent.futures package.

    """

    # Class variables serving as defaults for instance variables.
    _state = _PENDING
    _result = None
    _exception = None
    _loop = None
    _source_traceback = None
    _cancel_message = None
    # A saved CancelledError for later chaining as an exception context.
    _cancelled_exc = None

    # This field is used for a dual purpose:
    # - Its presence is a marker to declare that a class implements
    #   the Future protocol (i.e. is intended to be duck-type compatible).
    #   The value must also be not-None, to enable a subclass to declare
    #   that it is not compatible by setting this to None.
    # - It is set by __iter__() below so that Task.__step() can tell
    #   the difference between
    #   `await Future()` or `yield from Future()` (correct) vs.
    #   `yield Future()` (incorrect).
    _asyncio_future_blocking = False

    # Used by the capture_call_stack() API.
    __asyncio_awaited_by = None

    # True while an exception has been set but not yet retrieved;
    # __del__() reports the exception to the loop in that case.
    __log_traceback = False

    def __init__(self, *, loop=None):
        """Initialize the future.

        The optional *loop* argument allows explicitly setting the event
        loop object used by the future.  If it's not provided, the future
        uses the loop returned by events.get_event_loop().

        Raises RuntimeError if the instance has already been initialized.
        """
        if self._loop is not None:
            raise RuntimeError(f"{self.__class__.__name__} object is already "
                               "initialized")

        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._callbacks = []
        if self._loop.get_debug():
            # Remember where the future was created (the caller's frame)
            # so that __del__() can include it in its report.
            self._source_traceback = format_helpers.extract_stack(
                sys._getframe(1))

    def __repr__(self):
        return base_futures._future_repr(self)

    def __del__(self):
        if not self.__log_traceback:
            # set_exception() was not called, or result() or exception()
            # has consumed the exception
            return
        exc = self._exception
        context = {
            'message':
                f'{self.__class__.__name__} exception was never retrieved',
            'exception': exc,
            'future': self,
        }
        if self._source_traceback:
            context['source_traceback'] = self._source_traceback
        self._loop.call_exception_handler(context)

    __class_getitem__ = classmethod(GenericAlias)

    @property
    def _log_traceback(self):
        return self.__log_traceback

    @_log_traceback.setter
    def _log_traceback(self, val):
        # The flag may only be cleared from the outside; it is armed
        # exclusively by set_exception().
        if val:
            raise ValueError('_log_traceback can only be set to False')
        self.__log_traceback = False

    @property
    def _asyncio_awaited_by(self):
        """Return a frozenset snapshot of the recorded waiters, or None."""
        if self.__asyncio_awaited_by is None:
            return None
        return frozenset(self.__asyncio_awaited_by)

    def get_loop(self):
        """Return the event loop the Future is bound to."""
        loop = self._loop
        if loop is None:
            raise RuntimeError("Future object is not initialized.")
        return loop

    def _make_cancelled_error(self):
        """Create the CancelledError to raise if the Future is cancelled.

        This should only be called once when handling a cancellation since
        it erases the saved context exception value.
        """
        if self._cancelled_exc is not None:
            exc = self._cancelled_exc
            self._cancelled_exc = None
            return exc

        if self._cancel_message is None:
            exc = exceptions.CancelledError()
        else:
            exc = exceptions.CancelledError(self._cancel_message)
        return exc

    def cancel(self, msg=None):
        """Cancel the future and schedule callbacks.

        If the future is already done or cancelled, return False.  Otherwise,
        change the future's state to cancelled, schedule the callbacks and
        return True.
        """
        self.__log_traceback = False
        if self._state != _PENDING:
            return False
        self._state = _CANCELLED
        self._cancel_message = msg
        self.__schedule_callbacks()
        return True

    def __schedule_callbacks(self):
        """Internal: Ask the event loop to call all callbacks.

        The callbacks are scheduled to be called as soon as possible. Also
        clears the callback list.
        """
        # Work on a copy: the list itself is emptied in place below.
        callbacks = self._callbacks[:]
        if not callbacks:
            return

        self._callbacks[:] = []
        for callback, ctx in callbacks:
            self._loop.call_soon(callback, self, context=ctx)

    def cancelled(self):
        """Return True if the future was cancelled."""
        return self._state == _CANCELLED

    # Don't implement running(); see http://bugs.python.org/issue18699

    def done(self):
        """Return True if the future is done.

        Done means either that a result / exception are available, or that the
        future was cancelled.
        """
        return self._state != _PENDING

    def result(self):
        """Return the result this future represents.

        If the future has been cancelled, raises CancelledError.  If the
        future's result isn't yet available, raises InvalidStateError.  If
        the future is done and has an exception set, this exception is raised.
        """
        if self._state == _CANCELLED:
            raise self._make_cancelled_error()
        if self._state != _FINISHED:
            raise exceptions.InvalidStateError('Result is not ready.')
        # The outcome is being consumed, so __del__() has nothing to report.
        self.__log_traceback = False
        if self._exception is not None:
            # Re-attach the traceback captured by set_exception().
            raise self._exception.with_traceback(self._exception_tb)
        return self._result

    def exception(self):
        """Return the exception that was set on this future.

        The exception (or None if no exception was set) is returned only if
        the future is done.  If the future has been cancelled, raises
        CancelledError.  If the future isn't done yet, raises
        InvalidStateError.
        """
        if self._state == _CANCELLED:
            raise self._make_cancelled_error()
        if self._state != _FINISHED:
            raise exceptions.InvalidStateError('Exception is not set.')
        self.__log_traceback = False
        return self._exception

    def add_done_callback(self, fn, *, context=None):
        """Add a callback to be run when the future becomes done.

        The callback is called with a single argument - the future object. If
        the future is already done when this is called, the callback is
        scheduled with call_soon.
        """
        if self._state != _PENDING:
            self._loop.call_soon(fn, self, context=context)
        else:
            if context is None:
                # Capture the registering code's context now, since the
                # callback is only scheduled once the future completes.
                context = contextvars.copy_context()
            self._callbacks.append((fn, context))

    # New method not in PEP 3148.

    def remove_done_callback(self, fn):
        """Remove all instances of a callback from the "call when done" list.

        Returns the number of callbacks removed.
        """
        filtered_callbacks = [(f, ctx)
                              for (f, ctx) in self._callbacks
                              if f != fn]
        removed_count = len(self._callbacks) - len(filtered_callbacks)
        if removed_count:
            self._callbacks[:] = filtered_callbacks
        return removed_count

    # So-called internal methods (note: no set_running_or_notify_cancel()).

    def set_result(self, result):
        """Mark the future done and set its result.

        If the future is already done when this method is called, raises
        InvalidStateError.
        """
        if self._state != _PENDING:
            raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
        self._result = result
        self._state = _FINISHED
        self.__schedule_callbacks()

    def set_exception(self, exception):
        """Mark the future done and set an exception.

        *exception* may be an exception instance or an exception class; a
        class is instantiated without arguments.

        If the future is already done when this method is called, raises
        InvalidStateError.  Raises TypeError if *exception* is neither an
        exception instance nor an exception class; the future is left
        untouched in that case.
        """
        if self._state != _PENDING:
            raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
        if isinstance(exception, type):
            exception = exception()
        # Validate before mutating anything: otherwise a bogus object would
        # be stored in _exception and the call would only fail afterwards,
        # on the __traceback__ lookup, leaving a half-updated pending future.
        if not isinstance(exception, BaseException):
            raise TypeError("invalid exception object")
        if isinstance(exception, StopIteration):
            new_exc = RuntimeError("StopIteration interacts badly with "
                                   "generators and cannot be raised into a "
                                   "Future")
            new_exc.__cause__ = exception
            new_exc.__context__ = exception
            exception = new_exc
        self._exception = exception
        self._exception_tb = exception.__traceback__
        self._state = _FINISHED
        self.__schedule_callbacks()
        # Arm the "exception was never retrieved" report done by __del__().
        self.__log_traceback = True

    def __await__(self):
        if not self.done():
            self._asyncio_future_blocking = True
            yield self  # This tells Task to wait for completion.
        if not self.done():
            raise RuntimeError("await wasn't used with future")
        return self.result()  # May raise too.

    __iter__ = __await__  # make compatible with 'yield from'.


# Needed for testing purposes.
_PyFuture = Future

309 

310 

def _get_loop(fut):
    """Return the event loop *fut* is bound to.

    Prefers the public ``get_loop()`` accessor and falls back to the old
    ``_loop`` attribute for futures that do not provide it.
    """
    missing = object()
    accessor = getattr(fut, 'get_loop', missing)
    if accessor is missing:
        # No get_loop() on this object: use the old '_loop' attribute.
        return fut._loop
    return accessor()

321 

322 

def _set_result_unless_cancelled(fut, result):
    """Store *result* on *fut*, doing nothing if *fut* was cancelled."""
    if not fut.cancelled():
        fut.set_result(result)

328 

329 

def _convert_future_exc(exc):
    """Translate a concurrent.futures exception into its asyncio twin.

    Only exact instances of concurrent.futures.CancelledError and
    concurrent.futures.InvalidStateError are converted; the replacement
    carries the same args and traceback.  Anything else is returned
    unchanged.
    """
    original_type = type(exc)
    translations = (
        (concurrent.futures.CancelledError, exceptions.CancelledError),
        (concurrent.futures.InvalidStateError, exceptions.InvalidStateError),
    )
    for foreign_type, asyncio_type in translations:
        # Identity check on purpose: subclasses are passed through as is.
        if original_type is foreign_type:
            converted = asyncio_type(*exc.args)
            return converted.with_traceback(exc.__traceback__)
    return exc

338 

339 

def _set_concurrent_future_state(concurrent, source):
    """Copy state from a future to a concurrent.futures.Future."""
    assert source.done()
    if source.cancelled():
        concurrent.cancel()
    # A false return means the target was cancelled; there is nothing
    # left to transfer in that case.
    if concurrent.set_running_or_notify_cancel():
        error = source.exception()
        if error is None:
            concurrent.set_result(source.result())
        else:
            concurrent.set_exception(_convert_future_exc(error))

353 

354 

def _copy_future_state(source, dest):
    """Internal helper to copy state from another Future.

    *source* must already be done.  It is normally a
    concurrent.futures.Future, whose state is read in one step through
    ``_get_snapshot()``.  A source without that method (such as an asyncio
    future) is read through the public ``done()`` / ``cancelled()`` /
    ``exception()`` / ``result()`` API instead.

    Nothing is copied if *dest* has already been cancelled.
    """
    if dest.cancelled():
        return
    assert not dest.done()
    take_snapshot = getattr(source, '_get_snapshot', None)
    if take_snapshot is not None:
        done, cancelled, result, exception = take_snapshot()
    else:
        # Fallback for sources that cannot produce a snapshot: rebuild
        # the same four values from the public accessors.
        done = source.done()
        cancelled = done and source.cancelled()
        result = exception = None
        if done and not cancelled:
            exception = source.exception()
            if exception is None:
                result = source.result()
    assert done
    if cancelled:
        dest.cancel()
    elif exception is not None:
        dest.set_exception(_convert_future_exc(exception))
    else:
        dest.set_result(result)

371 

def _chain_future(source, destination):
    """Chain two futures so that when one completes, so does the other.

    The result (or exception) of source will be copied to destination.
    If destination is cancelled, source gets cancelled too.
    Compatible with both asyncio.Future and concurrent.futures.Future.

    Raises TypeError if either argument is not a future.
    """
    source_is_asyncio = isfuture(source)
    if not (source_is_asyncio
            or isinstance(source, concurrent.futures.Future)):
        raise TypeError('A future is required for source argument')
    dest_is_asyncio = isfuture(destination)
    if not (dest_is_asyncio
            or isinstance(destination, concurrent.futures.Future)):
        raise TypeError('A future is required for destination argument')

    # Only asyncio futures have a loop; None marks a concurrent future.
    source_loop = None
    if isfuture(source):
        source_loop = _get_loop(source)
    dest_loop = None
    if isfuture(destination):
        dest_loop = _get_loop(destination)

    def _set_state(future, other):
        # Choose the copy helper that matches the kind of the target.
        if not isfuture(future):
            _set_concurrent_future_state(future, other)
        else:
            _copy_future_state(other, future)

    def _call_check_cancel(destination):
        # Done-callback of destination: forward a cancellation to source.
        if not destination.cancelled():
            return
        if source_loop is None or source_loop is events._get_running_loop():
            source.cancel()
            return
        # Running outside source's loop: hand the cancel over to it.
        source_loop.call_soon_threadsafe(source.cancel)

    def _call_set_state(source):
        # Done-callback of source: forward its outcome to destination.
        if (destination.cancelled()
                and dest_loop is not None
                and dest_loop.is_closed()):
            return
        if dest_loop is None or dest_loop is events._get_running_loop():
            _set_state(destination, source)
        elif not dest_loop.is_closed():
            dest_loop.call_soon_threadsafe(_set_state, destination, source)

    destination.add_done_callback(_call_check_cancel)
    source.add_done_callback(_call_set_state)

414 

415 

def wrap_future(future, *, loop=None):
    """Wrap concurrent.futures.Future object.

    An object that already passes isfuture() is returned unchanged.
    Otherwise a new future is created on *loop* (events.get_event_loop()
    when *loop* is None), chained to *future*, and returned.
    """
    if not isfuture(future):
        assert isinstance(future, concurrent.futures.Future), \
            f'concurrent.futures.Future is expected, got {future!r}'
        target_loop = events.get_event_loop() if loop is None else loop
        wrapper = target_loop.create_future()
        _chain_future(future, wrapper)
        return wrapper
    return future

427 

428 

def future_add_to_awaited_by(fut, waiter, /):
    """Record that `fut` is awaited on by `waiter`."""
    # Only native Future objects (and subclasses) are tracked.  This
    # keeps the implementation small, on the assumption that most
    # asyncio users rely on the built-in Futures and Tasks.
    #
    # Background: the caller/callee link has to be stored somewhere.
    # Storing it on the future itself, in a dedicated attribute, would
    # force every duck-type compatible future to provide that attribute
    # for asyncio's book keeping.  A global dictionary is the other
    # option, but it would hold strong references, so an "add" that is
    # never matched by a "discard" would leak memory.  A WeakDict would
    # avoid the leak at the price of more complicated C code
    # (_asynciomodule.c).  It is not clear that the extra work would be
    # worth the effort.
    #
    # Note that there's an accelerated version of this function
    # shadowing this implementation later in this file.
    if not (isinstance(fut, _PyFuture) and isinstance(waiter, _PyFuture)):
        return
    waiters = fut._Future__asyncio_awaited_by
    if waiters is None:
        # First waiter for this future: create the set on demand.
        waiters = fut._Future__asyncio_awaited_by = set()
    waiters.add(waiter)

455 

456 

def future_discard_from_awaited_by(fut, waiter, /):
    """Record that `fut` is no longer awaited on by `waiter`."""
    # Mirror image of future_add_to_awaited_by(): only native Future
    # objects (and subclasses) are tracked; see the comment in that
    # function's body for the reasoning.
    #
    # Note that there's an accelerated version of this function
    # shadowing this implementation later in this file.
    if not (isinstance(fut, _PyFuture) and isinstance(waiter, _PyFuture)):
        return
    waiters = fut._Future__asyncio_awaited_by
    if waiters is not None:
        # discard() is a no-op when the waiter was never recorded.
        waiters.discard(waiter)

467 

468 

# Keep references to the pure-Python helpers before the names are
# (possibly) rebound to the accelerated versions below.
_py_future_add_to_awaited_by = future_add_to_awaited_by
_py_future_discard_from_awaited_by = future_discard_from_awaited_by

try:
    import _asyncio
except ImportError:
    # No accelerator module: the pure-Python definitions stay in effect.
    pass
else:
    # _CFuture is needed for tests.
    Future = _CFuture = _asyncio.Future
    future_add_to_awaited_by = _c_future_add_to_awaited_by = \
        _asyncio.future_add_to_awaited_by
    future_discard_from_awaited_by = _c_future_discard_from_awaited_by = \
        _asyncio.future_discard_from_awaited_by