# Coverage for Lib/asyncio/queues.py: 98% (156 statements)
# Report generated by coverage.py v7.8.0, created at 2025-04-15 02:02 +0000
# Names exported by a star-import of this module.
__all__ = (
    'Queue',
    'PriorityQueue',
    'LifoQueue',
    'QueueFull',
    'QueueEmpty',
    'QueueShutDown',
)
import collections
import heapq
from types import GenericAlias

from . import locks
from . import mixins
class QueueEmpty(Exception):
    """Raised when Queue.get_nowait() is called on an empty Queue."""
class QueueFull(Exception):
    """Raised when the Queue.put_nowait() method is called on a full Queue."""
class QueueShutDown(Exception):
    """Raised when putting on to or getting from a shut-down Queue."""
33class Queue(mixins._LoopBoundMixin):
34 """A queue, useful for coordinating producer and consumer coroutines.
36 If maxsize is less than or equal to zero, the queue size is infinite. If it
37 is an integer greater than 0, then "await put()" will block when the
38 queue reaches maxsize, until an item is removed by get().
40 Unlike the standard library Queue, you can reliably know this Queue's size
41 with qsize(), since your single-threaded asyncio application won't be
42 interrupted between calling qsize() and doing an operation on the Queue.
43 """
45 def __init__(self, maxsize=0):
46 self._maxsize = maxsize
48 # Futures.
49 self._getters = collections.deque()
50 # Futures.
51 self._putters = collections.deque()
52 self._unfinished_tasks = 0
53 self._finished = locks.Event()
54 self._finished.set()
55 self._init(maxsize)
56 self._is_shutdown = False
58 # These three are overridable in subclasses.
60 def _init(self, maxsize):
61 self._queue = collections.deque()
63 def _get(self):
64 return self._queue.popleft()
66 def _put(self, item):
67 self._queue.append(item)
69 # End of the overridable methods.
71 def _wakeup_next(self, waiters):
72 # Wake up the next waiter (if any) that isn't cancelled.
73 while waiters:
74 waiter = waiters.popleft()
75 if not waiter.done(): 75 ↛ 73line 75 didn't jump to line 73 because the condition on line 75 was always true
76 waiter.set_result(None)
77 break
79 def __repr__(self):
80 return f'<{type(self).__name__} at {id(self):#x} {self._format()}>'
82 def __str__(self):
83 return f'<{type(self).__name__} {self._format()}>'
85 __class_getitem__ = classmethod(GenericAlias)
87 def _format(self):
88 result = f'maxsize={self._maxsize!r}'
89 if getattr(self, '_queue', None):
90 result += f' _queue={list(self._queue)!r}'
91 if self._getters:
92 result += f' _getters[{len(self._getters)}]'
93 if self._putters:
94 result += f' _putters[{len(self._putters)}]'
95 if self._unfinished_tasks:
96 result += f' tasks={self._unfinished_tasks}'
97 if self._is_shutdown:
98 result += ' shutdown'
99 return result
101 def qsize(self):
102 """Number of items in the queue."""
103 return len(self._queue)
105 @property
106 def maxsize(self):
107 """Number of items allowed in the queue."""
108 return self._maxsize
110 def empty(self):
111 """Return True if the queue is empty, False otherwise."""
112 return not self._queue
114 def full(self):
115 """Return True if there are maxsize items in the queue.
117 Note: if the Queue was initialized with maxsize=0 (the default),
118 then full() is never True.
119 """
120 if self._maxsize <= 0:
121 return False
122 else:
123 return self.qsize() >= self._maxsize
125 async def put(self, item):
126 """Put an item into the queue.
128 Put an item into the queue. If the queue is full, wait until a free
129 slot is available before adding item.
131 Raises QueueShutDown if the queue has been shut down.
132 """
133 while self.full():
134 if self._is_shutdown:
135 raise QueueShutDown
136 putter = self._get_loop().create_future()
137 self._putters.append(putter)
138 try:
139 await putter
140 except:
141 putter.cancel() # Just in case putter is not done yet.
142 try:
143 # Clean self._putters from canceled putters.
144 self._putters.remove(putter)
145 except ValueError:
146 # The putter could be removed from self._putters by a
147 # previous get_nowait call or a shutdown call.
148 pass
149 if not self.full() and not putter.cancelled():
150 # We were woken up by get_nowait(), but can't take
151 # the call. Wake up the next in line.
152 self._wakeup_next(self._putters)
153 raise
154 return self.put_nowait(item)
156 def put_nowait(self, item):
157 """Put an item into the queue without blocking.
159 If no free slot is immediately available, raise QueueFull.
161 Raises QueueShutDown if the queue has been shut down.
162 """
163 if self._is_shutdown:
164 raise QueueShutDown
165 if self.full():
166 raise QueueFull
167 self._put(item)
168 self._unfinished_tasks += 1
169 self._finished.clear()
170 self._wakeup_next(self._getters)
172 async def get(self):
173 """Remove and return an item from the queue.
175 If queue is empty, wait until an item is available.
177 Raises QueueShutDown if the queue has been shut down and is empty, or
178 if the queue has been shut down immediately.
179 """
180 while self.empty():
181 if self._is_shutdown and self.empty():
182 raise QueueShutDown
183 getter = self._get_loop().create_future()
184 self._getters.append(getter)
185 try:
186 await getter
187 except:
188 getter.cancel() # Just in case getter is not done yet.
189 try:
190 # Clean self._getters from canceled getters.
191 self._getters.remove(getter)
192 except ValueError:
193 # The getter could be removed from self._getters by a
194 # previous put_nowait call, or a shutdown call.
195 pass
196 if not self.empty() and not getter.cancelled():
197 # We were woken up by put_nowait(), but can't take
198 # the call. Wake up the next in line.
199 self._wakeup_next(self._getters)
200 raise
201 return self.get_nowait()
203 def get_nowait(self):
204 """Remove and return an item from the queue.
206 Return an item if one is immediately available, else raise QueueEmpty.
208 Raises QueueShutDown if the queue has been shut down and is empty, or
209 if the queue has been shut down immediately.
210 """
211 if self.empty():
212 if self._is_shutdown:
213 raise QueueShutDown
214 raise QueueEmpty
215 item = self._get()
216 self._wakeup_next(self._putters)
217 return item
219 def task_done(self):
220 """Indicate that a formerly enqueued task is complete.
222 Used by queue consumers. For each get() used to fetch a task,
223 a subsequent call to task_done() tells the queue that the processing
224 on the task is complete.
226 If a join() is currently blocking, it will resume when all items have
227 been processed (meaning that a task_done() call was received for every
228 item that had been put() into the queue).
230 shutdown(immediate=True) calls task_done() for each remaining item in
231 the queue.
233 Raises ValueError if called more times than there were items placed in
234 the queue.
235 """
236 if self._unfinished_tasks <= 0:
237 raise ValueError('task_done() called too many times')
238 self._unfinished_tasks -= 1
239 if self._unfinished_tasks == 0:
240 self._finished.set()
242 async def join(self):
243 """Block until all items in the queue have been gotten and processed.
245 The count of unfinished tasks goes up whenever an item is added to the
246 queue. The count goes down whenever a consumer calls task_done() to
247 indicate that the item was retrieved and all work on it is complete.
248 When the count of unfinished tasks drops to zero, join() unblocks.
249 """
250 if self._unfinished_tasks > 0:
251 await self._finished.wait()
253 def shutdown(self, immediate=False):
254 """Shut-down the queue, making queue gets and puts raise QueueShutDown.
256 By default, gets will only raise once the queue is empty. Set
257 'immediate' to True to make gets raise immediately instead.
259 All blocked callers of put() and get() will be unblocked. If
260 'immediate', a task is marked as done for each item remaining in
261 the queue, which may unblock callers of join().
262 """
263 self._is_shutdown = True
264 if immediate:
265 while not self.empty():
266 self._get()
267 if self._unfinished_tasks > 0: 267 ↛ 265line 267 didn't jump to line 265 because the condition on line 267 was always true
268 self._unfinished_tasks -= 1
269 if self._unfinished_tasks == 0:
270 self._finished.set()
271 # All getters need to re-check queue-empty to raise ShutDown
272 while self._getters:
273 getter = self._getters.popleft()
274 if not getter.done(): 274 ↛ 272line 274 didn't jump to line 272 because the condition on line 274 was always true
275 getter.set_result(None)
276 while self._putters:
277 putter = self._putters.popleft()
278 if not putter.done(): 278 ↛ 276line 278 didn't jump to line 276 because the condition on line 278 was always true
279 putter.set_result(None)
class PriorityQueue(Queue):
    """A subclass of Queue; retrieves entries in priority order (lowest first).

    Entries are typically tuples of the form: (priority number, data).
    """

    def _init(self, maxsize):
        """Create the list that backs the heap."""
        self._queue = []

    # heappush/heappop are bound as default arguments at definition time;
    # presumably to avoid a module attribute lookup on every call.
    def _put(self, item, heappush=heapq.heappush):
        """Push *item* onto the heap."""
        heappush(self._queue, item)

    def _get(self, heappop=heapq.heappop):
        """Pop and return the smallest item from the heap."""
        return heappop(self._queue)
class LifoQueue(Queue):
    """A subclass of Queue that retrieves most recently added entries first."""

    def _init(self, maxsize):
        """Create the list used as a stack."""
        self._queue = []

    def _put(self, item):
        """Push *item* onto the top of the stack."""
        self._queue.append(item)

    def _get(self):
        """Pop and return the most recently added item."""
        return self._queue.pop()