Coverage for Lib/asyncio/queues.py: 98%

156 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-23 01:21 +0000

1__all__ = ( 

2 'Queue', 

3 'PriorityQueue', 

4 'LifoQueue', 

5 'QueueFull', 

6 'QueueEmpty', 

7 'QueueShutDown', 

8) 

9 

10import collections 

11import heapq 

12from types import GenericAlias 

13 

14from . import locks 

15from . import mixins 

16 

17 

18class QueueEmpty(Exception): 

19 """Raised when Queue.get_nowait() is called on an empty Queue.""" 

20 pass 

21 

22 

23class QueueFull(Exception): 

24 """Raised when the Queue.put_nowait() method is called on a full Queue.""" 

25 pass 

26 

27 

28class QueueShutDown(Exception): 

29 """Raised when putting on to or getting from a shut-down Queue.""" 

30 pass 

31 

32 

33class Queue(mixins._LoopBoundMixin): 

34 """A queue, useful for coordinating producer and consumer coroutines. 

35 

36 If maxsize is less than or equal to zero, the queue size is infinite. If it 

37 is an integer greater than 0, then "await put()" will block when the 

38 queue reaches maxsize, until an item is removed by get(). 

39 

40 Unlike the standard library Queue, you can reliably know this Queue's size 

41 with qsize(), since your single-threaded asyncio application won't be 

42 interrupted between calling qsize() and doing an operation on the Queue. 

43 """ 

44 

45 def __init__(self, maxsize=0): 

46 self._maxsize = maxsize 

47 

48 # Futures. 

49 self._getters = collections.deque() 

50 # Futures. 

51 self._putters = collections.deque() 

52 self._unfinished_tasks = 0 

53 self._finished = locks.Event() 

54 self._finished.set() 

55 self._init(maxsize) 

56 self._is_shutdown = False 

57 

58 # These three are overridable in subclasses. 

59 

60 def _init(self, maxsize): 

61 self._queue = collections.deque() 

62 

63 def _get(self): 

64 return self._queue.popleft() 

65 

66 def _put(self, item): 

67 self._queue.append(item) 

68 

69 # End of the overridable methods. 

70 

71 def _wakeup_next(self, waiters): 

72 # Wake up the next waiter (if any) that isn't cancelled. 

73 while waiters: 

74 waiter = waiters.popleft() 

75 if not waiter.done(): 75 ↛ 73line 75 didn't jump to line 73 because the condition on line 75 was always true

76 waiter.set_result(None) 

77 break 

78 

79 def __repr__(self): 

80 return f'<{type(self).__name__} at {id(self):#x} {self._format()}>' 

81 

82 def __str__(self): 

83 return f'<{type(self).__name__} {self._format()}>' 

84 

85 __class_getitem__ = classmethod(GenericAlias) 

86 

87 def _format(self): 

88 result = f'maxsize={self._maxsize!r}' 

89 if getattr(self, '_queue', None): 

90 result += f' _queue={list(self._queue)!r}' 

91 if self._getters: 

92 result += f' _getters[{len(self._getters)}]' 

93 if self._putters: 

94 result += f' _putters[{len(self._putters)}]' 

95 if self._unfinished_tasks: 

96 result += f' tasks={self._unfinished_tasks}' 

97 if self._is_shutdown: 

98 result += ' shutdown' 

99 return result 

100 

101 def qsize(self): 

102 """Number of items in the queue.""" 

103 return len(self._queue) 

104 

105 @property 

106 def maxsize(self): 

107 """Number of items allowed in the queue.""" 

108 return self._maxsize 

109 

110 def empty(self): 

111 """Return True if the queue is empty, False otherwise.""" 

112 return not self._queue 

113 

114 def full(self): 

115 """Return True if there are maxsize items in the queue. 

116 

117 Note: if the Queue was initialized with maxsize=0 (the default), 

118 then full() is never True. 

119 """ 

120 if self._maxsize <= 0: 

121 return False 

122 else: 

123 return self.qsize() >= self._maxsize 

124 

125 async def put(self, item): 

126 """Put an item into the queue. 

127 

128 Put an item into the queue. If the queue is full, wait until a free 

129 slot is available before adding item. 

130 

131 Raises QueueShutDown if the queue has been shut down. 

132 """ 

133 while self.full(): 

134 if self._is_shutdown: 

135 raise QueueShutDown 

136 putter = self._get_loop().create_future() 

137 self._putters.append(putter) 

138 try: 

139 await putter 

140 except: 

141 putter.cancel() # Just in case putter is not done yet. 

142 try: 

143 # Clean self._putters from canceled putters. 

144 self._putters.remove(putter) 

145 except ValueError: 

146 # The putter could be removed from self._putters by a 

147 # previous get_nowait call or a shutdown call. 

148 pass 

149 if not self.full() and not putter.cancelled(): 

150 # We were woken up by get_nowait(), but can't take 

151 # the call. Wake up the next in line. 

152 self._wakeup_next(self._putters) 

153 raise 

154 return self.put_nowait(item) 

155 

156 def put_nowait(self, item): 

157 """Put an item into the queue without blocking. 

158 

159 If no free slot is immediately available, raise QueueFull. 

160 

161 Raises QueueShutDown if the queue has been shut down. 

162 """ 

163 if self._is_shutdown: 

164 raise QueueShutDown 

165 if self.full(): 

166 raise QueueFull 

167 self._put(item) 

168 self._unfinished_tasks += 1 

169 self._finished.clear() 

170 self._wakeup_next(self._getters) 

171 

172 async def get(self): 

173 """Remove and return an item from the queue. 

174 

175 If queue is empty, wait until an item is available. 

176 

177 Raises QueueShutDown if the queue has been shut down and is empty, or 

178 if the queue has been shut down immediately. 

179 """ 

180 while self.empty(): 

181 if self._is_shutdown and self.empty(): 

182 raise QueueShutDown 

183 getter = self._get_loop().create_future() 

184 self._getters.append(getter) 

185 try: 

186 await getter 

187 except: 

188 getter.cancel() # Just in case getter is not done yet. 

189 try: 

190 # Clean self._getters from canceled getters. 

191 self._getters.remove(getter) 

192 except ValueError: 

193 # The getter could be removed from self._getters by a 

194 # previous put_nowait call, or a shutdown call. 

195 pass 

196 if not self.empty() and not getter.cancelled(): 

197 # We were woken up by put_nowait(), but can't take 

198 # the call. Wake up the next in line. 

199 self._wakeup_next(self._getters) 

200 raise 

201 return self.get_nowait() 

202 

203 def get_nowait(self): 

204 """Remove and return an item from the queue. 

205 

206 Return an item if one is immediately available, else raise QueueEmpty. 

207 

208 Raises QueueShutDown if the queue has been shut down and is empty, or 

209 if the queue has been shut down immediately. 

210 """ 

211 if self.empty(): 

212 if self._is_shutdown: 

213 raise QueueShutDown 

214 raise QueueEmpty 

215 item = self._get() 

216 self._wakeup_next(self._putters) 

217 return item 

218 

219 def task_done(self): 

220 """Indicate that a formerly enqueued task is complete. 

221 

222 Used by queue consumers. For each get() used to fetch a task, 

223 a subsequent call to task_done() tells the queue that the processing 

224 on the task is complete. 

225 

226 If a join() is currently blocking, it will resume when all items have 

227 been processed (meaning that a task_done() call was received for every 

228 item that had been put() into the queue). 

229 

230 Raises ValueError if called more times than there were items placed in 

231 the queue. 

232 """ 

233 if self._unfinished_tasks <= 0: 

234 raise ValueError('task_done() called too many times') 

235 self._unfinished_tasks -= 1 

236 if self._unfinished_tasks == 0: 

237 self._finished.set() 

238 

239 async def join(self): 

240 """Block until all items in the queue have been gotten and processed. 

241 

242 The count of unfinished tasks goes up whenever an item is added to the 

243 queue. The count goes down whenever a consumer calls task_done() to 

244 indicate that the item was retrieved and all work on it is complete. 

245 When the count of unfinished tasks drops to zero, join() unblocks. 

246 """ 

247 if self._unfinished_tasks > 0: 

248 await self._finished.wait() 

249 

250 def shutdown(self, immediate=False): 

251 """Shut-down the queue, making queue gets and puts raise QueueShutDown. 

252 

253 By default, gets will only raise once the queue is empty. Set 

254 'immediate' to True to make gets raise immediately instead. 

255 

256 All blocked callers of put() and get() will be unblocked. 

257 

258 If 'immediate', the queue is drained and unfinished tasks 

259 is reduced by the number of drained tasks. If unfinished tasks 

260 is reduced to zero, callers of Queue.join are unblocked. 

261 """ 

262 self._is_shutdown = True 

263 if immediate: 

264 while not self.empty(): 

265 self._get() 

266 if self._unfinished_tasks > 0: 266 ↛ 264line 266 didn't jump to line 264 because the condition on line 266 was always true

267 self._unfinished_tasks -= 1 

268 if self._unfinished_tasks == 0: 

269 self._finished.set() 

270 # All getters need to re-check queue-empty to raise ShutDown 

271 while self._getters: 

272 getter = self._getters.popleft() 

273 if not getter.done(): 273 ↛ 271line 273 didn't jump to line 271 because the condition on line 273 was always true

274 getter.set_result(None) 

275 while self._putters: 

276 putter = self._putters.popleft() 

277 if not putter.done(): 277 ↛ 275line 277 didn't jump to line 275 because the condition on line 277 was always true

278 putter.set_result(None) 

279 

280 

281class PriorityQueue(Queue): 

282 """A subclass of Queue; retrieves entries in priority order (lowest first). 

283 

284 Entries are typically tuples of the form: (priority number, data). 

285 """ 

286 

287 def _init(self, maxsize): 

288 self._queue = [] 

289 

290 def _put(self, item, heappush=heapq.heappush): 

291 heappush(self._queue, item) 

292 

293 def _get(self, heappop=heapq.heappop): 

294 return heappop(self._queue) 

295 

296 

297class LifoQueue(Queue): 

298 """A subclass of Queue that retrieves most recently added entries first.""" 

299 

300 def _init(self, maxsize): 

301 self._queue = [] 

302 

303 def _put(self, item): 

304 self._queue.append(item) 

305 

306 def _get(self): 

307 return self._queue.pop()