| 1 | """A multi-producer, multi-consumer queue."""
|
|---|
| 2 |
|
|---|
| 3 | from time import time as _time
|
|---|
| 4 | from collections import deque
|
|---|
| 5 |
|
|---|
| 6 | __all__ = ['Empty', 'Full', 'Queue']
|
|---|
| 7 |
|
|---|
| 8 | class Empty(Exception):
|
|---|
| 9 | "Exception raised by Queue.get(block=0)/get_nowait()."
|
|---|
| 10 | pass
|
|---|
| 11 |
|
|---|
| 12 | class Full(Exception):
|
|---|
| 13 | "Exception raised by Queue.put(block=0)/put_nowait()."
|
|---|
| 14 | pass
|
|---|
| 15 |
|
|---|
| 16 | class Queue:
|
|---|
| 17 | """Create a queue object with a given maximum size.
|
|---|
| 18 |
|
|---|
| 19 | If maxsize is <= 0, the queue size is infinite.
|
|---|
| 20 | """
|
|---|
| 21 | def __init__(self, maxsize=0):
|
|---|
| 22 | try:
|
|---|
| 23 | import threading
|
|---|
| 24 | except ImportError:
|
|---|
| 25 | import dummy_threading as threading
|
|---|
| 26 | self._init(maxsize)
|
|---|
| 27 | # mutex must be held whenever the queue is mutating. All methods
|
|---|
| 28 | # that acquire mutex must release it before returning. mutex
|
|---|
| 29 | # is shared between the two conditions, so acquiring and
|
|---|
| 30 | # releasing the conditions also acquires and releases mutex.
|
|---|
| 31 | self.mutex = threading.Lock()
|
|---|
| 32 | # Notify not_empty whenever an item is added to the queue; a
|
|---|
| 33 | # thread waiting to get is notified then.
|
|---|
| 34 | self.not_empty = threading.Condition(self.mutex)
|
|---|
| 35 | # Notify not_full whenever an item is removed from the queue;
|
|---|
| 36 | # a thread waiting to put is notified then.
|
|---|
| 37 | self.not_full = threading.Condition(self.mutex)
|
|---|
| 38 | # Notify all_tasks_done whenever the number of unfinished tasks
|
|---|
| 39 | # drops to zero; thread waiting to join() is notified to resume
|
|---|
| 40 | self.all_tasks_done = threading.Condition(self.mutex)
|
|---|
| 41 | self.unfinished_tasks = 0
|
|---|
| 42 |
|
|---|
| 43 | def task_done(self):
|
|---|
| 44 | """Indicate that a formerly enqueued task is complete.
|
|---|
| 45 |
|
|---|
| 46 | Used by Queue consumer threads. For each get() used to fetch a task,
|
|---|
| 47 | a subsequent call to task_done() tells the queue that the processing
|
|---|
| 48 | on the task is complete.
|
|---|
| 49 |
|
|---|
| 50 | If a join() is currently blocking, it will resume when all items
|
|---|
| 51 | have been processed (meaning that a task_done() call was received
|
|---|
| 52 | for every item that had been put() into the queue).
|
|---|
| 53 |
|
|---|
| 54 | Raises a ValueError if called more times than there were items
|
|---|
| 55 | placed in the queue.
|
|---|
| 56 | """
|
|---|
| 57 | self.all_tasks_done.acquire()
|
|---|
| 58 | try:
|
|---|
| 59 | unfinished = self.unfinished_tasks - 1
|
|---|
| 60 | if unfinished <= 0:
|
|---|
| 61 | if unfinished < 0:
|
|---|
| 62 | raise ValueError('task_done() called too many times')
|
|---|
| 63 | self.all_tasks_done.notifyAll()
|
|---|
| 64 | self.unfinished_tasks = unfinished
|
|---|
| 65 | finally:
|
|---|
| 66 | self.all_tasks_done.release()
|
|---|
| 67 |
|
|---|
| 68 | def join(self):
|
|---|
| 69 | """Blocks until all items in the Queue have been gotten and processed.
|
|---|
| 70 |
|
|---|
| 71 | The count of unfinished tasks goes up whenever an item is added to the
|
|---|
| 72 | queue. The count goes down whenever a consumer thread calls task_done()
|
|---|
| 73 | to indicate the item was retrieved and all work on it is complete.
|
|---|
| 74 |
|
|---|
| 75 | When the count of unfinished tasks drops to zero, join() unblocks.
|
|---|
| 76 | """
|
|---|
| 77 | self.all_tasks_done.acquire()
|
|---|
| 78 | try:
|
|---|
| 79 | while self.unfinished_tasks:
|
|---|
| 80 | self.all_tasks_done.wait()
|
|---|
| 81 | finally:
|
|---|
| 82 | self.all_tasks_done.release()
|
|---|
| 83 |
|
|---|
| 84 | def qsize(self):
|
|---|
| 85 | """Return the approximate size of the queue (not reliable!)."""
|
|---|
| 86 | self.mutex.acquire()
|
|---|
| 87 | n = self._qsize()
|
|---|
| 88 | self.mutex.release()
|
|---|
| 89 | return n
|
|---|
| 90 |
|
|---|
| 91 | def empty(self):
|
|---|
| 92 | """Return True if the queue is empty, False otherwise (not reliable!)."""
|
|---|
| 93 | self.mutex.acquire()
|
|---|
| 94 | n = self._empty()
|
|---|
| 95 | self.mutex.release()
|
|---|
| 96 | return n
|
|---|
| 97 |
|
|---|
| 98 | def full(self):
|
|---|
| 99 | """Return True if the queue is full, False otherwise (not reliable!)."""
|
|---|
| 100 | self.mutex.acquire()
|
|---|
| 101 | n = self._full()
|
|---|
| 102 | self.mutex.release()
|
|---|
| 103 | return n
|
|---|
| 104 |
|
|---|
| 105 | def put(self, item, block=True, timeout=None):
|
|---|
| 106 | """Put an item into the queue.
|
|---|
| 107 |
|
|---|
| 108 | If optional args 'block' is true and 'timeout' is None (the default),
|
|---|
| 109 | block if necessary until a free slot is available. If 'timeout' is
|
|---|
| 110 | a positive number, it blocks at most 'timeout' seconds and raises
|
|---|
| 111 | the Full exception if no free slot was available within that time.
|
|---|
| 112 | Otherwise ('block' is false), put an item on the queue if a free slot
|
|---|
| 113 | is immediately available, else raise the Full exception ('timeout'
|
|---|
| 114 | is ignored in that case).
|
|---|
| 115 | """
|
|---|
| 116 | self.not_full.acquire()
|
|---|
| 117 | try:
|
|---|
| 118 | if not block:
|
|---|
| 119 | if self._full():
|
|---|
| 120 | raise Full
|
|---|
| 121 | elif timeout is None:
|
|---|
| 122 | while self._full():
|
|---|
| 123 | self.not_full.wait()
|
|---|
| 124 | else:
|
|---|
| 125 | if timeout < 0:
|
|---|
| 126 | raise ValueError("'timeout' must be a positive number")
|
|---|
| 127 | endtime = _time() + timeout
|
|---|
| 128 | while self._full():
|
|---|
| 129 | remaining = endtime - _time()
|
|---|
| 130 | if remaining <= 0.0:
|
|---|
| 131 | raise Full
|
|---|
| 132 | self.not_full.wait(remaining)
|
|---|
| 133 | self._put(item)
|
|---|
| 134 | self.unfinished_tasks += 1
|
|---|
| 135 | self.not_empty.notify()
|
|---|
| 136 | finally:
|
|---|
| 137 | self.not_full.release()
|
|---|
| 138 |
|
|---|
| 139 | def put_nowait(self, item):
|
|---|
| 140 | """Put an item into the queue without blocking.
|
|---|
| 141 |
|
|---|
| 142 | Only enqueue the item if a free slot is immediately available.
|
|---|
| 143 | Otherwise raise the Full exception.
|
|---|
| 144 | """
|
|---|
| 145 | return self.put(item, False)
|
|---|
| 146 |
|
|---|
| 147 | def get(self, block=True, timeout=None):
|
|---|
| 148 | """Remove and return an item from the queue.
|
|---|
| 149 |
|
|---|
| 150 | If optional args 'block' is true and 'timeout' is None (the default),
|
|---|
| 151 | block if necessary until an item is available. If 'timeout' is
|
|---|
| 152 | a positive number, it blocks at most 'timeout' seconds and raises
|
|---|
| 153 | the Empty exception if no item was available within that time.
|
|---|
| 154 | Otherwise ('block' is false), return an item if one is immediately
|
|---|
| 155 | available, else raise the Empty exception ('timeout' is ignored
|
|---|
| 156 | in that case).
|
|---|
| 157 | """
|
|---|
| 158 | self.not_empty.acquire()
|
|---|
| 159 | try:
|
|---|
| 160 | if not block:
|
|---|
| 161 | if self._empty():
|
|---|
| 162 | raise Empty
|
|---|
| 163 | elif timeout is None:
|
|---|
| 164 | while self._empty():
|
|---|
| 165 | self.not_empty.wait()
|
|---|
| 166 | else:
|
|---|
| 167 | if timeout < 0:
|
|---|
| 168 | raise ValueError("'timeout' must be a positive number")
|
|---|
| 169 | endtime = _time() + timeout
|
|---|
| 170 | while self._empty():
|
|---|
| 171 | remaining = endtime - _time()
|
|---|
| 172 | if remaining <= 0.0:
|
|---|
| 173 | raise Empty
|
|---|
| 174 | self.not_empty.wait(remaining)
|
|---|
| 175 | item = self._get()
|
|---|
| 176 | self.not_full.notify()
|
|---|
| 177 | return item
|
|---|
| 178 | finally:
|
|---|
| 179 | self.not_empty.release()
|
|---|
| 180 |
|
|---|
| 181 | def get_nowait(self):
|
|---|
| 182 | """Remove and return an item from the queue without blocking.
|
|---|
| 183 |
|
|---|
| 184 | Only get an item if one is immediately available. Otherwise
|
|---|
| 185 | raise the Empty exception.
|
|---|
| 186 | """
|
|---|
| 187 | return self.get(False)
|
|---|
| 188 |
|
|---|
| 189 | # Override these methods to implement other queue organizations
|
|---|
| 190 | # (e.g. stack or priority queue).
|
|---|
| 191 | # These will only be called with appropriate locks held
|
|---|
| 192 |
|
|---|
| 193 | # Initialize the queue representation
|
|---|
| 194 | def _init(self, maxsize):
|
|---|
| 195 | self.maxsize = maxsize
|
|---|
| 196 | self.queue = deque()
|
|---|
| 197 |
|
|---|
| 198 | def _qsize(self):
|
|---|
| 199 | return len(self.queue)
|
|---|
| 200 |
|
|---|
| 201 | # Check whether the queue is empty
|
|---|
| 202 | def _empty(self):
|
|---|
| 203 | return not self.queue
|
|---|
| 204 |
|
|---|
| 205 | # Check whether the queue is full
|
|---|
| 206 | def _full(self):
|
|---|
| 207 | return self.maxsize > 0 and len(self.queue) == self.maxsize
|
|---|
| 208 |
|
|---|
| 209 | # Put a new item in the queue
|
|---|
| 210 | def _put(self, item):
|
|---|
| 211 | self.queue.append(item)
|
|---|
| 212 |
|
|---|
| 213 | # Get an item from the queue
|
|---|
| 214 | def _get(self):
|
|---|
| 215 | return self.queue.popleft()
|
|---|