Coverage for Lib/asyncio/timeouts.py: 98%

91 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-15 02:02 +0000

1import enum 

2 

3from types import TracebackType 

4 

5from . import events 

6from . import exceptions 

7from . import tasks 

8 

9 

10__all__ = ( 

11 "Timeout", 

12 "timeout", 

13 "timeout_at", 

14) 

15 

16 

17class _State(enum.Enum): 

18 CREATED = "created" 

19 ENTERED = "active" 

20 EXPIRING = "expiring" 

21 EXPIRED = "expired" 

22 EXITED = "finished" 

23 

24 

25class Timeout: 

26 """Asynchronous context manager for cancelling overdue coroutines. 

27 

28 Use `timeout()` or `timeout_at()` rather than instantiating this class directly. 

29 """ 

30 

31 def __init__(self, when: float | None) -> None: 

32 """Schedule a timeout that will trigger at a given loop time. 

33 

34 - If `when` is `None`, the timeout will never trigger. 

35 - If `when < loop.time()`, the timeout will trigger on the next 

36 iteration of the event loop. 

37 """ 

38 self._state = _State.CREATED 

39 

40 self._timeout_handler: events.TimerHandle | None = None 

41 self._task: tasks.Task | None = None 

42 self._when = when 

43 

44 def when(self) -> float | None: 

45 """Return the current deadline.""" 

46 return self._when 

47 

48 def reschedule(self, when: float | None) -> None: 

49 """Reschedule the timeout.""" 

50 if self._state is not _State.ENTERED: 

51 if self._state is _State.CREATED: 

52 raise RuntimeError("Timeout has not been entered") 

53 raise RuntimeError( 

54 f"Cannot change state of {self._state.value} Timeout", 

55 ) 

56 

57 self._when = when 

58 

59 if self._timeout_handler is not None: 

60 self._timeout_handler.cancel() 

61 

62 if when is None: 

63 self._timeout_handler = None 

64 else: 

65 loop = events.get_running_loop() 

66 if when <= loop.time(): 

67 self._timeout_handler = loop.call_soon(self._on_timeout) 

68 else: 

69 self._timeout_handler = loop.call_at(when, self._on_timeout) 

70 

71 def expired(self) -> bool: 

72 """Is timeout expired during execution?""" 

73 return self._state in (_State.EXPIRING, _State.EXPIRED) 

74 

75 def __repr__(self) -> str: 

76 info = [''] 

77 if self._state is _State.ENTERED: 

78 when = round(self._when, 3) if self._when is not None else None 

79 info.append(f"when={when}") 

80 info_str = ' '.join(info) 

81 return f"<Timeout [{self._state.value}]{info_str}>" 

82 

83 async def __aenter__(self) -> "Timeout": 

84 if self._state is not _State.CREATED: 

85 raise RuntimeError("Timeout has already been entered") 

86 task = tasks.current_task() 

87 if task is None: 

88 raise RuntimeError("Timeout should be used inside a task") 

89 self._state = _State.ENTERED 

90 self._task = task 

91 self._cancelling = self._task.cancelling() 

92 self.reschedule(self._when) 

93 return self 

94 

95 async def __aexit__( 

96 self, 

97 exc_type: type[BaseException] | None, 

98 exc_val: BaseException | None, 

99 exc_tb: TracebackType | None, 

100 ) -> bool | None: 

101 assert self._state in (_State.ENTERED, _State.EXPIRING) 

102 

103 if self._timeout_handler is not None: 

104 self._timeout_handler.cancel() 

105 self._timeout_handler = None 

106 

107 if self._state is _State.EXPIRING: 

108 self._state = _State.EXPIRED 

109 

110 if self._task.uncancel() <= self._cancelling and exc_type is not None: 

111 # Since there are no new cancel requests, we're 

112 # handling this. 

113 if issubclass(exc_type, exceptions.CancelledError): 

114 raise TimeoutError from exc_val 

115 elif exc_val is not None: 115 ↛ 123line 115 didn't jump to line 123 because the condition on line 115 was always true

116 self._insert_timeout_error(exc_val) 

117 if isinstance(exc_val, ExceptionGroup): 

118 for exc in exc_val.exceptions: 

119 self._insert_timeout_error(exc) 

120 elif self._state is _State.ENTERED: 120 ↛ 123line 120 didn't jump to line 123 because the condition on line 120 was always true

121 self._state = _State.EXITED 

122 

123 return None 

124 

125 def _on_timeout(self) -> None: 

126 assert self._state is _State.ENTERED 

127 self._task.cancel() 

128 self._state = _State.EXPIRING 

129 # drop the reference early 

130 self._timeout_handler = None 

131 

132 @staticmethod 

133 def _insert_timeout_error(exc_val: BaseException) -> None: 

134 while exc_val.__context__ is not None: 134 ↛ exitline 134 didn't return from function '_insert_timeout_error' because the condition on line 134 was always true

135 if isinstance(exc_val.__context__, exceptions.CancelledError): 

136 te = TimeoutError() 

137 te.__context__ = te.__cause__ = exc_val.__context__ 

138 exc_val.__context__ = te 

139 break 

140 exc_val = exc_val.__context__ 

141 

142 

143def timeout(delay: float | None) -> Timeout: 

144 """Timeout async context manager. 

145 

146 Useful in cases when you want to apply timeout logic around block 

147 of code or in cases when asyncio.wait_for is not suitable. For example: 

148 

149 >>> async with asyncio.timeout(10): # 10 seconds timeout 

150 ... await long_running_task() 

151 

152 

153 delay - value in seconds or None to disable timeout logic 

154 

155 long_running_task() is interrupted by raising asyncio.CancelledError, 

156 the top-most affected timeout() context manager converts CancelledError 

157 into TimeoutError. 

158 """ 

159 loop = events.get_running_loop() 

160 return Timeout(loop.time() + delay if delay is not None else None) 

161 

162 

163def timeout_at(when: float | None) -> Timeout: 

164 """Schedule the timeout at absolute time. 

165 

166 Like timeout() but argument gives absolute time in the same clock system 

167 as loop.time(). 

168 

169 Please note: it is not POSIX time but a time with 

170 undefined starting base, e.g. the time of the system power on. 

171 

172 >>> async with asyncio.timeout_at(loop.time() + 10): 

173 ... await long_running_task() 

174 

175 

176 when - a deadline when timeout occurs or None to disable timeout logic 

177 

178 long_running_task() is interrupted by raising asyncio.CancelledError, 

179 the top-most affected timeout() context manager converts CancelledError 

180 into TimeoutError. 

181 """ 

182 return Timeout(when)