Coverage for Lib / asyncio / futures.py: 96%
245 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-18 01:49 +0000
1"""A Future class similar to the one in PEP 3148."""
# Public names of this module; this tuple controls what
# ``from <module> import *`` exports.
__all__ = (
    'Future', 'wrap_future', 'isfuture',
    'future_add_to_awaited_by', 'future_discard_from_awaited_by',
)
8import concurrent.futures
9import contextvars
10import logging
11import sys
12from types import GenericAlias
14from . import base_futures
15from . import events
16from . import exceptions
17from . import format_helpers
# Alias of base_futures.isfuture; exposed as part of this module's public
# API (it is listed in __all__).
isfuture = base_futures.isfuture


# State markers for Future._state, taken from base_futures so that both
# modules use the same objects.
_PENDING = base_futures._PENDING
_CANCELLED = base_futures._CANCELLED
_FINISHED = base_futures._FINISHED


# Custom logging level sitting just below logging.DEBUG.
STACK_DEBUG = logging.DEBUG - 1  # heavy-duty debugging
class Future:
    """This class is *almost* compatible with concurrent.futures.Future.

    Differences:

    - This class is not thread-safe.

    - result() and exception() do not take a timeout argument and
      raise an exception when the future isn't done yet.

    - Callbacks registered with add_done_callback() are always called
      via the event loop's call_soon().

    - This class is not compatible with the wait() and as_completed()
      methods in the concurrent.futures package.

    """

    # Class variables serving as defaults for instance variables.
    _state = _PENDING
    _result = None
    _exception = None
    _loop = None
    _source_traceback = None
    _cancel_message = None
    # A saved CancelledError for later chaining as an exception context.
    _cancelled_exc = None

    # This field is used for a dual purpose:
    # - Its presence is a marker to declare that a class implements
    #   the Future protocol (i.e. is intended to be duck-type compatible).
    #   The value must also be not-None, to enable a subclass to declare
    #   that it is not compatible by setting this to None.
    # - It is set by __iter__() below so that Task.__step() can tell
    #   the difference between
    #   `await Future()` or `yield from Future()` (correct) vs.
    #   `yield Future()` (incorrect).
    _asyncio_future_blocking = False

    # Used by the capture_call_stack() API.
    # Either None or a set of waiters; populated by the module-level
    # future_add_to_awaited_by() helper.
    __asyncio_awaited_by = None

    # True while an exception has been stored by set_exception() and has
    # not yet been retrieved (or the future cancelled); checked by __del__().
    __log_traceback = False

    def __init__(self, *, loop=None):
        """Initialize the future.

        The optional keyword-only *loop* argument allows explicitly setting
        the event loop object used by the future.  If it's not provided,
        the future uses the loop returned by events.get_event_loop().

        Raises RuntimeError if the object has already been initialized
        (i.e. it already has a loop).
        """
        if self._loop is not None:
            raise RuntimeError(f"{self.__class__.__name__} object is already "
                               "initialized")

        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        # List of (callback, context) pairs; see add_done_callback().
        self._callbacks = []
        if self._loop.get_debug():
            # In debug mode, remember where the future was created
            # (starting from the caller's frame) for later diagnostics.
            self._source_traceback = format_helpers.extract_stack(
                sys._getframe(1))

    def __repr__(self):
        """Return the representation built by base_futures._future_repr()."""
        return base_futures._future_repr(self)

    def __del__(self):
        """Report an exception that was set but never retrieved.

        Does nothing unless the "log traceback" flag is still set; otherwise
        passes a context dict to the loop's call_exception_handler().
        """
        if not self.__log_traceback:
            # set_exception() was not called, or result() or exception()
            # has consumed the exception
            return
        exc = self._exception
        context = {
            'message':
                f'{self.__class__.__name__} exception was never retrieved',
            'exception': exc,
            'future': self,
        }
        if self._source_traceback:
            # Only available when the loop was in debug mode at creation.
            context['source_traceback'] = self._source_traceback
        self._loop.call_exception_handler(context)

    # Allow subscripting the class (e.g. Future[int]); the result is a
    # types.GenericAlias.
    __class_getitem__ = classmethod(GenericAlias)

    @property
    def _log_traceback(self):
        """Whether __del__() would currently report an unretrieved exception."""
        return self.__log_traceback

    @_log_traceback.setter
    def _log_traceback(self, val):
        # The flag may only be cleared from outside; only set_exception()
        # turns it on.
        if val:
            raise ValueError('_log_traceback can only be set to False')
        self.__log_traceback = False

    @property
    def _asyncio_awaited_by(self):
        """Return a frozenset snapshot of the recorded waiters, or None."""
        if self.__asyncio_awaited_by is None:
            return None
        return frozenset(self.__asyncio_awaited_by)

    def get_loop(self):
        """Return the event loop the Future is bound to.

        Raises RuntimeError if the future has no loop yet.
        """
        loop = self._loop
        if loop is None:
            raise RuntimeError("Future object is not initialized.")
        return loop

    def _make_cancelled_error(self):
        """Create the CancelledError to raise if the Future is cancelled.

        This should only be called once when handling a cancellation since
        it erases the saved context exception value.
        """
        if self._cancelled_exc is not None:
            # Hand out the saved exception exactly once.
            exc = self._cancelled_exc
            self._cancelled_exc = None
            return exc

        if self._cancel_message is None:
            exc = exceptions.CancelledError()
        else:
            exc = exceptions.CancelledError(self._cancel_message)
        return exc

    def cancel(self, msg=None):
        """Cancel the future and schedule callbacks.

        If the future is already done or cancelled, return False.  Otherwise,
        change the future's state to cancelled, schedule the callbacks and
        return True.

        The optional *msg* is stored and later passed to the CancelledError
        created by _make_cancelled_error().
        """
        # Cleared unconditionally, i.e. even when the future is already done.
        self.__log_traceback = False
        if self._state != _PENDING:
            return False
        self._state = _CANCELLED
        self._cancel_message = msg
        self.__schedule_callbacks()
        return True

    def __schedule_callbacks(self):
        """Internal: Ask the event loop to call all callbacks.

        The callbacks are scheduled to be called as soon as possible. Also
        clears the callback list.
        """
        # Work on a copy so the list can be emptied before scheduling.
        callbacks = self._callbacks[:]
        if not callbacks:
            return

        self._callbacks[:] = []
        for callback, ctx in callbacks:
            self._loop.call_soon(callback, self, context=ctx)

    def cancelled(self):
        """Return True if the future was cancelled."""
        return self._state == _CANCELLED

    # Don't implement running(); see http://bugs.python.org/issue18699

    def done(self):
        """Return True if the future is done.

        Done means either that a result / exception are available, or that the
        future was cancelled.
        """
        return self._state != _PENDING

    def result(self):
        """Return the result this future represents.

        If the future has been cancelled, raises CancelledError.  If the
        future's result isn't yet available, raises InvalidStateError.  If
        the future is done and has an exception set, this exception is raised.
        """
        if self._state == _CANCELLED:
            raise self._make_cancelled_error()
        if self._state != _FINISHED:
            raise exceptions.InvalidStateError('Result is not ready.')
        # The outcome is being consumed, so __del__() must not report it.
        self.__log_traceback = False
        if self._exception is not None:
            # Re-raise with the traceback captured in set_exception().
            raise self._exception.with_traceback(self._exception_tb)
        return self._result

    def exception(self):
        """Return the exception that was set on this future.

        The exception (or None if no exception was set) is returned only if
        the future is done.  If the future has been cancelled, raises
        CancelledError.  If the future isn't done yet, raises
        InvalidStateError.
        """
        if self._state == _CANCELLED:
            raise self._make_cancelled_error()
        if self._state != _FINISHED:
            raise exceptions.InvalidStateError('Exception is not set.')
        # The exception is being retrieved, so __del__() must not report it.
        self.__log_traceback = False
        return self._exception

    def add_done_callback(self, fn, *, context=None):
        """Add a callback to be run when the future becomes done.

        The callback is called with a single argument - the future object. If
        the future is already done when this is called, the callback is
        scheduled with call_soon.

        The optional keyword-only *context* is passed on to call_soon(); for
        a pending future a copy of the current context is stored when none
        is given.
        """
        if self._state != _PENDING:
            self._loop.call_soon(fn, self, context=context)
        else:
            if context is None:
                context = contextvars.copy_context()
            self._callbacks.append((fn, context))

    # New method not in PEP 3148.

    def remove_done_callback(self, fn):
        """Remove all instances of a callback from the "call when done" list.

        Callbacks are matched by equality (``!=``), not identity.

        Returns the number of callbacks removed.
        """
        filtered_callbacks = [(f, ctx)
                              for (f, ctx) in self._callbacks
                              if f != fn]
        removed_count = len(self._callbacks) - len(filtered_callbacks)
        if removed_count:
            # Update in place so the list object itself stays the same.
            self._callbacks[:] = filtered_callbacks
        return removed_count

    # So-called internal methods (note: no set_running_or_notify_cancel()).

    def set_result(self, result):
        """Mark the future done and set its result.

        If the future is already done when this method is called, raises
        InvalidStateError.
        """
        if self._state != _PENDING:
            raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
        self._result = result
        self._state = _FINISHED
        self.__schedule_callbacks()

    def set_exception(self, exception):
        """Mark the future done and set an exception.

        *exception* may be an exception instance or an exception class; a
        class is instantiated without arguments.  A StopIteration is wrapped
        in a RuntimeError.

        If the future is already done when this method is called, raises
        InvalidStateError.
        """
        if self._state != _PENDING:
            raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
        if isinstance(exception, type):
            exception = exception()
        if isinstance(exception, StopIteration):
            new_exc = RuntimeError("StopIteration interacts badly with "
                                   "generators and cannot be raised into a "
                                   "Future")
            # Keep the original StopIteration reachable from the wrapper.
            new_exc.__cause__ = exception
            new_exc.__context__ = exception
            exception = new_exc
        self._exception = exception
        # Saved so result() can re-raise with this traceback.
        self._exception_tb = exception.__traceback__
        self._state = _FINISHED
        self.__schedule_callbacks()
        # From now on __del__() reports the exception unless it is retrieved.
        self.__log_traceback = True

    def __await__(self):
        """Yield the future once if it is pending, then return its result."""
        if not self.done():
            self._asyncio_future_blocking = True
            yield self  # This tells Task to wait for completion.
        if not self.done():
            # Resumed while still pending.
            raise RuntimeError("await wasn't used with future")
        return self.result()  # May raise too.

    __iter__ = __await__  # make compatible with 'yield from'.
# Needed for testing purposes.
# Keeps a reference to the pure-Python class: the name `Future` may be
# rebound to the C implementation at the end of this module, and the
# pure-Python future_add_to_awaited_by()/future_discard_from_awaited_by()
# helpers use this name in their isinstance() checks.
_PyFuture = Future
def _get_loop(fut):
    """Return the event loop associated with *fut*.

    Prefers calling ``fut.get_loop()``; when the object has no such
    attribute, falls back to reading the legacy ``fut._loop`` attribute.
    """
    # A private sentinel distinguishes "attribute missing" from any real
    # attribute value (including None).
    missing = object()
    loop_getter = getattr(fut, 'get_loop', missing)
    if loop_getter is not missing:
        return loop_getter()
    return fut._loop
def _set_result_unless_cancelled(fut, result):
    """Store *result* on *fut*, doing nothing if *fut* was cancelled."""
    if not fut.cancelled():
        fut.set_result(result)
def _convert_future_exc(exc):
    """Translate a concurrent.futures exception into its asyncio twin.

    An exception whose exact type is concurrent.futures.CancelledError or
    concurrent.futures.InvalidStateError is replaced by a new instance of
    the same-named class from the ``exceptions`` module, carrying the same
    args and traceback.  Any other exception (including subclasses of the
    two above) is returned unchanged.
    """
    kind = type(exc)
    # Exact-type comparison on purpose: subclasses are passed through.
    if kind is concurrent.futures.CancelledError:
        converted = exceptions.CancelledError(*exc.args)
        return converted.with_traceback(exc.__traceback__)
    if kind is concurrent.futures.InvalidStateError:
        converted = exceptions.InvalidStateError(*exc.args)
        return converted.with_traceback(exc.__traceback__)
    return exc
def _set_concurrent_future_state(concurrent, source):
    """Mirror the outcome of *source* onto a concurrent.futures.Future.

    *source* must already be done.  A cancelled source cancels
    *concurrent*; otherwise the source's exception (converted with
    _convert_future_exc()) or result is transferred.  Nothing is
    transferred when ``concurrent.set_running_or_notify_cancel()``
    returns a false value.
    """
    assert source.done()
    if source.cancelled():
        concurrent.cancel()
    if concurrent.set_running_or_notify_cancel():
        error = source.exception()
        if error is None:
            concurrent.set_result(source.result())
        else:
            concurrent.set_exception(_convert_future_exc(error))
def _copy_future_state(source, dest):
    """Internal helper: transfer the outcome of *source* onto *dest*.

    *source* must provide ``_get_snapshot()`` returning a
    ``(done, cancelled, result, exception)`` tuple and must already be
    done.  If *dest* has been cancelled nothing happens; otherwise *dest*
    must still be pending and is cancelled, given the (converted)
    exception, or given the result, matching the snapshot.
    """
    if dest.cancelled():
        return
    assert not dest.done()

    snapshot = source._get_snapshot()
    is_done, was_cancelled, value, error = snapshot
    assert is_done

    if was_cancelled:
        dest.cancel()
        return
    if error is None:
        dest.set_result(value)
    else:
        dest.set_exception(_convert_future_exc(error))
def _chain_future(source, destination):
    """Link two futures so that completing one is reflected on the other.

    When *source* finishes, its result or exception is copied to
    *destination*.  When *destination* is cancelled, *source* is cancelled
    too.  Each argument may be either an object accepted by isfuture() or
    a concurrent.futures.Future.

    Raises TypeError if either argument is neither kind of future.
    """
    if not (isfuture(source) or
            isinstance(source, concurrent.futures.Future)):
        raise TypeError('A future is required for source argument')
    if not (isfuture(destination) or
            isinstance(destination, concurrent.futures.Future)):
        raise TypeError('A future is required for destination argument')

    # A loop is looked up only for objects accepted by isfuture();
    # otherwise the corresponding loop stays None.
    source_loop = None
    if isfuture(source):
        source_loop = _get_loop(source)
    dest_loop = None
    if isfuture(destination):
        dest_loop = _get_loop(destination)

    def _set_state(future, other):
        # Pick the copy helper based on whether the receiver passes
        # isfuture().
        if not isfuture(future):
            _set_concurrent_future_state(future, other)
        else:
            _copy_future_state(other, future)

    def _call_check_cancel(destination):
        # Done-callback of the destination: propagate cancellation upstream.
        if not destination.cancelled():
            return
        if source_loop is None or source_loop is events._get_running_loop():
            source.cancel()
        else:
            # Not running on the source's loop: hand the call over to it.
            source_loop.call_soon_threadsafe(source.cancel)

    def _call_set_state(source):
        # Done-callback of the source: push its outcome downstream.
        has_dest_loop = dest_loop is not None
        if (destination.cancelled() and
                has_dest_loop and dest_loop.is_closed()):
            return
        if not has_dest_loop or dest_loop is events._get_running_loop():
            _set_state(destination, source)
            return
        # Not running on the destination's loop: schedule the copy there,
        # unless that loop has already been closed.
        if not dest_loop.is_closed():
            dest_loop.call_soon_threadsafe(_set_state, destination, source)

    destination.add_done_callback(_call_check_cancel)
    source.add_done_callback(_call_set_state)
def wrap_future(future, *, loop=None):
    """Wrap a concurrent.futures.Future in a future created by *loop*.

    An object already accepted by isfuture() is returned unchanged.
    Otherwise a new future is created with ``loop.create_future()`` (using
    events.get_event_loop() when *loop* is None), chained to *future* via
    _chain_future(), and returned.
    """
    if isfuture(future):
        return future
    assert isinstance(future, concurrent.futures.Future), \
        f'concurrent.futures.Future is expected, got {future!r}'
    target_loop = events.get_event_loop() if loop is None else loop
    wrapper = target_loop.create_future()
    _chain_future(future, wrapper)
    return wrapper
def future_add_to_awaited_by(fut, waiter, /):
    """Record that `fut` is awaited on by `waiter`."""
    # Only native Future objects (and subclasses) are tracked.  This keeps
    # the implementation minimal, on the assumption that most asyncio users
    # rely on the built-in Futures and Tasks or subclasses of them.
    #
    # Rationale: the caller-callee dependency has to live somewhere.
    # Storing it on the future itself, in a dedicated attribute, would
    # force every duck-type compatible future to provide that attribute
    # for asyncio's bookkeeping.  Storing it in a global dictionary would
    # create strong references, so any "add" that is never followed by a
    # "discard" would leak memory.  A WeakDict would avoid the leak but
    # complicate the C code (_asynciomodule.c).  It is not clear that the
    # extra work would be worth the effort.
    #
    # Note that there's an accelerated version of this function
    # shadowing this implementation later in this file.
    if not (isinstance(fut, _PyFuture) and isinstance(waiter, _PyFuture)):
        return
    # The waiter set is created lazily on first use.
    if fut._Future__asyncio_awaited_by is None:
        fut._Future__asyncio_awaited_by = set()
    waiters = fut._Future__asyncio_awaited_by
    waiters.add(waiter)
def future_discard_from_awaited_by(fut, waiter, /):
    """Record that `fut` is no longer awaited on by `waiter`."""
    # The body of future_add_to_awaited_by() explains why only native
    # Future objects are tracked.
    #
    # Note that there's an accelerated version of this function
    # shadowing this implementation later in this file.
    if not (isinstance(fut, _PyFuture) and isinstance(waiter, _PyFuture)):
        return
    waiters = fut._Future__asyncio_awaited_by
    # Nothing to discard if no waiter was ever recorded.
    if waiters is not None:
        waiters.discard(waiter)
# Keep the pure-Python helpers reachable under explicit names, since the
# public names may be rebound to the C versions just below.
_py_future_add_to_awaited_by = future_add_to_awaited_by
_py_future_discard_from_awaited_by = future_discard_from_awaited_by

try:
    import _asyncio
except ImportError:
    # The _asyncio extension module is unavailable: keep the pure-Python
    # definitions from this file.
    pass
else:
    # _CFuture is needed for tests.
    Future = _CFuture = _asyncio.Future
    # Rebind the public helpers to the _asyncio implementations and expose
    # them under explicit `_c_` names as well.
    future_add_to_awaited_by = _asyncio.future_add_to_awaited_by
    future_discard_from_awaited_by = _asyncio.future_discard_from_awaited_by
    _c_future_add_to_awaited_by = future_add_to_awaited_by
    _c_future_discard_from_awaited_by = future_discard_from_awaited_by