Coverage for Lib/asyncio/base_futures.py: 94%
38 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-15 02:02 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-15 02:02 +0000
1__all__ = ()
3import reprlib
5from . import format_helpers
7# States for Future.
8_PENDING = 'PENDING'
9_CANCELLED = 'CANCELLED'
10_FINISHED = 'FINISHED'
13def isfuture(obj):
14 """Check for a Future.
16 This returns True when obj is a Future instance or is advertising
17 itself as duck-type compatible by setting _asyncio_future_blocking.
18 See comment in Future for more details.
19 """
20 return (hasattr(obj.__class__, '_asyncio_future_blocking') and
21 obj._asyncio_future_blocking is not None)
24def _format_callbacks(cb):
25 """helper function for Future.__repr__"""
26 size = len(cb)
27 if not size: 27 ↛ 28line 27 didn't jump to line 28 because the condition on line 27 was never true
28 cb = ''
30 def format_cb(callback):
31 return format_helpers._format_callback_source(callback, ())
33 if size == 1:
34 cb = format_cb(cb[0][0])
35 elif size == 2:
36 cb = '{}, {}'.format(format_cb(cb[0][0]), format_cb(cb[1][0]))
37 elif size > 2: 37 ↛ 41line 37 didn't jump to line 41 because the condition on line 37 was always true
38 cb = '{}, <{} more>, {}'.format(format_cb(cb[0][0]),
39 size - 2,
40 format_cb(cb[-1][0]))
41 return f'cb=[{cb}]'
44def _future_repr_info(future):
45 # (Future) -> str
46 """helper function for Future.__repr__"""
47 info = [future._state.lower()]
48 if future._state == _FINISHED:
49 if future._exception is not None:
50 info.append(f'exception={future._exception!r}')
51 else:
52 # use reprlib to limit the length of the output, especially
53 # for very long strings
54 result = reprlib.repr(future._result)
55 info.append(f'result={result}')
56 if future._callbacks:
57 info.append(_format_callbacks(future._callbacks))
58 if future._source_traceback:
59 frame = future._source_traceback[-1]
60 info.append(f'created at {frame[0]}:{frame[1]}')
61 return info
64@reprlib.recursive_repr()
65def _future_repr(future):
66 info = ' '.join(_future_repr_info(future))
67 return f'<{future.__class__.__name__} {info}>'