Coverage for Lib/asyncio/timeouts.py: 98%

91 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-06-24 03:28 +0000

1import enum 

2 

3from types import TracebackType 

4 

5from . import events 

6from . import exceptions 

7from . import tasks 

8 

9 

10__all__ = ( 

11 "Timeout", 

12 "timeout", 

13 "timeout_at", 

14) 

15 

16 

17class _State(enum.Enum): 

18 CREATED = "created" 

19 ENTERED = "active" 

20 EXPIRING = "expiring" 

21 EXPIRED = "expired" 

22 EXITED = "finished" 

23 

24 

25class Timeout: 

26 """Asynchronous context manager for cancelling overdue coroutines. 

27 

28 Use `timeout()` or `timeout_at()` rather than instantiating this class 

29 directly. 

30 """ 

31 

32 def __init__(self, when: float | None) -> None: 

33 """Schedule a timeout that will trigger at a given loop time. 

34 

35 - If `when` is `None`, the timeout will never trigger. 

36 - If `when < loop.time()`, the timeout will trigger on the next 

37 iteration of the event loop. 

38 """ 

39 self._state = _State.CREATED 

40 

41 self._timeout_handler: events.TimerHandle | None = None 

42 self._task: tasks.Task | None = None 

43 self._when = when 

44 

45 def when(self) -> float | None: 

46 """Return the current deadline.""" 

47 return self._when 

48 

49 def reschedule(self, when: float | None) -> None: 

50 """Reschedule the timeout.""" 

51 if self._state is not _State.ENTERED: 

52 if self._state is _State.CREATED: 

53 raise RuntimeError("Timeout has not been entered") 

54 raise RuntimeError( 

55 f"Cannot change state of {self._state.value} Timeout", 

56 ) 

57 

58 self._when = when 

59 

60 if self._timeout_handler is not None: 

61 self._timeout_handler.cancel() 

62 

63 if when is None: 

64 self._timeout_handler = None 

65 else: 

66 loop = events.get_running_loop() 

67 if when <= loop.time(): 

68 self._timeout_handler = loop.call_soon(self._on_timeout) 

69 else: 

70 self._timeout_handler = loop.call_at(when, self._on_timeout) 

71 

72 def expired(self) -> bool: 

73 """Is timeout expired during execution?""" 

74 return self._state in (_State.EXPIRING, _State.EXPIRED) 

75 

76 def __repr__(self) -> str: 

77 info = [''] 

78 if self._state is _State.ENTERED: 

79 when = round(self._when, 3) if self._when is not None else None 

80 info.append(f"when={when}") 

81 info_str = ' '.join(info) 

82 return f"<Timeout [{self._state.value}]{info_str}>" 

83 

84 async def __aenter__(self) -> "Timeout": 

85 if self._state is not _State.CREATED: 

86 raise RuntimeError("Timeout has already been entered") 

87 task = tasks.current_task() 

88 if task is None: 

89 raise RuntimeError("Timeout should be used inside a task") 

90 self._state = _State.ENTERED 

91 self._task = task 

92 self._cancelling = self._task.cancelling() 

93 self.reschedule(self._when) 

94 return self 

95 

96 async def __aexit__( 

97 self, 

98 exc_type: type[BaseException] | None, 

99 exc_val: BaseException | None, 

100 exc_tb: TracebackType | None, 

101 ) -> bool | None: 

102 assert self._state in (_State.ENTERED, _State.EXPIRING) 

103 

104 if self._timeout_handler is not None: 

105 self._timeout_handler.cancel() 

106 self._timeout_handler = None 

107 

108 if self._state is _State.EXPIRING: 

109 self._state = _State.EXPIRED 

110 

111 if self._task.uncancel() <= self._cancelling and exc_type is not None: 

112 # Since there are no new cancel requests, we're 

113 # handling this. 

114 if issubclass(exc_type, exceptions.CancelledError): 

115 raise TimeoutError from exc_val 

116 elif exc_val is not None: 116 ↛ 124line 116 didn't jump to line 124 because the condition on line 116 was always true

117 self._insert_timeout_error(exc_val) 

118 if isinstance(exc_val, ExceptionGroup): 

119 for exc in exc_val.exceptions: 

120 self._insert_timeout_error(exc) 

121 elif self._state is _State.ENTERED: 121 ↛ 124line 121 didn't jump to line 124 because the condition on line 121 was always true

122 self._state = _State.EXITED 

123 

124 return None 

125 

126 def _on_timeout(self) -> None: 

127 assert self._state is _State.ENTERED 

128 self._task.cancel() 

129 self._state = _State.EXPIRING 

130 # drop the reference early 

131 self._timeout_handler = None 

132 

133 @staticmethod 

134 def _insert_timeout_error(exc_val: BaseException) -> None: 

135 while exc_val.__context__ is not None: 135 ↛ exitline 135 didn't return from function '_insert_timeout_error' because the condition on line 135 was always true

136 if isinstance(exc_val.__context__, exceptions.CancelledError): 

137 te = TimeoutError() 

138 te.__context__ = te.__cause__ = exc_val.__context__ 

139 exc_val.__context__ = te 

140 break 

141 exc_val = exc_val.__context__ 

142 

143 

144def timeout(delay: float | None) -> Timeout: 

145 """Timeout async context manager. 

146 

147 Useful in cases when you want to apply timeout logic around block 

148 of code or in cases when asyncio.wait_for is not suitable. For example: 

149 

150 >>> async with asyncio.timeout(10): # 10 seconds timeout 

151 ... await long_running_task() 

152 

153 

154 delay - value in seconds or None to disable timeout logic 

155 

156 long_running_task() is interrupted by raising asyncio.CancelledError, 

157 the top-most affected timeout() context manager converts CancelledError 

158 into TimeoutError. 

159 """ 

160 loop = events.get_running_loop() 

161 return Timeout(loop.time() + delay if delay is not None else None) 

162 

163 

164def timeout_at(when: float | None) -> Timeout: 

165 """Schedule the timeout at absolute time. 

166 

167 Like timeout() but argument gives absolute time in the same clock system 

168 as loop.time(). 

169 

170 Please note: it is not POSIX time but a time with 

171 undefined starting base, e.g. the time of the system power on. 

172 

173 >>> async with asyncio.timeout_at(loop.time() + 10): 

174 ... await long_running_task() 

175 

176 

177 when - a deadline when timeout occurs or None to disable timeout logic 

178 

179 long_running_task() is interrupted by raising asyncio.CancelledError, 

180 the top-most affected timeout() context manager converts CancelledError 

181 into TimeoutError. 

182 """ 

183 return Timeout(when)