Coverage for Lib/asyncio/futures.py: 96%
244 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-15 02:02 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-15 02:02 +0000
1"""A Future class similar to the one in PEP 3148."""
3__all__ = (
4 'Future', 'wrap_future', 'isfuture',
5 'future_add_to_awaited_by', 'future_discard_from_awaited_by',
6)
8import concurrent.futures
9import contextvars
10import logging
11import sys
12from types import GenericAlias
14from . import base_futures
15from . import events
16from . import exceptions
17from . import format_helpers
20isfuture = base_futures.isfuture
23_PENDING = base_futures._PENDING
24_CANCELLED = base_futures._CANCELLED
25_FINISHED = base_futures._FINISHED
28STACK_DEBUG = logging.DEBUG - 1 # heavy-duty debugging
31class Future:
32 """This class is *almost* compatible with concurrent.futures.Future.
34 Differences:
36 - This class is not thread-safe.
38 - result() and exception() do not take a timeout argument and
39 raise an exception when the future isn't done yet.
41 - Callbacks registered with add_done_callback() are always called
42 via the event loop's call_soon().
44 - This class is not compatible with the wait() and as_completed()
45 methods in the concurrent.futures package.
47 """
49 # Class variables serving as defaults for instance variables.
50 _state = _PENDING
51 _result = None
52 _exception = None
53 _loop = None
54 _source_traceback = None
55 _cancel_message = None
56 # A saved CancelledError for later chaining as an exception context.
57 _cancelled_exc = None
59 # This field is used for a dual purpose:
60 # - Its presence is a marker to declare that a class implements
61 # the Future protocol (i.e. is intended to be duck-type compatible).
62 # The value must also be not-None, to enable a subclass to declare
63 # that it is not compatible by setting this to None.
64 # - It is set by __iter__() below so that Task.__step() can tell
65 # the difference between
66 # `await Future()` or `yield from Future()` (correct) vs.
67 # `yield Future()` (incorrect).
68 _asyncio_future_blocking = False
70 # Used by the capture_call_stack() API.
71 __asyncio_awaited_by = None
73 __log_traceback = False
75 def __init__(self, *, loop=None):
76 """Initialize the future.
78 The optional event_loop argument allows explicitly setting the event
79 loop object used by the future. If it's not provided, the future uses
80 the default event loop.
81 """
82 if loop is None:
83 self._loop = events.get_event_loop()
84 else:
85 self._loop = loop
86 self._callbacks = []
87 if self._loop.get_debug():
88 self._source_traceback = format_helpers.extract_stack(
89 sys._getframe(1))
91 def __repr__(self):
92 return base_futures._future_repr(self)
94 def __del__(self):
95 if not self.__log_traceback:
96 # set_exception() was not called, or result() or exception()
97 # has consumed the exception
98 return
99 exc = self._exception
100 context = {
101 'message':
102 f'{self.__class__.__name__} exception was never retrieved',
103 'exception': exc,
104 'future': self,
105 }
106 if self._source_traceback:
107 context['source_traceback'] = self._source_traceback
108 self._loop.call_exception_handler(context)
110 __class_getitem__ = classmethod(GenericAlias)
112 @property
113 def _log_traceback(self):
114 return self.__log_traceback
116 @_log_traceback.setter
117 def _log_traceback(self, val):
118 if val:
119 raise ValueError('_log_traceback can only be set to False')
120 self.__log_traceback = False
122 @property
123 def _asyncio_awaited_by(self):
124 if self.__asyncio_awaited_by is None:
125 return None
126 return frozenset(self.__asyncio_awaited_by)
128 def get_loop(self):
129 """Return the event loop the Future is bound to."""
130 loop = self._loop
131 if loop is None:
132 raise RuntimeError("Future object is not initialized.")
133 return loop
135 def _make_cancelled_error(self):
136 """Create the CancelledError to raise if the Future is cancelled.
138 This should only be called once when handling a cancellation since
139 it erases the saved context exception value.
140 """
141 if self._cancelled_exc is not None:
142 exc = self._cancelled_exc
143 self._cancelled_exc = None
144 return exc
146 if self._cancel_message is None:
147 exc = exceptions.CancelledError()
148 else:
149 exc = exceptions.CancelledError(self._cancel_message)
150 return exc
152 def cancel(self, msg=None):
153 """Cancel the future and schedule callbacks.
155 If the future is already done or cancelled, return False. Otherwise,
156 change the future's state to cancelled, schedule the callbacks and
157 return True.
158 """
159 self.__log_traceback = False
160 if self._state != _PENDING:
161 return False
162 self._state = _CANCELLED
163 self._cancel_message = msg
164 self.__schedule_callbacks()
165 return True
167 def __schedule_callbacks(self):
168 """Internal: Ask the event loop to call all callbacks.
170 The callbacks are scheduled to be called as soon as possible. Also
171 clears the callback list.
172 """
173 callbacks = self._callbacks[:]
174 if not callbacks:
175 return
177 self._callbacks[:] = []
178 for callback, ctx in callbacks:
179 self._loop.call_soon(callback, self, context=ctx)
181 def cancelled(self):
182 """Return True if the future was cancelled."""
183 return self._state == _CANCELLED
185 # Don't implement running(); see http://bugs.python.org/issue18699
187 def done(self):
188 """Return True if the future is done.
190 Done means either that a result / exception are available, or that the
191 future was cancelled.
192 """
193 return self._state != _PENDING
195 def result(self):
196 """Return the result this future represents.
198 If the future has been cancelled, raises CancelledError. If the
199 future's result isn't yet available, raises InvalidStateError. If
200 the future is done and has an exception set, this exception is raised.
201 """
202 if self._state == _CANCELLED:
203 raise self._make_cancelled_error()
204 if self._state != _FINISHED:
205 raise exceptions.InvalidStateError('Result is not ready.')
206 self.__log_traceback = False
207 if self._exception is not None:
208 raise self._exception.with_traceback(self._exception_tb)
209 return self._result
211 def exception(self):
212 """Return the exception that was set on this future.
214 The exception (or None if no exception was set) is returned only if
215 the future is done. If the future has been cancelled, raises
216 CancelledError. If the future isn't done yet, raises
217 InvalidStateError.
218 """
219 if self._state == _CANCELLED:
220 raise self._make_cancelled_error()
221 if self._state != _FINISHED:
222 raise exceptions.InvalidStateError('Exception is not set.')
223 self.__log_traceback = False
224 return self._exception
226 def add_done_callback(self, fn, *, context=None):
227 """Add a callback to be run when the future becomes done.
229 The callback is called with a single argument - the future object. If
230 the future is already done when this is called, the callback is
231 scheduled with call_soon.
232 """
233 if self._state != _PENDING:
234 self._loop.call_soon(fn, self, context=context)
235 else:
236 if context is None:
237 context = contextvars.copy_context()
238 self._callbacks.append((fn, context))
240 # New method not in PEP 3148.
242 def remove_done_callback(self, fn):
243 """Remove all instances of a callback from the "call when done" list.
245 Returns the number of callbacks removed.
246 """
247 filtered_callbacks = [(f, ctx)
248 for (f, ctx) in self._callbacks
249 if f != fn]
250 removed_count = len(self._callbacks) - len(filtered_callbacks)
251 if removed_count:
252 self._callbacks[:] = filtered_callbacks
253 return removed_count
255 # So-called internal methods (note: no set_running_or_notify_cancel()).
257 def set_result(self, result):
258 """Mark the future done and set its result.
260 If the future is already done when this method is called, raises
261 InvalidStateError.
262 """
263 if self._state != _PENDING:
264 raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
265 self._result = result
266 self._state = _FINISHED
267 self.__schedule_callbacks()
269 def set_exception(self, exception):
270 """Mark the future done and set an exception.
272 If the future is already done when this method is called, raises
273 InvalidStateError.
274 """
275 if self._state != _PENDING:
276 raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
277 if isinstance(exception, type):
278 exception = exception()
279 if isinstance(exception, StopIteration):
280 new_exc = RuntimeError("StopIteration interacts badly with "
281 "generators and cannot be raised into a "
282 "Future")
283 new_exc.__cause__ = exception
284 new_exc.__context__ = exception
285 exception = new_exc
286 self._exception = exception
287 self._exception_tb = exception.__traceback__
288 self._state = _FINISHED
289 self.__schedule_callbacks()
290 self.__log_traceback = True
292 def __await__(self):
293 if not self.done():
294 self._asyncio_future_blocking = True
295 yield self # This tells Task to wait for completion.
296 if not self.done():
297 raise RuntimeError("await wasn't used with future")
298 return self.result() # May raise too.
300 __iter__ = __await__ # make compatible with 'yield from'.
303# Needed for testing purposes.
304_PyFuture = Future
307def _get_loop(fut):
308 # Tries to call Future.get_loop() if it's available.
309 # Otherwise fallbacks to using the old '_loop' property.
310 try:
311 get_loop = fut.get_loop
312 except AttributeError:
313 pass
314 else:
315 return get_loop()
316 return fut._loop
319def _set_result_unless_cancelled(fut, result):
320 """Helper setting the result only if the future was not cancelled."""
321 if fut.cancelled():
322 return
323 fut.set_result(result)
326def _convert_future_exc(exc):
327 exc_class = type(exc)
328 if exc_class is concurrent.futures.CancelledError:
329 return exceptions.CancelledError(*exc.args).with_traceback(exc.__traceback__)
330 elif exc_class is concurrent.futures.InvalidStateError:
331 return exceptions.InvalidStateError(*exc.args).with_traceback(exc.__traceback__)
332 else:
333 return exc
336def _set_concurrent_future_state(concurrent, source):
337 """Copy state from a future to a concurrent.futures.Future."""
338 assert source.done()
339 if source.cancelled():
340 concurrent.cancel()
341 if not concurrent.set_running_or_notify_cancel():
342 return
343 exception = source.exception()
344 if exception is not None:
345 concurrent.set_exception(_convert_future_exc(exception))
346 else:
347 result = source.result()
348 concurrent.set_result(result)
351def _copy_future_state(source, dest):
352 """Internal helper to copy state from another Future.
354 The other Future may be a concurrent.futures.Future.
355 """
356 assert source.done()
357 if dest.cancelled():
358 return
359 assert not dest.done()
360 if source.cancelled():
361 dest.cancel()
362 else:
363 exception = source.exception()
364 if exception is not None:
365 dest.set_exception(_convert_future_exc(exception))
366 else:
367 result = source.result()
368 dest.set_result(result)
371def _chain_future(source, destination):
372 """Chain two futures so that when one completes, so does the other.
374 The result (or exception) of source will be copied to destination.
375 If destination is cancelled, source gets cancelled too.
376 Compatible with both asyncio.Future and concurrent.futures.Future.
377 """
378 if not isfuture(source) and not isinstance(source, 378 ↛ 380line 378 didn't jump to line 380 because the condition on line 378 was never true
379 concurrent.futures.Future):
380 raise TypeError('A future is required for source argument')
381 if not isfuture(destination) and not isinstance(destination, 381 ↛ 383line 381 didn't jump to line 383 because the condition on line 381 was never true
382 concurrent.futures.Future):
383 raise TypeError('A future is required for destination argument')
384 source_loop = _get_loop(source) if isfuture(source) else None
385 dest_loop = _get_loop(destination) if isfuture(destination) else None
387 def _set_state(future, other):
388 if isfuture(future):
389 _copy_future_state(other, future)
390 else:
391 _set_concurrent_future_state(future, other)
393 def _call_check_cancel(destination):
394 if destination.cancelled():
395 if source_loop is None or source_loop is dest_loop:
396 source.cancel()
397 else:
398 source_loop.call_soon_threadsafe(source.cancel)
400 def _call_set_state(source):
401 if (destination.cancelled() and 401 ↛ 403line 401 didn't jump to line 403 because the condition on line 401 was never true
402 dest_loop is not None and dest_loop.is_closed()):
403 return
404 if dest_loop is None or dest_loop is source_loop:
405 _set_state(destination, source)
406 else:
407 if dest_loop.is_closed(): 407 ↛ 408line 407 didn't jump to line 408 because the condition on line 407 was never true
408 return
409 dest_loop.call_soon_threadsafe(_set_state, destination, source)
411 destination.add_done_callback(_call_check_cancel)
412 source.add_done_callback(_call_set_state)
415def wrap_future(future, *, loop=None):
416 """Wrap concurrent.futures.Future object."""
417 if isfuture(future):
418 return future
419 assert isinstance(future, concurrent.futures.Future), \
420 f'concurrent.futures.Future is expected, got {future!r}'
421 if loop is None:
422 loop = events.get_event_loop()
423 new_future = loop.create_future()
424 _chain_future(future, new_future)
425 return new_future
428def future_add_to_awaited_by(fut, waiter, /):
429 """Record that `fut` is awaited on by `waiter`."""
430 # For the sake of keeping the implementation minimal and assuming
431 # that most of asyncio users use the built-in Futures and Tasks
432 # (or their subclasses), we only support native Future objects
433 # and their subclasses.
434 #
435 # Longer version: tracking requires storing the caller-callee
436 # dependency somewhere. One obvious choice is to store that
437 # information right in the future itself in a dedicated attribute.
438 # This means that we'd have to require all duck-type compatible
439 # futures to implement a specific attribute used by asyncio for
440 # the book keeping. Another solution would be to store that in
441 # a global dictionary. The downside here is that that would create
442 # strong references and any scenario where the "add" call isn't
443 # followed by a "discard" call would lead to a memory leak.
444 # Using WeakDict would resolve that issue, but would complicate
445 # the C code (_asynciomodule.c). The bottom line here is that
446 # it's not clear that all this work would be worth the effort.
447 #
448 # Note that there's an accelerated version of this function
449 # shadowing this implementation later in this file.
450 if isinstance(fut, _PyFuture) and isinstance(waiter, _PyFuture):
451 if fut._Future__asyncio_awaited_by is None:
452 fut._Future__asyncio_awaited_by = set()
453 fut._Future__asyncio_awaited_by.add(waiter)
456def future_discard_from_awaited_by(fut, waiter, /):
457 """Record that `fut` is no longer awaited on by `waiter`."""
458 # See the comment in "future_add_to_awaited_by()" body for
459 # details on implementation.
460 #
461 # Note that there's an accelerated version of this function
462 # shadowing this implementation later in this file.
463 if isinstance(fut, _PyFuture) and isinstance(waiter, _PyFuture):
464 if fut._Future__asyncio_awaited_by is not None: 464 ↛ exitline 464 didn't return from function 'future_discard_from_awaited_by' because the condition on line 464 was always true
465 fut._Future__asyncio_awaited_by.discard(waiter)
468_py_future_add_to_awaited_by = future_add_to_awaited_by
469_py_future_discard_from_awaited_by = future_discard_from_awaited_by
471try:
472 import _asyncio
473except ImportError:
474 pass
475else:
476 # _CFuture is needed for tests.
477 Future = _CFuture = _asyncio.Future
478 future_add_to_awaited_by = _asyncio.future_add_to_awaited_by
479 future_discard_from_awaited_by = _asyncio.future_discard_from_awaited_by
480 _c_future_add_to_awaited_by = future_add_to_awaited_by
481 _c_future_discard_from_awaited_by = future_discard_from_awaited_by