Coverage for Lib/asyncio/timeouts.py: 98%
91 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-24 03:28 +0000
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-24 03:28 +0000
1import enum
3from types import TracebackType
5from . import events
6from . import exceptions
7from . import tasks
# Names exported by this module: the context-manager class and the two
# factory functions that construct it.
__all__ = (
    "Timeout",
    "timeout",
    "timeout_at",
)
class _State(enum.Enum):
    """Lifecycle phases of a `Timeout` context manager.

    The member values are the human-readable words interpolated into
    `Timeout.__repr__()` and into the `RuntimeError` message raised by
    `Timeout.reschedule()`.
    """

    # Initial state assigned by `Timeout.__init__()`.
    CREATED = "created"
    # Set by `Timeout.__aenter__()`; the only state in which
    # `reschedule()` is permitted.  Note the value is "active".
    ENTERED = "active"
    # Set by `Timeout._on_timeout()` right after it cancels the task.
    EXPIRING = "expiring"
    # Set by `Timeout.__aexit__()` when it finds the timeout EXPIRING.
    EXPIRED = "expired"
    # Set by `Timeout.__aexit__()` when the block ended while still
    # ENTERED (the timeout never fired).  Note the value is "finished".
    EXITED = "finished"
25class Timeout:
26 """Asynchronous context manager for cancelling overdue coroutines.
28 Use `timeout()` or `timeout_at()` rather than instantiating this class
29 directly.
30 """
32 def __init__(self, when: float | None) -> None:
33 """Schedule a timeout that will trigger at a given loop time.
35 - If `when` is `None`, the timeout will never trigger.
36 - If `when < loop.time()`, the timeout will trigger on the next
37 iteration of the event loop.
38 """
39 self._state = _State.CREATED
41 self._timeout_handler: events.TimerHandle | None = None
42 self._task: tasks.Task | None = None
43 self._when = when
45 def when(self) -> float | None:
46 """Return the current deadline."""
47 return self._when
49 def reschedule(self, when: float | None) -> None:
50 """Reschedule the timeout."""
51 if self._state is not _State.ENTERED:
52 if self._state is _State.CREATED:
53 raise RuntimeError("Timeout has not been entered")
54 raise RuntimeError(
55 f"Cannot change state of {self._state.value} Timeout",
56 )
58 self._when = when
60 if self._timeout_handler is not None:
61 self._timeout_handler.cancel()
63 if when is None:
64 self._timeout_handler = None
65 else:
66 loop = events.get_running_loop()
67 if when <= loop.time():
68 self._timeout_handler = loop.call_soon(self._on_timeout)
69 else:
70 self._timeout_handler = loop.call_at(when, self._on_timeout)
72 def expired(self) -> bool:
73 """Is timeout expired during execution?"""
74 return self._state in (_State.EXPIRING, _State.EXPIRED)
76 def __repr__(self) -> str:
77 info = ['']
78 if self._state is _State.ENTERED:
79 when = round(self._when, 3) if self._when is not None else None
80 info.append(f"when={when}")
81 info_str = ' '.join(info)
82 return f"<Timeout [{self._state.value}]{info_str}>"
84 async def __aenter__(self) -> "Timeout":
85 if self._state is not _State.CREATED:
86 raise RuntimeError("Timeout has already been entered")
87 task = tasks.current_task()
88 if task is None:
89 raise RuntimeError("Timeout should be used inside a task")
90 self._state = _State.ENTERED
91 self._task = task
92 self._cancelling = self._task.cancelling()
93 self.reschedule(self._when)
94 return self
96 async def __aexit__(
97 self,
98 exc_type: type[BaseException] | None,
99 exc_val: BaseException | None,
100 exc_tb: TracebackType | None,
101 ) -> bool | None:
102 assert self._state in (_State.ENTERED, _State.EXPIRING)
104 if self._timeout_handler is not None:
105 self._timeout_handler.cancel()
106 self._timeout_handler = None
108 if self._state is _State.EXPIRING:
109 self._state = _State.EXPIRED
111 if self._task.uncancel() <= self._cancelling and exc_type is not None:
112 # Since there are no new cancel requests, we're
113 # handling this.
114 if issubclass(exc_type, exceptions.CancelledError):
115 raise TimeoutError from exc_val
116 elif exc_val is not None: 116 ↛ 124line 116 didn't jump to line 124 because the condition on line 116 was always true
117 self._insert_timeout_error(exc_val)
118 if isinstance(exc_val, ExceptionGroup):
119 for exc in exc_val.exceptions:
120 self._insert_timeout_error(exc)
121 elif self._state is _State.ENTERED: 121 ↛ 124line 121 didn't jump to line 124 because the condition on line 121 was always true
122 self._state = _State.EXITED
124 return None
126 def _on_timeout(self) -> None:
127 assert self._state is _State.ENTERED
128 self._task.cancel()
129 self._state = _State.EXPIRING
130 # drop the reference early
131 self._timeout_handler = None
133 @staticmethod
134 def _insert_timeout_error(exc_val: BaseException) -> None:
135 while exc_val.__context__ is not None: 135 ↛ exitline 135 didn't return from function '_insert_timeout_error' because the condition on line 135 was always true
136 if isinstance(exc_val.__context__, exceptions.CancelledError):
137 te = TimeoutError()
138 te.__context__ = te.__cause__ = exc_val.__context__
139 exc_val.__context__ = te
140 break
141 exc_val = exc_val.__context__
144def timeout(delay: float | None) -> Timeout:
145 """Timeout async context manager.
147 Useful in cases when you want to apply timeout logic around block
148 of code or in cases when asyncio.wait_for is not suitable. For example:
150 >>> async with asyncio.timeout(10): # 10 seconds timeout
151 ... await long_running_task()
154 delay - value in seconds or None to disable timeout logic
156 long_running_task() is interrupted by raising asyncio.CancelledError,
157 the top-most affected timeout() context manager converts CancelledError
158 into TimeoutError.
159 """
160 loop = events.get_running_loop()
161 return Timeout(loop.time() + delay if delay is not None else None)
164def timeout_at(when: float | None) -> Timeout:
165 """Schedule the timeout at absolute time.
167 Like timeout() but argument gives absolute time in the same clock system
168 as loop.time().
170 Please note: it is not POSIX time but a time with
171 undefined starting base, e.g. the time of the system power on.
173 >>> async with asyncio.timeout_at(loop.time() + 10):
174 ... await long_running_task()
177 when - a deadline when timeout occurs or None to disable timeout logic
179 long_running_task() is interrupted by raising asyncio.CancelledError,
180 the top-most affected timeout() context manager converts CancelledError
181 into TimeoutError.
182 """
183 return Timeout(when)