
Blog: zhihu.com/people/hong-wei-peng

Introduction

The Queue module provides a first-in, first-out (FIFO) data structure for multithreaded programming. Because it is thread-safe, multiple threads can easily use the same instance.
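As a minimal usage sketch before diving into the source (not from the module itself; the worker function and the None sentinel are just illustrative), here is one queue shared between two threads:

import queue
import threading

q = queue.Queue(maxsize=5)

def worker():
    while True:
        item = q.get()       # blocks until an item is available
        if item is None:     # sentinel value telling the worker to stop
            break
        print('processed', item)

t = threading.Thread(target=worker)
t.start()

for i in range(3):
    q.put(i)                 # safe to call from any thread

q.put(None)
t.join()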

Source code analysis

Let’s start with the initialization method, __init__:

class Queue:
    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)

        # one lock shared by the three condition variables below
        self.mutex = threading.Lock()
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0

    def _init(self, maxsize):
        # initialize the underlying data structure
        self.queue = deque()

What can we learn from this initialization function? First, the maximum size of the queue can be set, and the underlying container for the elements is a collections.deque() double-ended queue, which makes FIFO operations easy. The _init function is deliberately factored out so that subclasses can override it and store elements in a different structure (for example, the priority queue uses a list).

Then there is the thread lock self.mutex, which is acquired for every operation on the underlying data structure self.queue. Further down are the three Condition variables, all of which take self.mutex as an argument, meaning they share the same lock. From this we know that blocks such as with self.mutex and with self.not_empty are mutually exclusive.
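To make the shared-lock point concrete, here is a small standalone sketch (not part of queue.py) with two Conditions built on the same Lock; entering either with block acquires that one lock, so the blocks exclude each other:

import threading

mutex = threading.Lock()
not_empty = threading.Condition(mutex)
not_full = threading.Condition(mutex)

with not_empty:              # acquires mutex
    # while this block runs, a "with not_full:" (or "with mutex:")
    # in another thread would block, since all three wrap the same lock
    print(mutex.locked())    # True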

Some simple operations based on these locks:

class Queue:
    ...

    def qsize(self):
        with self.mutex:
            return self._qsize()

    def empty(self):
        with self.mutex:
            return not self._qsize()

    def full(self):
        with self.mutex:
            return 0 < self.maxsize <= self._qsize()

    def _qsize(self):
        return len(self.queue)

This code snippet is fairly understandable and requires no analysis.

As a queue, the two core operations are enqueueing and dequeueing. First, enqueueing:

class Queue:
    ...

    def put(self, item, block=True, timeout=None):
        with self.not_full:
            if self.maxsize > 0:
                if not block:
                    if self._qsize() >= self.maxsize:
                        raise Full  # block is False and the queue is full
                elif timeout is None:
                    while self._qsize() >= self.maxsize:
                        self.not_full.wait()  # block until free space appears
                elif timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    endtime = time() + timeout
                    while self._qsize() >= self.maxsize:
                        remaining = endtime - time()
                        if remaining <= 0.0:
                            raise Full  # no space became available before the timeout
                        self.not_full.wait(remaining)
            self._put(item)  # add the element to the underlying data structure
            self.unfinished_tasks += 1
            self.not_empty.notify()

    def _put(self, item):
        self.queue.append(item)

Even at only two dozen lines, the logic here is fairly involved. It handles blocking, timeouts, and a full queue as follows:

1. If block is False, ignore the timeout argument

  • If the queue is Full, the Full exception is thrown.
  • If the queue is not full, the element is immediately saved to the underlying data structure.

2. If block is True

  • If timeout is None, the PUT operation may block until there is free space in the queue (default).
  • If timeout is non-negative, put blocks until there is free space in the queue; if no space becomes available before the timeout expires, the Full exception is raised.

After this argument handling, the element is saved to the underlying data structure, unfinished_tasks is incremented, and not_empty is notified to wake up a thread waiting for data.
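From the caller’s side, the three modes look like this (a small sketch; the maxsize of 1 and the string values are arbitrary):

import queue

q = queue.Queue(maxsize=1)
q.put('a')                   # succeeds immediately; the queue is now full

try:
    q.put('b', block=False)  # non-blocking put on a full queue
except queue.Full:
    print('Full raised immediately')

try:
    q.put('b', timeout=0.1)  # wait at most 0.1 seconds for free space
except queue.Full:
    print('Full raised after the timeout expired')

# q.put('b')  # with the defaults this call would block until space frees up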

Next, the dequeue operation:

class Queue:
    ...

    def get(self, block=True, timeout=None):
        with self.not_empty:
            if not block:
                if not self._qsize():
                    raise Empty
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                endtime = time() + timeout
                while not self._qsize():
                    remaining = endtime - time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item

    def _get(self):
        return self.queue.popleft()

The get() operation mirrors put(), and the code is structured the same way: get() removes the earliest inserted element from the queue and returns it.

1. If block is False, ignore the timeout argument

  • If there are no elements in the queue, the Empty exception is thrown.
  • If the queue has elements, the first one is immediately removed and returned.

2. If block is True

  • If timeout is None, get may block until there are elements in the queue (default).
  • If timeout is non-negative, get blocks until there is an element in the queue; if none arrives before the timeout expires, the Empty exception is raised.

Finally, the earliest queued element is removed with self.queue.popleft(), and not_full is notified to wake up a thread waiting for free space.
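The caller-side behavior mirrors the put() example above (again a small sketch with arbitrary values):

import queue

q = queue.Queue()

try:
    q.get(block=False)       # non-blocking get on an empty queue
except queue.Empty:
    print('Empty raised immediately')

try:
    q.get(timeout=0.1)       # wait at most 0.1 seconds for an element
except queue.Empty:
    print('Empty raised after the timeout expired')

q.put('x')
print(q.get())               # 'x' is returned immediately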

One thing to note here is that self.unfinished_tasks is incremented in put() but not decremented in get(). Why?

get() only retrieves an element; it does not mean the consumer thread has finished processing it. The consumer needs to call task_done() to notify the queue that the task is complete:

class Queue:
    ...

    def task_done(self):
        with self.all_tasks_done:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                if unfinished < 0:
                    # task_done() has been called more times than put()
                    raise ValueError('task_done() called too many times')
                # when unfinished reaches 0, wake up everything waiting on all_tasks_done
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished

    def join(self):
        with self.all_tasks_done:
            while self.unfinished_tasks:
                # wait on all_tasks_done while there are unfinished tasks
                self.all_tasks_done.wait()

Since unfinished_tasks is only incremented in put() and decremented in task_done(), a ValueError is raised if task_done() is called more times than put().

The task_done() operation wakes up the blocking join() operation. The join() method blocks until all the elements in the queue have been fetched and processed (similar to a thread’s join method). That is, the join() method must be used in conjunction with task_done().
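A typical producer/consumer pattern tying the two together (a minimal sketch; the worker function and the five integer items are illustrative):

import queue
import threading

q = queue.Queue()

def worker():
    while True:
        item = q.get()
        print('working on', item)
        q.task_done()        # mark this item as fully processed

threading.Thread(target=worker, daemon=True).start()

for i in range(5):
    q.put(i)

q.join()                     # blocks until task_done() has been called
                             # once for every successful put()
print('all tasks processed')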

LifoQueue: last-in, first-out queue

LifoQueue uses last-in, first-out order, similar to the stack structure:

class LifoQueue(Queue):
    '''Variant of Queue that retrieves most recently added entries first.'''

    def _init(self, maxsize):
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        self.queue.append(item)

    def _get(self):
        return self.queue.pop()

That’s all there is to LifoQueue, and it shows why Queue is so well designed: the base class handles thread safety itself and abstracts the underlying data operations into four hook methods (_init, _qsize, _put, _get), so subclasses only have to worry about the data structure.

LifoQueue stores its elements in a plain list, and self.queue.pop() removes and returns the last element of the list, so no index needs to be passed.
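A quick check of the last-in, first-out behavior (a trivial sketch):

import queue

q = queue.LifoQueue()
for ch in 'abc':
    q.put(ch)

print(q.get(), q.get(), q.get())  # c b a -- last in, first out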

PriorityQueue: priority queue

from heapq import heappush, heappop

class PriorityQueue(Queue):
    '''Variant of Queue that retrieves open entries in priority order (lowest first).

    Entries are typically tuples of the form:  (priority number, data).
    '''

    def _init(self, maxsize):
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        heappush(self.queue, item)

    def _get(self):
        return heappop(self.queue)

The priority queue is built on the heapq module, that is, a min-heap. Priority queues are widely used when the order in which items are processed depends on the characteristics of the items themselves. A simple example:

import queue

class A:
    def __init__(self, priority, value):
        self.priority = priority
        self.value = value

    def __lt__(self, other):
        return self.priority < other.priority

q = queue.PriorityQueue()

q.put(A(1, 'a'))
q.put(A(0, 'b'))
q.put(A(1, 'c'))

print(q.get().value)  # 'b'

When putting custom objects into a priority queue, you need to define the __lt__ magic method so the heap knows how to compare them. Note that the heap is not stable: if two elements have the same priority, their retrieval order is not guaranteed to be FIFO. A common way to keep FIFO order among equal priorities is to include an increasing sequence number in the comparison, as in the sketch below.
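One way to do that (a sketch, not part of queue.py; put_with_priority and the counter are just illustrative) is to enqueue (priority, sequence, value) tuples:

import itertools
import queue

q = queue.PriorityQueue()
counter = itertools.count()  # monotonically increasing tie-breaker

def put_with_priority(priority, value):
    # Tuples compare element by element: priority first, then the counter,
    # so items with equal priority keep their insertion (FIFO) order.
    q.put((priority, next(counter), value))

put_with_priority(1, 'a')
put_with_priority(0, 'b')
put_with_priority(1, 'c')

print([q.get()[2] for _ in range(3)])  # ['b', 'a', 'c']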

Reference

https://pymotw.com/3/queue/index.html
