What is a blocking queue
Blocking queues have two characteristics:
- Taking an element blocks when the queue is empty
- Adding an element blocks when the queue is full
Blocking queues are often used in producer and consumer scenarios, where the producer adds elements to the queue and the consumer takes elements from the queue.
Core methods of the Queue interface
Blocking queues are essentially queues, that is, they inherit the functions of queues. Let’s first look at some of the core methods in the Queue interface:
methods | function |
---|---|
add(e) | Adds an element, returning true on success and throwing an exception if the queue is full |
offer(e) | Adds an element, returning true on success or false if the queue is full; preferable to add for bounded queues |
remove() | Retrieves and removes the head element, returning it on success or throwing an exception if the queue is empty |
poll() | Retrieves and removes the head element, returning it on success or null if the queue is empty |
element() | Retrieves, but does not remove, the head element, throwing an exception if the queue is empty |
peek() | Retrieves, but does not remove, the head element, returning null if the queue is empty |
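The difference between the exception-throwing and the value-returning variants in the table can be seen in a small sketch. The class name `QueueMethodsDemo` and the capacity of 1 are assumptions chosen only to make the full and empty cases easy to reach.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo class; only the java.util.concurrent calls are real API.
public class QueueMethodsDemo {

    /** Exercises the six Queue methods on a full and on an empty queue. */
    static String describe() {
        // Capacity 1 makes the "full" case reachable with a single element.
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        StringBuilder out = new StringBuilder();

        out.append("offer on empty queue: ").append(queue.offer("a")).append('\n');
        out.append("offer on full queue: ").append(queue.offer("b")).append('\n');
        try {
            queue.add("b"); // add signals a full queue with an exception
        } catch (IllegalStateException e) {
            out.append("add on full queue: IllegalStateException\n");
        }

        out.append("peek: ").append(queue.peek()).append('\n'); // does not remove
        out.append("poll: ").append(queue.poll()).append('\n'); // removes the head

        out.append("poll on empty queue: ").append(queue.poll()).append('\n');
        out.append("peek on empty queue: ").append(queue.peek()).append('\n');
        try {
            queue.remove(); // remove signals an empty queue with an exception
        } catch (NoSuchElementException e) {
            out.append("remove on empty queue: NoSuchElementException\n");
        }
        try {
            queue.element(); // so does element
        } catch (NoSuchElementException e) {
            out.append("element on empty queue: NoSuchElementException\n");
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(describe());
    }
}
```

None of these calls ever waits: each one either succeeds immediately, returns a special value, or throws.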
These methods are provided by the Queue interface. However, none of them block, so a separate interface, BlockingQueue, defines the blocking operations. Let’s look at its core methods.
Core methods of the BlockingQueue interface
methods | function |
---|---|
put(e) | Adds an element, blocking and waiting if the queue is full (no return value) |
offer(e,time,unit) | Adds an element, returning true on success; if the queue is full, blocks for up to the specified time and returns false if no space becomes available within that time |
take() | Retrieves and removes the head element, returning it on success and blocking if the queue is empty |
poll(time,unit) | Retrieves and removes the head element, returning it on success; if the queue is empty, blocks for up to the specified time and returns null if the queue is still empty after that time |
drainTo(Collection) | Removes all available elements from the queue at once, adds them to the specified collection, and returns the number of elements transferred |
drainTo(c,n) | Removes at most the specified number of elements from the queue at once, adds them to the specified collection, and returns the number of elements transferred |
remainingCapacity() | Returns the number of additional elements the queue can ideally accept without blocking |
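As a minimal sketch of the methods in the table, the following walks a small queue through the full and empty cases. The class name `BlockingQueueMethodsDemo`, the capacity of 3 and the 100 ms timeouts are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; only the java.util.concurrent calls are real API.
public class BlockingQueueMethodsDemo {

    static List<String> run() throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);
        List<String> log = new ArrayList<>();

        // put never blocks here because the queue still has room.
        queue.put(1);
        queue.put(2);
        queue.put(3);
        log.add("remainingCapacity when full: " + queue.remainingCapacity());

        // The queue is full and nobody is consuming, so the timed offer gives up.
        boolean accepted = queue.offer(4, 100, TimeUnit.MILLISECONDS);
        log.add("timed offer on full queue: " + accepted);

        log.add("take: " + queue.take());

        // drainTo moves elements into another collection and reports how many.
        List<Integer> drained = new ArrayList<>();
        int moved = queue.drainTo(drained, 1);
        log.add("drainTo(c, 1) moved " + moved + ", collection is " + drained);
        moved = queue.drainTo(drained);
        log.add("drainTo(c) moved " + moved + ", collection is " + drained);

        // The queue is now empty and nobody is producing, so the timed poll gives up.
        Integer polled = queue.poll(100, TimeUnit.MILLISECONDS);
        log.add("timed poll on empty queue: " + polled);
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        run().forEach(System.out::println);
    }
}
```

The timed variants are useful when a thread must not wait forever, for example so that it can periodically check a shutdown flag.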
In Java, seven commonly used blocking queues are provided.
- ArrayBlockingQueue: A bounded blocking queue backed by an array
- LinkedBlockingQueue: An optionally bounded blocking queue backed by a linked list (the capacity defaults to Integer.MAX_VALUE)
- PriorityBlockingQueue: An unbounded blocking queue that supports priority ordering
- DelayQueue: An unbounded blocking queue implemented using a priority queue
- SynchronousQueue: A blocking queue that does not store elements
- LinkedTransferQueue: An unbounded blocking queue backed by a linked list
- LinkedBlockingDeque: A double-ended (two-way) blocking queue backed by a linked list
ArrayBlockingQueue
ArrayBlockingQueue is a bounded blocking queue implemented with an array. It orders elements on a first-in, first-out (FIFO) basis. By default it uses a non-fair lock; a constructor parameter controls whether a fair or non-fair lock is used. Let’s start with the ArrayBlockingQueue class diagram:
As you can see, there are three constructors, all of which are eventually initialized by calling the second constructor in the figure above. The third constructor additionally copies the elements of the given Collection into the queue after initialization (if the Collection is not empty).
    ArrayBlockingQueue<Integer> nonFairQueue = new ArrayBlockingQueue<>(10);
    ArrayBlockingQueue<Integer> fairQueue = new ArrayBlockingQueue<>(10, true); // true: fair lock

Simulating a producer and consumer
    package com.zwx.concurrent.queue.block;

    import java.util.concurrent.ArrayBlockingQueue;

    public class ArrayBlockingQueueDemo {
        public static void main(String[] args) throws InterruptedException {
            ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(100);
            // Start the consumer first; it blocks in take() because the queue is empty.
            new Thread(new ConsumerThread(queue)).start();
            Thread.sleep(2000);
            // Two seconds later the producer starts and the consumer begins printing.
            new Thread(new ProducerThread(queue)).start();
        }
    }

    class ProducerThread extends Thread {
        private final ArrayBlockingQueue<Integer> queue;

        public ProducerThread(ArrayBlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                try {
                    queue.put(i); // blocks if the queue is full
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class ConsumerThread extends Thread {
        private final ArrayBlockingQueue<Integer> queue;

        public ConsumerThread(ArrayBlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    int a = queue.take(); // blocks if the queue is empty
                    System.out.println(" consume: " + a);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
In the example above, the consumer is started first. Because the queue is empty, the consumer blocks in take() and prints nothing until the producer starts two seconds later and adds elements.
Initializing the queue
The first step is to initialize two Condition queues to block producer and consumer threads, respectively
Add element (producer)
When adding an element, the thread blocks if the queue is full; otherwise it calls the enqueue method to add the element.
The element is stored at the internally maintained putIndex, which advances by one per insertion and wraps around to 0 when it reaches the end of the array.
Take element (consumer)
It first checks whether the queue is empty; if so it blocks, otherwise it calls the dequeue method to retrieve the element.
The dequeue method removes the element and wakes up a thread that is waiting to add an element.
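The initialization, put and take steps described above can be sketched as a small array-backed queue. This is a simplified illustration and not the JDK source: the class name `SimpleArrayBlockingQueue` and the `demo` helper are hypothetical, and only the core put/take path is modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified class for illustration; not the JDK implementation.
public class SimpleArrayBlockingQueue<E> {
    private final Object[] items;
    private int putIndex;   // slot for the next put
    private int takeIndex;  // slot for the next take
    private int count;      // number of stored elements
    private final ReentrantLock lock;
    private final Condition notEmpty; // consumers wait here
    private final Condition notFull;  // producers wait here

    public SimpleArrayBlockingQueue(int capacity, boolean fair) {
        items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

    public void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                notFull.await();          // queue full: block the producer
            }
            items[putIndex] = e;
            if (++putIndex == items.length) {
                putIndex = 0;             // wrap around to the start of the array
            }
            count++;
            notEmpty.signal();            // wake a waiting consumer
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                notEmpty.await();         // queue empty: block the consumer
            }
            E e = (E) items[takeIndex];
            items[takeIndex] = null;      // drop the reference held by the array
            if (++takeIndex == items.length) {
                takeIndex = 0;
            }
            count--;
            notFull.signal();             // wake a waiting producer
            return e;
        } finally {
            lock.unlock();
        }
    }

    /** One producer pushes 0..4 through a queue of capacity 2. */
    static List<Integer> demo() throws InterruptedException {
        SimpleArrayBlockingQueue<Integer> queue = new SimpleArrayBlockingQueue<>(2, false);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        List<Integer> taken = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            taken.add(queue.take());
        }
        producer.join();
        return taken;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());
    }
}
```

Both waits sit inside a `while` loop rather than an `if`, so a thread re-checks the condition after waking up. With a capacity of 2 and five elements, the producer has to wait for the consumer at least once, and both indices wrap around.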
LinkedBlockingQueue
A bounded blocking queue backed by a linked list that orders elements on a first-in, first-out (FIFO) basis. The difference is that ArrayBlockingQueue maintains an array internally and tracks the queue through array indices, whereas LinkedBlockingQueue maintains a linked list and tracks the queue through Node objects. Let’s start with the class diagram:
LinkedBlockingQueue also has three constructors. The first and third constructors eventually call the second one; the first (no-argument) constructor initializes a queue with a capacity of Integer.MAX_VALUE by default. The third constructor additionally adds the elements of the given Collection after initializing the queue (if the Collection is not empty).
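A short sketch of the three constructors follows, using remainingCapacity() to make the default capacity visible. The class name `LinkedBlockingQueueCapacityDemo` is hypothetical.

```java
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; only the LinkedBlockingQueue calls are real API.
public class LinkedBlockingQueueCapacityDemo {
    public static void main(String[] args) {
        // No-argument constructor: capacity defaults to Integer.MAX_VALUE.
        LinkedBlockingQueue<String> byDefault = new LinkedBlockingQueue<>();
        // Explicit capacity: the queue is bounded at 10 elements.
        LinkedBlockingQueue<String> bounded = new LinkedBlockingQueue<>(10);
        // Collection constructor: default capacity, pre-filled with the given elements.
        LinkedBlockingQueue<String> preFilled = new LinkedBlockingQueue<>(List.of("a", "b"));

        System.out.println(byDefault.remainingCapacity() == Integer.MAX_VALUE);
        System.out.println(bounded.remainingCapacity());
        System.out.println(preFilled.size());
    }
}
```

Because the default capacity is so large, a queue created with the no-argument constructor behaves as unbounded in practice; passing an explicit capacity is the way to get back-pressure on producers.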
Initializing the queue
Node is a static inner class in LinkedBlockingQueue:
So item in Node defaults to null after the first initialization. After initialization, the queue looks like this:
The head node is a sentinel. As in the AQS synchronization queue, an empty node that carries no element is set as the sentinel.
Add element (producer)
Adding an element acquires putLock, while taking an element (as shown later) acquires takeLock. Separating the put lock from the take lock lets producers and consumers proceed concurrently, which improves performance.
Take a look at the enqueue method for adding elements:
After adding elements, the queue looks like this:
Take element (consumer)
Here the consumer acquires the other lock, takeLock; the rest of the logic mirrors the producer side. Let’s go to the dequeue method that actually retrieves the element:
1. Let’s look at line 219 and get the following queue:
2. Continue to look at 221 lines of code and get the following result:
After these two steps, the original E1 node has been removed. As the figure above shows, the old head node still holds a reference to the current head node through next. The old head is no longer reachable from the queue, but a stale link from a dead node to live nodes can make the garbage collector's work harder, so the reference held in next is cut (line 218 above). The old node then no longer references any other node and can be collected as garbage. The AQS synchronization queue and Condition queue cut references in the same way to help GC.
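The sentinel node, the two separate locks and the dequeue step described above can be put together in one sketch. It is modelled loosely on the approach the article describes and is not the JDK source: the class name `SimpleLinkedBlockingQueue`, the `signal` helper and the `demo` method are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified class for illustration; not the JDK implementation.
public class SimpleLinkedBlockingQueue<E> {
    private static final class Node<E> {
        E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    private final int capacity;
    private final AtomicInteger count = new AtomicInteger(); // shared by both sides
    private Node<E> head; // sentinel: head.item is always null
    private Node<E> last;
    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();

    public SimpleLinkedBlockingQueue(int capacity) {
        this.capacity = capacity;
        last = head = new Node<>(null); // the queue starts with only the sentinel
    }

    public void put(E e) throws InterruptedException {
        Node<E> node = new Node<>(e);
        int before;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                notFull.await();
            }
            last = last.next = node;            // enqueue: link at the tail
            before = count.getAndIncrement();
            if (before + 1 < capacity) {
                notFull.signal();               // still room: wake another producer
            }
        } finally {
            putLock.unlock();
        }
        if (before == 0) {
            signal(takeLock, notEmpty);         // queue was empty: wake a consumer
        }
    }

    public E take() throws InterruptedException {
        E item;
        int before;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            item = dequeue();
            before = count.getAndDecrement();
            if (before > 1) {
                notEmpty.signal();              // more elements: wake another consumer
            }
        } finally {
            takeLock.unlock();
        }
        if (before == capacity) {
            signal(putLock, notFull);           // queue was full: wake a producer
        }
        return item;
    }

    private E dequeue() {
        Node<E> oldHead = head;
        Node<E> first = oldHead.next;
        oldHead.next = oldHead; // cut the link from the old sentinel to live nodes
        head = first;           // the first real node becomes the new sentinel
        E item = first.item;
        first.item = null;
        return item;
    }

    private static void signal(ReentrantLock lock, Condition condition) {
        lock.lock();
        try { condition.signal(); } finally { lock.unlock(); }
    }

    static List<Integer> demo() throws InterruptedException {
        SimpleLinkedBlockingQueue<Integer> queue = new SimpleLinkedBlockingQueue<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) queue.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        List<Integer> taken = new ArrayList<>();
        for (int i = 0; i < 5; i++) taken.add(queue.take());
        producer.join();
        return taken;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());
    }
}
```

Producers touch only `last` and consumers touch only `head`, so the two sides never contend for the same lock; the atomic `count` is the single piece of state they share. In this sketch the old sentinel's `next` is pointed at the node itself instead of being set to null, which is one way of cutting the reference.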
LinkedBlockingDeque
LinkedBlockingDeque, like LinkedBlockingQueue, is built on a linked list, that is, an internal Node class is used to implement the list. LinkedBlockingDeque is a double-ended (two-way) blocking queue, so its Node has one more field than the one-way version: prev, which points to the previous Node.
Compared with LinkedBlockingQueue, the double-ended queue adds methods such as addFirst, addLast, offerFirst, offerLast, peekFirst and peekLast, because both ends of the queue can be operated on. In addition, the insert method add is equivalent to addLast, the remove method remove is equivalent to removeFirst, and the take method is equivalent to takeFirst. To avoid confusion, it is recommended to use the methods with an explicit First or Last suffix.
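The equivalences listed above can be checked with a few calls. The class name `DequeEquivalenceDemo` and the capacity of 5 are assumptions for illustration.

```java
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical demo class; only the LinkedBlockingDeque calls are real API.
public class DequeEquivalenceDemo {

    static String run() throws InterruptedException {
        LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<>(5);

        deque.add("b");       // behaves like addLast: deque is [b]
        deque.addFirst("a");  // deque is [a, b]
        deque.addLast("c");   // deque is [a, b, c]

        String removed = deque.remove();    // behaves like removeFirst
        String taken = deque.take();        // behaves like takeFirst
        String fromTail = deque.takeLast(); // explicit tail operation

        return removed + taken + fromTail;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

The unsuffixed methods keep the deque usable as an ordinary FIFO queue, which is why they insert at the tail and remove from the head.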
First, let’s look at the class diagram:
As you can see, compared with the one-way queue there is an additional Deque interface, and, as with the one-way queue, three constructors are provided.
Initializing the queue
As you can see, no nodes are set during initialization, only a capacity is set.
Add element (producer)
Adding at the head (First)
This calls the linkFirst(Node) method:
Adding at the tail (Last)
This calls the linkLast(Node) method:
Take element (consumer)
Taking from the head (First)
Continue with the unlinkFirst method:
The logic is similar to the one-way queue above, but with an additional prev pointer to maintain.
Taking from the tail (Last)
Continue with the unlinkLast() method:
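As a rough illustration of the linking and unlinking steps discussed in this section, the following single-threaded sketch maintains a capacity-limited doubly linked list. It borrows the method names linkFirst, linkLast, unlinkFirst and unlinkLast from the text, but the class `SimpleLinkedDeque` and the method bodies are assumptions and not the JDK source. Locking and blocking are left out to keep the pointer handling visible.

```java
// Hypothetical, simplified class for illustration; not thread-safe and not the JDK source.
public class SimpleLinkedDeque<E> {
    private static final class Node<E> {
        E item;
        Node<E> prev;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    private Node<E> first;  // no nodes exist until the first insertion
    private Node<E> last;
    private int count;
    private final int capacity;

    public SimpleLinkedDeque(int capacity) {
        this.capacity = capacity; // initialization only records the capacity
    }

    /** Links a new node at the head; returns false if the deque is full. */
    public boolean linkFirst(E e) {
        if (count >= capacity) return false;
        Node<E> node = new Node<>(e);
        Node<E> f = first;
        node.next = f;
        first = node;
        if (last == null) last = node; else f.prev = node;
        count++;
        return true;
    }

    /** Links a new node at the tail; returns false if the deque is full. */
    public boolean linkLast(E e) {
        if (count >= capacity) return false;
        Node<E> node = new Node<>(e);
        Node<E> l = last;
        node.prev = l;
        last = node;
        if (first == null) first = node; else l.next = node;
        count++;
        return true;
    }

    /** Unlinks and returns the head element, or null if the deque is empty. */
    public E unlinkFirst() {
        Node<E> f = first;
        if (f == null) return null;
        Node<E> n = f.next;
        E item = f.item;
        f.item = null;
        f.next = f;             // cut the link from the removed node to live nodes
        first = n;
        if (n == null) last = null; else n.prev = null;
        count--;
        return item;
    }

    /** Unlinks and returns the tail element, or null if the deque is empty. */
    public E unlinkLast() {
        Node<E> l = last;
        if (l == null) return null;
        Node<E> p = l.prev;
        E item = l.item;
        l.item = null;
        l.prev = l;             // cut the link from the removed node to live nodes
        last = p;
        if (p == null) first = null; else p.next = null;
        count--;
        return item;
    }

    public static void main(String[] args) {
        SimpleLinkedDeque<String> deque = new SimpleLinkedDeque<>(2);
        System.out.println(deque.linkFirst("b"));
        System.out.println(deque.linkFirst("a"));
        System.out.println(deque.linkLast("c")); // full, so this is rejected
        System.out.println(deque.unlinkLast());
        System.out.println(deque.unlinkFirst());
    }
}
```

Each operation has to repair two pointers instead of one, and the empty-list case must update both `first` and `last`. A blocking version would wrap these helpers in a lock and wait on conditions instead of returning false or null.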
Conclusion
This article focused on three of the seven blocking queues provided by Java. Their implementations are similar, and the source code is easy to follow.