Coverage for Lib/asyncio/queues.py: 98%

156 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-15 02:02 +0000

1__all__ = ( 

2 'Queue', 

3 'PriorityQueue', 

4 'LifoQueue', 

5 'QueueFull', 

6 'QueueEmpty', 

7 'QueueShutDown', 

8) 

9 

10import collections 

11import heapq 

12from types import GenericAlias 

13 

14from . import locks 

15from . import mixins 

16 

17 

18class QueueEmpty(Exception): 

19 """Raised when Queue.get_nowait() is called on an empty Queue.""" 

20 pass 

21 

22 

23class QueueFull(Exception): 

24 """Raised when the Queue.put_nowait() method is called on a full Queue.""" 

25 pass 

26 

27 

28class QueueShutDown(Exception): 

29 """Raised when putting on to or getting from a shut-down Queue.""" 

30 pass 

31 

32 

33class Queue(mixins._LoopBoundMixin): 

34 """A queue, useful for coordinating producer and consumer coroutines. 

35 

36 If maxsize is less than or equal to zero, the queue size is infinite. If it 

37 is an integer greater than 0, then "await put()" will block when the 

38 queue reaches maxsize, until an item is removed by get(). 

39 

40 Unlike the standard library Queue, you can reliably know this Queue's size 

41 with qsize(), since your single-threaded asyncio application won't be 

42 interrupted between calling qsize() and doing an operation on the Queue. 

43 """ 

44 

45 def __init__(self, maxsize=0): 

46 self._maxsize = maxsize 

47 

48 # Futures. 

49 self._getters = collections.deque() 

50 # Futures. 

51 self._putters = collections.deque() 

52 self._unfinished_tasks = 0 

53 self._finished = locks.Event() 

54 self._finished.set() 

55 self._init(maxsize) 

56 self._is_shutdown = False 

57 

58 # These three are overridable in subclasses. 

59 

60 def _init(self, maxsize): 

61 self._queue = collections.deque() 

62 

63 def _get(self): 

64 return self._queue.popleft() 

65 

66 def _put(self, item): 

67 self._queue.append(item) 

68 

69 # End of the overridable methods. 

70 

71 def _wakeup_next(self, waiters): 

72 # Wake up the next waiter (if any) that isn't cancelled. 

73 while waiters: 

74 waiter = waiters.popleft() 

75 if not waiter.done(): 75 ↛ 73line 75 didn't jump to line 73 because the condition on line 75 was always true

76 waiter.set_result(None) 

77 break 

78 

79 def __repr__(self): 

80 return f'<{type(self).__name__} at {id(self):#x} {self._format()}>' 

81 

82 def __str__(self): 

83 return f'<{type(self).__name__} {self._format()}>' 

84 

85 __class_getitem__ = classmethod(GenericAlias) 

86 

87 def _format(self): 

88 result = f'maxsize={self._maxsize!r}' 

89 if getattr(self, '_queue', None): 

90 result += f' _queue={list(self._queue)!r}' 

91 if self._getters: 

92 result += f' _getters[{len(self._getters)}]' 

93 if self._putters: 

94 result += f' _putters[{len(self._putters)}]' 

95 if self._unfinished_tasks: 

96 result += f' tasks={self._unfinished_tasks}' 

97 if self._is_shutdown: 

98 result += ' shutdown' 

99 return result 

100 

101 def qsize(self): 

102 """Number of items in the queue.""" 

103 return len(self._queue) 

104 

105 @property 

106 def maxsize(self): 

107 """Number of items allowed in the queue.""" 

108 return self._maxsize 

109 

110 def empty(self): 

111 """Return True if the queue is empty, False otherwise.""" 

112 return not self._queue 

113 

114 def full(self): 

115 """Return True if there are maxsize items in the queue. 

116 

117 Note: if the Queue was initialized with maxsize=0 (the default), 

118 then full() is never True. 

119 """ 

120 if self._maxsize <= 0: 

121 return False 

122 else: 

123 return self.qsize() >= self._maxsize 

124 

125 async def put(self, item): 

126 """Put an item into the queue. 

127 

128 Put an item into the queue. If the queue is full, wait until a free 

129 slot is available before adding item. 

130 

131 Raises QueueShutDown if the queue has been shut down. 

132 """ 

133 while self.full(): 

134 if self._is_shutdown: 

135 raise QueueShutDown 

136 putter = self._get_loop().create_future() 

137 self._putters.append(putter) 

138 try: 

139 await putter 

140 except: 

141 putter.cancel() # Just in case putter is not done yet. 

142 try: 

143 # Clean self._putters from canceled putters. 

144 self._putters.remove(putter) 

145 except ValueError: 

146 # The putter could be removed from self._putters by a 

147 # previous get_nowait call or a shutdown call. 

148 pass 

149 if not self.full() and not putter.cancelled(): 

150 # We were woken up by get_nowait(), but can't take 

151 # the call. Wake up the next in line. 

152 self._wakeup_next(self._putters) 

153 raise 

154 return self.put_nowait(item) 

155 

156 def put_nowait(self, item): 

157 """Put an item into the queue without blocking. 

158 

159 If no free slot is immediately available, raise QueueFull. 

160 

161 Raises QueueShutDown if the queue has been shut down. 

162 """ 

163 if self._is_shutdown: 

164 raise QueueShutDown 

165 if self.full(): 

166 raise QueueFull 

167 self._put(item) 

168 self._unfinished_tasks += 1 

169 self._finished.clear() 

170 self._wakeup_next(self._getters) 

171 

172 async def get(self): 

173 """Remove and return an item from the queue. 

174 

175 If queue is empty, wait until an item is available. 

176 

177 Raises QueueShutDown if the queue has been shut down and is empty, or 

178 if the queue has been shut down immediately. 

179 """ 

180 while self.empty(): 

181 if self._is_shutdown and self.empty(): 

182 raise QueueShutDown 

183 getter = self._get_loop().create_future() 

184 self._getters.append(getter) 

185 try: 

186 await getter 

187 except: 

188 getter.cancel() # Just in case getter is not done yet. 

189 try: 

190 # Clean self._getters from canceled getters. 

191 self._getters.remove(getter) 

192 except ValueError: 

193 # The getter could be removed from self._getters by a 

194 # previous put_nowait call, or a shutdown call. 

195 pass 

196 if not self.empty() and not getter.cancelled(): 

197 # We were woken up by put_nowait(), but can't take 

198 # the call. Wake up the next in line. 

199 self._wakeup_next(self._getters) 

200 raise 

201 return self.get_nowait() 

202 

203 def get_nowait(self): 

204 """Remove and return an item from the queue. 

205 

206 Return an item if one is immediately available, else raise QueueEmpty. 

207 

208 Raises QueueShutDown if the queue has been shut down and is empty, or 

209 if the queue has been shut down immediately. 

210 """ 

211 if self.empty(): 

212 if self._is_shutdown: 

213 raise QueueShutDown 

214 raise QueueEmpty 

215 item = self._get() 

216 self._wakeup_next(self._putters) 

217 return item 

218 

219 def task_done(self): 

220 """Indicate that a formerly enqueued task is complete. 

221 

222 Used by queue consumers. For each get() used to fetch a task, 

223 a subsequent call to task_done() tells the queue that the processing 

224 on the task is complete. 

225 

226 If a join() is currently blocking, it will resume when all items have 

227 been processed (meaning that a task_done() call was received for every 

228 item that had been put() into the queue). 

229 

230 shutdown(immediate=True) calls task_done() for each remaining item in 

231 the queue. 

232 

233 Raises ValueError if called more times than there were items placed in 

234 the queue. 

235 """ 

236 if self._unfinished_tasks <= 0: 

237 raise ValueError('task_done() called too many times') 

238 self._unfinished_tasks -= 1 

239 if self._unfinished_tasks == 0: 

240 self._finished.set() 

241 

242 async def join(self): 

243 """Block until all items in the queue have been gotten and processed. 

244 

245 The count of unfinished tasks goes up whenever an item is added to the 

246 queue. The count goes down whenever a consumer calls task_done() to 

247 indicate that the item was retrieved and all work on it is complete. 

248 When the count of unfinished tasks drops to zero, join() unblocks. 

249 """ 

250 if self._unfinished_tasks > 0: 

251 await self._finished.wait() 

252 

253 def shutdown(self, immediate=False): 

254 """Shut-down the queue, making queue gets and puts raise QueueShutDown. 

255 

256 By default, gets will only raise once the queue is empty. Set 

257 'immediate' to True to make gets raise immediately instead. 

258 

259 All blocked callers of put() and get() will be unblocked. If 

260 'immediate', a task is marked as done for each item remaining in 

261 the queue, which may unblock callers of join(). 

262 """ 

263 self._is_shutdown = True 

264 if immediate: 

265 while not self.empty(): 

266 self._get() 

267 if self._unfinished_tasks > 0: 267 ↛ 265line 267 didn't jump to line 265 because the condition on line 267 was always true

268 self._unfinished_tasks -= 1 

269 if self._unfinished_tasks == 0: 

270 self._finished.set() 

271 # All getters need to re-check queue-empty to raise ShutDown 

272 while self._getters: 

273 getter = self._getters.popleft() 

274 if not getter.done(): 274 ↛ 272line 274 didn't jump to line 272 because the condition on line 274 was always true

275 getter.set_result(None) 

276 while self._putters: 

277 putter = self._putters.popleft() 

278 if not putter.done(): 278 ↛ 276line 278 didn't jump to line 276 because the condition on line 278 was always true

279 putter.set_result(None) 

280 

281 

282class PriorityQueue(Queue): 

283 """A subclass of Queue; retrieves entries in priority order (lowest first). 

284 

285 Entries are typically tuples of the form: (priority number, data). 

286 """ 

287 

288 def _init(self, maxsize): 

289 self._queue = [] 

290 

291 def _put(self, item, heappush=heapq.heappush): 

292 heappush(self._queue, item) 

293 

294 def _get(self, heappop=heapq.heappop): 

295 return heappop(self._queue) 

296 

297 

298class LifoQueue(Queue): 

299 """A subclass of Queue that retrieves most recently added entries first.""" 

300 

301 def _init(self, maxsize): 

302 self._queue = [] 

303 

304 def _put(self, item): 

305 self._queue.append(item) 

306 

307 def _get(self): 

308 return self._queue.pop()