Coverage for Lib/asyncio/timeouts.py: 98%
91 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-15 02:02 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-15 02:02 +0000
1import enum
3from types import TracebackType
5from . import events
6from . import exceptions
7from . import tasks
10__all__ = (
11 "Timeout",
12 "timeout",
13 "timeout_at",
14)
17class _State(enum.Enum):
18 CREATED = "created"
19 ENTERED = "active"
20 EXPIRING = "expiring"
21 EXPIRED = "expired"
22 EXITED = "finished"
25class Timeout:
26 """Asynchronous context manager for cancelling overdue coroutines.
28 Use `timeout()` or `timeout_at()` rather than instantiating this class directly.
29 """
31 def __init__(self, when: float | None) -> None:
32 """Schedule a timeout that will trigger at a given loop time.
34 - If `when` is `None`, the timeout will never trigger.
35 - If `when < loop.time()`, the timeout will trigger on the next
36 iteration of the event loop.
37 """
38 self._state = _State.CREATED
40 self._timeout_handler: events.TimerHandle | None = None
41 self._task: tasks.Task | None = None
42 self._when = when
44 def when(self) -> float | None:
45 """Return the current deadline."""
46 return self._when
48 def reschedule(self, when: float | None) -> None:
49 """Reschedule the timeout."""
50 if self._state is not _State.ENTERED:
51 if self._state is _State.CREATED:
52 raise RuntimeError("Timeout has not been entered")
53 raise RuntimeError(
54 f"Cannot change state of {self._state.value} Timeout",
55 )
57 self._when = when
59 if self._timeout_handler is not None:
60 self._timeout_handler.cancel()
62 if when is None:
63 self._timeout_handler = None
64 else:
65 loop = events.get_running_loop()
66 if when <= loop.time():
67 self._timeout_handler = loop.call_soon(self._on_timeout)
68 else:
69 self._timeout_handler = loop.call_at(when, self._on_timeout)
71 def expired(self) -> bool:
72 """Is timeout expired during execution?"""
73 return self._state in (_State.EXPIRING, _State.EXPIRED)
75 def __repr__(self) -> str:
76 info = ['']
77 if self._state is _State.ENTERED:
78 when = round(self._when, 3) if self._when is not None else None
79 info.append(f"when={when}")
80 info_str = ' '.join(info)
81 return f"<Timeout [{self._state.value}]{info_str}>"
83 async def __aenter__(self) -> "Timeout":
84 if self._state is not _State.CREATED:
85 raise RuntimeError("Timeout has already been entered")
86 task = tasks.current_task()
87 if task is None:
88 raise RuntimeError("Timeout should be used inside a task")
89 self._state = _State.ENTERED
90 self._task = task
91 self._cancelling = self._task.cancelling()
92 self.reschedule(self._when)
93 return self
95 async def __aexit__(
96 self,
97 exc_type: type[BaseException] | None,
98 exc_val: BaseException | None,
99 exc_tb: TracebackType | None,
100 ) -> bool | None:
101 assert self._state in (_State.ENTERED, _State.EXPIRING)
103 if self._timeout_handler is not None:
104 self._timeout_handler.cancel()
105 self._timeout_handler = None
107 if self._state is _State.EXPIRING:
108 self._state = _State.EXPIRED
110 if self._task.uncancel() <= self._cancelling and exc_type is not None:
111 # Since there are no new cancel requests, we're
112 # handling this.
113 if issubclass(exc_type, exceptions.CancelledError):
114 raise TimeoutError from exc_val
115 elif exc_val is not None: 115 ↛ 123line 115 didn't jump to line 123 because the condition on line 115 was always true
116 self._insert_timeout_error(exc_val)
117 if isinstance(exc_val, ExceptionGroup):
118 for exc in exc_val.exceptions:
119 self._insert_timeout_error(exc)
120 elif self._state is _State.ENTERED: 120 ↛ 123line 120 didn't jump to line 123 because the condition on line 120 was always true
121 self._state = _State.EXITED
123 return None
125 def _on_timeout(self) -> None:
126 assert self._state is _State.ENTERED
127 self._task.cancel()
128 self._state = _State.EXPIRING
129 # drop the reference early
130 self._timeout_handler = None
132 @staticmethod
133 def _insert_timeout_error(exc_val: BaseException) -> None:
134 while exc_val.__context__ is not None: 134 ↛ exitline 134 didn't return from function '_insert_timeout_error' because the condition on line 134 was always true
135 if isinstance(exc_val.__context__, exceptions.CancelledError):
136 te = TimeoutError()
137 te.__context__ = te.__cause__ = exc_val.__context__
138 exc_val.__context__ = te
139 break
140 exc_val = exc_val.__context__
143def timeout(delay: float | None) -> Timeout:
144 """Timeout async context manager.
146 Useful in cases when you want to apply timeout logic around block
147 of code or in cases when asyncio.wait_for is not suitable. For example:
149 >>> async with asyncio.timeout(10): # 10 seconds timeout
150 ... await long_running_task()
153 delay - value in seconds or None to disable timeout logic
155 long_running_task() is interrupted by raising asyncio.CancelledError,
156 the top-most affected timeout() context manager converts CancelledError
157 into TimeoutError.
158 """
159 loop = events.get_running_loop()
160 return Timeout(loop.time() + delay if delay is not None else None)
163def timeout_at(when: float | None) -> Timeout:
164 """Schedule the timeout at absolute time.
166 Like timeout() but argument gives absolute time in the same clock system
167 as loop.time().
169 Please note: it is not POSIX time but a time with
170 undefined starting base, e.g. the time of the system power on.
172 >>> async with asyncio.timeout_at(loop.time() + 10):
173 ... await long_running_task()
176 when - a deadline when timeout occurs or None to disable timeout logic
178 long_running_task() is interrupted by raising asyncio.CancelledError,
179 the top-most affected timeout() context manager converts CancelledError
180 into TimeoutError.
181 """
182 return Timeout(when)