Preface
What is a queue
A queue is a special kind of linear list: it is first-in, first-out (FIFO). What makes it special is that insertion is only allowed at one end and deletion at the other
The end where elements are inserted is called the tail of the queue, and the end where elements are removed is called the head. A queue with no elements is called an empty queue
Queues appear everywhere in programming; quite a few middleware products use a queue as their underlying data structure
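As a quick sketch of the FIFO property described above (the class and method names here are mine, for illustration only), elements leave a queue in exactly the order they entered:

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class FifoDemo {
    // Removes every element and concatenates them, showing the exit order
    static String drain(Queue<String> q) {
        StringBuilder sb = new StringBuilder();
        while (!q.isEmpty()) sb.append(q.poll()); // poll removes from the head
        return sb.toString();
    }

    public static void main(String[] args) {
        Queue<String> q = new ArrayDeque<>();
        q.offer("a"); // insert at the tail
        q.offer("b");
        q.offer("c");
        System.out.println(drain(q)); // prints "abc": first in, first out
    }
}
```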
What is a blocking queue
A queue is a queue, so what is a blocking queue?
A blocking queue is a queue in which two additional operations are added, respectively:
- Blocking insertion: when the queue is full, the inserting thread blocks until the queue has spare capacity
- Blocking removal: when the queue has no elements, the removing thread blocks until there are elements to remove
This article uses LinkedBlockingQueue as the example to explain blocking queues; LinkedBlockingQueue is abbreviated to LBQ below for convenience
Since this is a source-code analysis article, I recommend reading it on a PC. Of course, if your screen is big enough, never mind
Blocking queue inheritance relationship
A blocking queue is an abstract concept; the underlying data structure can be an array, a singly linked list, or a doubly linked list...
LBQ is a queue built on a singly linked list. The following figure shows the inheritance relationship of LBQ
As the figure shows, LBQ implements the BlockingQueue interface, and BlockingQueue extends the Queue interface
Queue Interface Analysis
Working top-down, we first look at the methods defined in the Queue interface
```java
// If capacity permits, inserts the element immediately and returns true on success
// 🌟 Throws an exception if the queue is full
boolean add(E e);

// If capacity permits, inserts the element immediately and returns true on success
// 🌟 Returns false if the queue is full
// With bounded queues, offer is preferable to add
boolean offer(E e);

// Retrieves and removes the head of the queue, returning the removed element
// 🌟 Throws an exception if the queue is empty
E remove();

// Retrieves and removes the head of the queue, returning the removed element
// 🌟 Returns null if the queue is empty
E poll();

// Retrieves, but does not remove, the head of the queue
// 🌟 Throws an exception if the queue is empty
E element();

// Retrieves, but does not remove, the head of the queue
// 🌟 Returns null if the queue is empty
E peek();
```
To summarize the methods of the Queue interface, there are three main categories:
- Add elements to the queue container: add, offer
- Remove elements from the queue container: remove, poll
- Inspect, without removing, the queue head node: element, peek
In terms of API robustness, these methods fall into two categories:
- Robust APIs: offer, poll, peek
- Non-robust APIs: add, remove, element
"Non-robust" does not mean the API is broken; it means these methods throw exceptions in edge cases (full or empty queue), so callers are more likely to hit errors and should pay extra attention to catching and handling the possible exceptions
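The contrast between the two categories can be seen in a short sketch (a demo of my own, using a capacity-1 LBQ so the boundary cases are easy to hit):

```java
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueApiDemo {
    public static void main(String[] args) {
        Queue<String> q = new LinkedBlockingQueue<>(1); // capacity 1, fills up immediately

        q.add("a");                       // succeeds; the queue is now full
        System.out.println(q.offer("b")); // false: offer reports failure via return value
        try {
            q.add("b");                   // add throws on a full queue
        } catch (IllegalStateException e) {
            System.out.println("add threw: " + e.getMessage());
        }

        System.out.println(q.peek());     // "a": inspect without removing
        System.out.println(q.poll());     // "a": remove the head
        System.out.println(q.poll());     // null: poll on an empty queue
        try {
            q.element();                  // element throws on an empty queue
        } catch (NoSuchElementException e) {
            System.out.println("element threw");
        }
    }
}
```

In short, the robust trio reports failure through a return value, while the non-robust trio forces you into try/catch.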
BlockingQueue interface analysis
The BlockingQueue interface extends the Queue interface, so the APIs with identical semantics are not repeated here
```java
// Inserts the specified element, waiting for space to become available if the queue is full;
// the wait can be interrupted
// 🌟 A brand-new method relative to the Queue interface
void put(E e) throws InterruptedException;

// Inserts the specified element, waiting up to the specified time for space if the queue is full;
// the wait can be interrupted
// 🌟 An extension of offer(E e)
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;

// Retrieves and removes the head of this queue, waiting if necessary until an element is available;
// the wait can be interrupted
E take() throws InterruptedException;

// Retrieves and removes the head of this queue, waiting up to the specified time for an element;
// the wait can be interrupted
// 🌟 An extension of poll()
E poll(long timeout, TimeUnit unit) throws InterruptedException;

// Returns the remaining capacity, or Integer.MAX_VALUE if the queue is unbounded
int remainingCapacity();

// Returns true if this queue contains the specified element
boolean contains(Object o);

// Removes all available elements from this queue and adds them to the given collection
int drainTo(Collection<? super E> c);

// Removes at most the given number of available elements and adds them to the given collection
int drainTo(Collection<? super E> c, int maxElements);
```
As you can see, the BlockingQueue interface adds quite a few methods of its own. The protagonist of this article, LBQ, implements the BlockingQueue interface
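The methods above can be exercised in a few lines (a sketch of my own; a capacity-2 queue keeps the timed offer deterministic):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingApiDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> q = new LinkedBlockingQueue<>(2);

        q.put("a");                                 // blocks only if the queue is full
        System.out.println(q.remainingCapacity());  // 1: one slot left
        System.out.println(q.offer("b", 100, TimeUnit.MILLISECONDS)); // true, space available

        // The queue is now full: the timed offer gives up after 100 ms instead of blocking forever
        System.out.println(q.offer("c", 100, TimeUnit.MILLISECONDS)); // false

        List<String> sink = new ArrayList<>();
        System.out.println(q.drainTo(sink));        // 2: both elements moved in one call
        System.out.println(sink);                   // [a, b], FIFO order preserved
    }
}
```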
Source code analysis
Variable analysis
LBQ uses ReentrantLock and Condition from the JUC package to keep concurrent add and remove operations safe
```java
// Lock required by removal operations such as take and poll
private final ReentrantLock takeLock = new ReentrantLock();
// Removal threads are suspended on this condition when the queue has no data
private final Condition notEmpty = takeLock.newCondition();
// Lock required by insertion operations such as put and offer
private final ReentrantLock putLock = new ReentrantLock();
// Insertion threads are suspended on this condition when the queue is full
private final Condition notFull = putLock.newCondition();
```
Unlike LBQ, ArrayBlockingQueue (ABQ) tracks its element count with a plain int. Why doesn't ABQ need to worry about concurrency on that counter?
- ABQ uses a single lock internally to control both enqueueing and dequeueing, so only one thread can modify the count at any moment
- LBQ uses two locks, so two threads may change the count at the same time. If LBQ used a plain int like ABQ, concurrent updates could corrupt the count, which is why a concurrent atomic class is needed
Let's start with the structure: what fields LBQ is made of and what each one does. If you know your data structures, you can probably guess
```java
// The bounded capacity; Integer.MAX_VALUE if the queue is unbounded
private final int capacity;
// The number of elements currently in the queue
private final AtomicInteger count = new AtomicInteger();
// The head node of the queue
transient Node<E> head;
// The last node of the queue
private transient Node<E> last;
```
With the head and last fields we have a rough prototype of LBQ, but we still need the Node structure
```java
static class Node<E> {
    // The element stored in this node
    E item;
    // The successor of this node
    Node<E> next;

    Node(E x) { item = x; }
}
```
Constructor analysis
Let's draw a picture to understand how the LBQ default constructor initializes the queue
```java
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}
```
As you can see, the default constructor sets the capacity to Integer.MAX_VALUE, which is commonly called an unbounded queue
It delegates to the overloaded constructor, which sets the capacity, creates a Node whose item is null, and points both head and last at it
So head and last start out pointing at one dummy node whose item and next are both null. What happens to the queue when a record is added?
Enqueueing a node
The element to be added is wrapped in a Node and appended to the queue. If the queue is full, the inserting thread blocks until it is woken up by a slot becoming free
```java
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e); // Wrap the data to be added in a Node
    final ReentrantLock putLock = this.putLock; // The lock guarding insertions
    final AtomicInteger count = this.count; // The current number of elements in the queue
    putLock.lockInterruptibly(); // Acquire the lock with the interruptible API
    try {
        while (count.get() == capacity) { // Queue at capacity: block this thread on the condition queue
            notFull.await();
        }
        enqueue(node); // Perform the actual enqueue
        c = count.getAndIncrement(); // Get the old count and increment it; e.g. c = 0 for the first element
        if (c + 1 < capacity) // If the queue still has room after this insert, wake another blocked inserting thread
            notFull.signal();
    } finally {
        putLock.unlock(); // Release the lock
    }
    if (c == 0) // The queue was empty before this insert, so wake a blocked consuming thread
        signalNotEmpty();
}
```
The overall enqueue flow is fairly clear and does the following:
- If the queue is full, block the current thread
- If the queue has room, wrap the data in a Node and enqueue it
- If the queue still has room after the Node is enqueued, wake another inserting thread waiting on the condition queue
- If the queue was empty before the data was enqueued, wake a blocked consuming thread after the data is enqueued
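The blocking behavior in the first bullet can be observed directly (a demo of my own; the thread-state print is indicative, not guaranteed, since the exact state depends on timing):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PutBlockingDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> q = new LinkedBlockingQueue<>(1);
        q.put("first"); // the queue is now full

        Thread producer = new Thread(() -> {
            try {
                q.put("second"); // blocks in notFull.await() until a slot frees up
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        TimeUnit.MILLISECONDS.sleep(200);
        System.out.println(producer.getState()); // typically WAITING: parked on the condition

        q.take();            // frees a slot and signals notFull, waking the producer
        producer.join(1000); // the blocked put can now complete
        System.out.println(q.peek()); // "second"
    }
}
```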
What does the enqueue method LBQ#enqueue do?
```java
private void enqueue(Node<E> node) {
    last = last.next = node;
}
```
The code is simple: assign node to the next field of the current last node, then point last at node, completing the enqueue
If the LBQ type parameter is String, insert element A first and the queue looks like this:
What, one element is not enough? Nothing that adding another can't solve; element B enqueues as follows:
Enqueueing works as shown in the figure above: head.item is always null, and last.next is always null
LBQ#offer is also an enqueue method; the difference is that if the queue is full it returns false instead of blocking the thread
Dequeueing a node
LBQ#take is the dequeue method; if the queue is empty, it blocks the current thread until the queue has elements
```java
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count; // The current number of elements in the queue
    final ReentrantLock takeLock = this.takeLock; // The lock guarding removals
    takeLock.lockInterruptibly(); // Acquire takeLock; blocking here can be interrupted
    try {
        while (count.get() == 0) { // Queue empty: block this thread on the condition queue
            notEmpty.await();
        }
        x = dequeue(); // The queue has elements, so dequeue the head node
        c = count.getAndDecrement(); // Get the old count and decrement it
        if (c > 1) // If elements remain in the queue, wake the next consuming thread
            notEmpty.signal();
    } finally {
        takeLock.unlock(); // Release the lock
    }
    if (c == capacity) // The queue was full before this removal, so wake a producing thread
        signalNotFull();
    return x; // Return the dequeued element
}
```
The overall dequeue flow is also clear and mirrors the enqueue flow:
- If the queue is empty, block the current thread
- If the queue has consumable elements, dequeue the head node
- If elements remain in the queue after the dequeue, wake another consuming thread waiting on the condition queue
- If the queue was full before the removal, wake a producing thread to add elements
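The take side can be observed the same way as put (my own demo; take parks the consumer until a producer delivers, while poll returns immediately):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TakeBlockingDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> q = new LinkedBlockingQueue<>();

        System.out.println(q.poll()); // null: poll never blocks on an empty queue

        Thread consumer = new Thread(() -> {
            try {
                // blocks in notEmpty.await() until a producer enqueues something
                System.out.println("took: " + q.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        TimeUnit.MILLISECONDS.sleep(200); // the consumer is parked by now
        q.put("x");                       // the enqueue signals notEmpty and wakes it
        consumer.join(1000);
    }
}
```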
LBQ#dequeue, the dequeue helper, is a bit more involved than enqueue
```java
private E dequeue() {
    Node<E> h = head; // The current (dummy) head node
    Node<E> first = h.next; // Its successor, which holds the element to return
    h.next = h; // Point the old head at itself to help GC
    head = first; // The successor becomes the new head
    E x = first.item; // Take the new head's item
    first.item = null; // The head's item must stay null, so clear it; first is now the dummy node
    return x;
}
```
During dequeue, the original head node is pointed at itself to help the GC reclaim it, and the original head's successor becomes the new head. The complete dequeue process is shown below
As shown above, there is nothing special about the process. There is also LBQ#poll, which returns null if the queue is empty rather than blocking like take
Querying a node
The element method is implemented in the parent class AbstractQueue on top of peek, so LBQ itself only implements peek; we use peek to illustrate node queries
Both peek and element fetch the queue head's data; the difference is that peek returns null on an empty queue while element throws an exception
```java
public E peek() {
    if (count.get() == 0) // Return null immediately if the queue is empty
        return null;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock(); // Acquire the removal lock
    try {
        Node<E> first = head.next; // The head node's successor holds the first real element
        if (first == null) // No successor: return null; otherwise return its item
            return null;
        else
            return first.item;
    } finally {
        takeLock.unlock(); // Release the lock
    }
}
```
peek reads head.next.item, because the head node's own item is always null
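A tiny sketch of my own confirms the non-destructive nature of the query: peek can be called repeatedly without shrinking the queue.

```java
import java.util.concurrent.LinkedBlockingQueue;

public class PeekDemo {
    public static void main(String[] args) {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>();
        q.offer("a");

        System.out.println(q.peek()); // "a": reads head.next.item
        System.out.println(q.peek()); // "a" again: peek never removes
        System.out.println(q.size()); // 1: the element is still there

        q.clear();
        System.out.println(q.peek()); // null on an empty queue
        // q.element() would throw NoSuchElementException here instead
    }
}
```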
Deleting a node
Deleting requires acquiring both locks, so while a removal runs, enqueue, dequeue, and other node operations are blocked
```java
public boolean remove(Object o) {
    if (o == null) return false;
    fullyLock(); // Acquire both locks
    try {
        // Walk the queue starting from the head node
        for (Node<E> trail = head, p = trail.next;
             p != null;
             trail = p, p = p.next) {
            if (o.equals(p.item)) { // Found the node whose item equals o
                unlink(p, trail); // Unlink it from the list
                return true;
            }
        }
        return false;
    } finally {
        fullyUnlock(); // Release both locks
    }
}
```
In general, deleting from a linked list means a linear scan: the traversal is O(n), and in the worst case every node of the list is visited
How does unlink, called from LBQ#remove, detach a node?
```java
void unlink(Node<E> p, Node<E> trail) {
    p.item = null; // p is the node to delete; trail is its predecessor
    trail.next = p.next; // Splice p out by pointing its predecessor at its successor
    if (last == p) // If p was the tail, trail becomes the new tail
        last = trail;
    // If the queue was full before the deletion, wake a producing thread afterwards
    if (count.getAndDecrement() == capacity)
        notFull.signal();
}
```
The remove method is similar to take: if the element removed happens to be the first real node, the effect is the same as take
For a better understanding, let's remove a middle element and draw two pictures of what happens. The code is as follows:
```java
public static void main(String[] args) {
    BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();
    blockingQueue.offer("a");
    blockingQueue.offer("b");
    blockingQueue.offer("c");
    // Remove the middle element of the queue
    blockingQueue.remove("b");
}
```
After the three offer calls in the code above, the queue structure looks like this:
The queue structure after deleting element B is shown below:
If the p node is the tail, its predecessor becomes the new tail. That delete operation looks like this:
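The two cases pictured above (middle removal and tail removal) can be verified with a short sketch of my own:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class RemoveDemo {
    public static void main(String[] args) {
        BlockingQueue<String> q = new LinkedBlockingQueue<>();
        q.offer("a");
        q.offer("b");
        q.offer("c");

        System.out.println(q.remove("b")); // true: unlink splices b out of the list
        System.out.println(q);             // [a, c]

        // Removing the tail also works: last is repointed to its predecessor
        System.out.println(q.remove("c")); // true
        System.out.println(q);             // [a]
    }
}
```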
Application scenarios
As mentioned above, blocking queues appear in many business scenarios. Here are two practical examples to help you understand them
Producer-consumer model
The producer-consumer pattern is the classic multithreaded concurrency pattern. A container sits between the producer and the consumer to decouple the two: the producer puts data into the container, and the consumer takes data out of it
There are many ways to implement producer-consumer:
- wait, notify, and notifyAll on the Object class
- await, signal, and signalAll on Lock's Condition
- BlockingQueue
Blocking queues implement the producer-consumer model as follows:
```java
@Slf4j
public class BlockingQueueTest {

    private static final int MAX_NUM = 10;
    private static final BlockingQueue<String> QUEUE = new LinkedBlockingQueue<>(MAX_NUM);

    public void produce(String str) {
        try {
            QUEUE.put(str);
            log.info("🔥🔥🔥 put element into the queue :: {}, queue size :: {}", str, QUEUE.size());
        } catch (InterruptedException ie) {
            // ignore
        }
    }

    public String consume() {
        String str = null;
        try {
            str = QUEUE.take();
            log.info("🔥🔥🔥 removed element from the queue :: {}, queue size :: {}", str, QUEUE.size());
        } catch (InterruptedException ie) {
            // ignore
        }
        return str;
    }

    public static void main(String[] args) {
        BlockingQueueTest queueTest = new BlockingQueueTest();
        for (int i = 0; i < 5; i++) {
            int finalI = i;
            new Thread(() -> {
                String str = "Element-";
                while (true) {
                    queueTest.produce(str + finalI);
                }
            }).start();
        }
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                while (true) {
                    queueTest.consume();
                }
            }).start();
        }
    }
}
```
Thread pool application
The use of blocking queues inside thread pools is a real producer-consumer scenario
The importance of thread pools in Java applications is self-evident; here is a brief description of how a thread pool operates
- When the number of threads in the pool is smaller than the core pool size, a new core thread is created to run the task
- When the number of threads is greater than or equal to the core pool size, the task is placed in the blocking queue
- When the number of threads is greater than or equal to the core pool size and the blocking queue is full, the pool creates a non-core thread to run the task
The second point is the important one: when all core threads are busy running tasks, new tasks are put into the blocking queue. The thread pool source looks like this:
```java
if (isRunning(c) && workQueue.offer(command)) {
    ...
}
```
Notice that the offer method is used; as described above, it returns false if the blocking queue is full. So when are the elements in the queue consumed? Briefly, threads in the pool execute tasks as follows:
- A thread gets tasks in two ways: the task it was created with, or tasks fetched from the blocking queue
- Once a task is running, a core thread executes it no differently than a non-core thread
The thread pool uses two APIs to fetch tasks from the blocking queue: poll and take
```java
Runnable r = timed ?
        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
        workQueue.take();
```
Q: Why use two APIs? Isn't one good enough?
A: take is how the pool keeps core threads alive: if no task is available, the thread parks and waits for the next task to be added
The timed poll exists so that non-core threads can be reclaimed once keepAliveTime expires without a task arriving
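The interplay between offer and the worker threads can be seen in a small sketch (my own demo, not the thread-pool source: core size 1, max size 2, and a bounded queue of capacity 1, so the third task forces a non-core thread):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // core 1, max 2; tasks beyond the core thread go to the bounded queue first
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 10, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1));

        CountDownLatch release = new CountDownLatch(1);
        Runnable slow = () -> { try { release.await(); } catch (InterruptedException e) {} };

        pool.execute(slow); // runs on the core thread
        pool.execute(slow); // workQueue.offer succeeds: task parked in the queue
        pool.execute(slow); // queue full, offer returns false -> a non-core thread is created
        System.out.println(pool.getPoolSize());     // 2: core + non-core
        System.out.println(pool.getQueue().size()); // 1: one task still queued

        release.countDown(); // let the tasks finish
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```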
A final word
That wraps up the LBQ blocking queue. To summarize the basic characteristics of LBQ described in this article:
- LBQ is a blocking queue built on a linked list that supports concurrent reads and writes
- The LBQ capacity can be set explicitly; if it is not set, it defaults to Integer.MAX_VALUE, which is effectively unbounded
Working through the source code, the article explained LBQ's enqueue, dequeue, query, and delete operations in detail
LBQ is just an introduction; I hope this article helps you grasp the core ideas of blocking queues, so you can then read the code of the other implementation classes to consolidate your knowledge
LBQ achieves concurrency safety through locks. Can the same be achieved without locks, and how? See you next time!
Search WeChat for [source interest circle], follow the official account, and reply 123 to receive learning materials covering Go, Netty, Seata, Spring Cloud Alibaba, development standards, an interview handbook, data structures, and more!