BlockingQueue –BlockingQueue
Reference:
- Blog.csdn.net/qq_39837804…
- Blog.csdn.net/qq_41191715…
- Blog.csdn.net/yemuxiaweil…
1. Queue
Queue is one of the more important types of data structure. It supports FIFO, tail addition, and head deletion (elements of advanced Queue are Queue).
Queue is at the same level as List and Set and inherits the Collection interface.
A Queue lets people effectively add an element to the tail and remove an element in the head. A queue with two ends, known as a double-ended queue, allows people to effectively add or remove elements at both the head and tail. Adding elements in the middle of a queue is not supported. The Deque interface was introduced in Java SE 6 and is implemented by the ArrayDeque and LinkedList classes. Both classes provide double-endian queues and can increase their length if necessary.
AbstractQueue (AbstractQueue)
2.1 Priority Queue
Elements can be inserted in any order, but are always retrieved in the sorted order. That is, whenever the remove method is called, the smallest element in the current priority queue is always obtained. However, the priority queue does not sort all the elements. If you process these elements iteratively, you don’t need to sort them. Priority queues use an elegant and efficient data structure called the heap.
The heap is a self-adjusting binary tree that can be added and remore to move the smallest elements to the root without having to spend time sorting the elements. Like TreeSet, a priority queue can hold either class objects that implement the Comparable interface or Comparator objects provided in the constructor. A typical example of using priority queues is task scheduling. Each task has a priority and tasks are added to the queue in a random order. Each time a new task is started, the highest priority task is removed from the queue (the smallest element is removed because it is customary to set 1 to “highest” priority).
2.2, ConcurrentLinkedQueue
ConcurrentLinkedQueue is an unbounded thread-safe queue based on linked nodes, which are sorted on a first-in, first-out basis. When we add an element, it is added to the end of the queue, and when we retrieve an element, it returns the element at the head of the queue. It is implemented using the “wait-free” algorithm, which is modified from the Michael & Scott algorithm.
ConcurrentLinkedQueue consists of head nodes and tail nodes. Each Node (Node) consists of a Node element (item) and a reference to the next Node (Next). It is through this next that nodes are associated to form a queue with a linked list structure.
3. BlockingQueue
3.1 Overview of blocking queues
What is a blocking queue?
A blocking queue, as its name implies, is first of all a queue, i.e. first-in, first-out. Through a shared queue, data can be input from one end of the queue and output from the other end. The role of a blocking queue in the data structure is roughly shown in the figure
- When the blocking queue is empty, fetching elements from the queue will be blocked because it waits for the producer thread to produce the element
- When the blocking queue is full, adding elements to the queue will be blocked
- Similarly, a thread that tries to add new elements to a blocking queue that is already full is blocked until another thread removes one or more elements from the queue or emptying the queue completely, making the queue free again and adding new elements
The so-called blocking: that is, under certain circumstances the thread is suspended (that is, blocked), once the condition is met, the suspended thread will automatically wake up again
Benefits of blocking queues
The advantage is that we don’t have to worry about when we need to block or wake up threads, because BlockingQueue takes care of all of this for you. Prior to the concurrent package, in a multi-threaded environment, each of us programmers had to control these details, especially with regard to efficiency and thread safety. And that brings a lot of complexity to our program
Blocking queues are often used in producer and consumer scenarios, where the producer is the thread that adds elements to the queue and the consumer is the thread that takes elements from the queue. A blocking queue is a container in which producers hold elements, and consumers only take elements from the container. When to use: Multi-threaded concurrent processing, thread pooling.
3.2 Types of blocking queues
ArrayBlockingQueue
By:An array ofStructure of a bounded blocking queueLinkedBlockingQueue
By:The listStructure composition is bounded (but size defaults)Integer.MAX_VALUE
) blocks a queue that sorts elements on a first-in, first-out basis.throughputUsually higher than ArrayBlockingQueue.SynchronousQueue
: blocks queues that do not store elements, and each PUT operation must wait for a take operation, otherwise elements cannot be addedSynchronousQueue
No capacity with othersBlcokingQueue
SynchronousQueue is a non-element storage queueBlcokingQueue
- each
put
The operation must wait for onetake
Operation, otherwise you cannot continue adding elements, and vice versa
PriorityBlockingQueue
: an unbounded blocking queue that supports priority sorting.LinkedTransferQueue:
An unbounded blocking queue consisting of a linked list structure.LinkedBlockingDeque
: bidirectional blocking queue composed of linked list structureDelayQueue
: an unbounded blocking queue implemented using a priority queue.
3.2.1, ArrayBlockingQueue
AbstractBlockingQueue ArrayBlockingQueue is a blocking Queue derived from AbstractBlockingQueue that implements Queue and Collection interfaces indirectly. The bottom layer holds the data as an array (actually a circular array). ArrayBlockingQueue is a bounded queue, which means it can’t store an infinite number of objects. So when you create ArrayBlockingQueue, you must specify a queue size for it.
This queue sorts elements on a first-in, first-out (FIFO) basis. By default, visitors are not guaranteed fair access to queues, which are blocked threads that can access the queue in the order in which they are blocked. Unfairness is unfair to the threads that wait first. When the queue is available, the blocking threads can compete for access to the queue.
ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000.true);
Copy the code
Visitor fairness is implemented using reentrant locks as follows:
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
Copy the code
Here are a few important methods in ArrayBlockingQueue:
Add (E E) : Add E to the BlockingQueue, or return if the BlockingQueue can hold ittrueOffer (E E) : if possible, add E to The BlockingQueue, or return if the BlockingQueue can hold ittrueOtherwise returnfalse
put(E e)Add e to BlockingQueue. If there is no room in the BlockQueue, the thread calling this method is blocked until there is room in the BlockingQueuepoll(time): Removes the first object in BlockingQueue. If it cannot be retrieved immediately, wait for the time parameter to returnnull
take(a)If BlockingQueue is empty, block entry to Blocking until a new block is addedremainingCapacity(a): Remaining available size. Equals the initial capacity minus the current sizeCopy the code
Note:
- ArrayBlockingQueue is a first-in, first-out queue
- ArrayBlockingQueue is a bounded queue (the size specified when the queue is initialized). Empty capacity, block queue operation)
- ArrayBlockingQueue does not support empty elements
- The ArrayBlockingQueue queue operation uses a lock to ensure concurrency security. The source code has a while() judgment:
public void put(E e) throws InterruptedException {
checkNotNull(e); // Non-null judgment
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); / / acquiring a lock
try {
while (count == items.length) {
// Block until the queue is not full
notFull.await();
}
enqueue(e); / / into the team
} finally{ lock.unlock(); }}public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
// block until the queue is not full
// Or the timeout period has expired, return false
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally{ lock.unlock(); }}Copy the code
Several points mentioned in the JDK documentation:
- BlockingQueue does not accept null elements. Some implementations throw a NullPointerException when attempting to add, put, or offer a null element. Null is used as a warning value indicating a poll operation failure.
- BlockingQueue can be volume-limiting. It can have a remainingCapacity at any given time, beyond which additional elements cannot be put without blocking. BlockingQueue without any internal capacity constraints always reports the remaining capacity of INTEger.max_value.
- The BlockingQueue implementation is primarily for producer-consumer queues, but it also supports the Collection interface. So, for example, it is possible to remove any element from the queue using remove(x). However, this operation is usually not performed effectively and can only be used sporadically in a planned way, such as when unqueuing information.
- The BlockingQueue implementation is thread-safe. All queuing methods can achieve their purpose automatically using internal locking or other forms of concurrency control. However, a large number of Collection operations (addAll, containsAll, retainAll, and removeAll, which are used as sparingly as possible) need not be performed automatically unless specified in the implementation. So, for example, addAll(c) might fail (throw an exception) after adding only a few elements in C.
The core method of BlockingQueue
4.1 overview,
Blocking queues provide four handling methods:
Method/processing mode | An exception is thrown | Return special value | Has been blocked | Timeout exit |
---|---|---|---|---|
Insert method | The add () | Offer (e) | Put () | Offer (e, time, unit) |
Remove method | Remove () | Poll () | Take () | Poll (time, unit) |
Check the method | Element () | Peek () | Do not use | Do not use |
- Throw an exception:Thrown when an element is inserted into a blocking queue when the queue is full
LlegalStateException (" Queue full ")
The exception. Thrown when an element is fetched from the queue when the queue is emptyNoSuchElementException
The exception. - Returns a special value: The insert method returns success or true on success. Remove method, which removes an element from the queue or returns NULL if none is present
- Block: When the blocking queue is full, if the producer thread puts elements to the queue, the queue will block the producer thread until it gets data or exits in response to an interrupt. When the queue is empty, the consumer thread tries to take elements from the queue, and the queue blocks the consumer thread until the queue is available.
- Timeout exit: When the blocking queue is full, the queue blocks the producer thread for a certain amount of time, and the producer thread exits after a certain amount of time.
The implementation of a method that throws an exception is the same as that of a method that returns a special value, except that the failed operation is handled differently! AbstractQueue source code: Add (e), remove(), element() are implemented based on offer(), poll(), peek(), respectively
public boolean add(E arg0) {
if (this.offer(arg0)) {
return true;
} else {
throw new IllegalStateException("Queue full"); }}public E remove(a) {
Object arg0 = this.poll();
if(arg0 ! =null) {
return arg0;
} else {
throw newNoSuchElementException(); }}public E element(a) {
Object arg0 = this.peek();
if(arg0 ! =null) {
return arg0;
} else {
throw newNoSuchElementException(); }}Copy the code
4.2 Block queue API throw exception group
public class BlockingQueueDemo {
public static void main(String[] args) {
// List list = new ArrayList();
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
/ / the boundaries of Queue is 3, when using the add method add elements, can throw exceptions, Exception in the thread "is the main" Java. Lang. An IllegalStateException: Queue full
// System.out.println(blockingQueue.add("x"));
/ / check whether there is any element in the queue, when to return to the first element, when there is no element throws an Exception, the Exception in the thread "is the main" Java. Util. NoSuchElementException
System.out.println(blockingQueue.element());
// Remove the first element, return the removed element
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
// Remove the specified element, return true on success, false on failure
System.out.println(blockingQueue.remove("a"));
System.out.println(blockingQueue.remove());
/ / when no element in the queue, call the remove method can throw exceptions, Exception in the thread "is the main" Java. Util. NoSuchElementException
//System.out.println(blockingQueue.remove());}}Copy the code
4.3 Block queue API return special value group
public class BlockingQueueDemo {
public static void main(String[] args) {
// List list = new ArrayList();
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
// Add elements, return true on success; Return false on failure, no exception is thrown
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.offer("x"));
// Check if there are any elements in the queue. If there are elements, return the first element value
System.out.println(blockingQueue.peek());
// When an element is removed, the value of the removed element is successfully returned; Returns NULL on failure and does not throw an exception
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
// Check if there are any elements in the queue. If the queue is empty, return null, no exception is thrownSystem.out.println(blockingQueue.peek()); }}Copy the code
4.4 block queue API blocking and timeout control
Block:
public class BlockingQueueDemo {
public static void main(String[] args) throws Exception {
// List list = new ArrayList();
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
// Cannot store null values
blockingQueue.put("a");
blockingQueue.put("a");
blockingQueue.put("a");
// When the blocking queue is full, the producer thread continues to add put elements to the queue, and the queue blocks the producer thread until the PUT data or response interrupt exits
//blockingQueue.put("a");
System.out.println("= = = = = = = = = = = = = = = = = = = = = = = = > >");
blockingQueue.take();
blockingQueue.take();
blockingQueue.take();
// When the blocking queue is empty, the consumer thread tries to take elements from the queue, and the queue blocks the consumer thread until the queue is available
// blockingQueue.take();}}Copy the code
Timeout:
public class BlockingQueueDemo {
public static void main(String[] args) throws Exception {
// List list = new ArrayList();
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a".2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("a".2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("a".2L, TimeUnit.SECONDS));
// When the blocking queue is full, the queue blocks the producer thread for a certain amount of time, and then the producer thread exits
System.out.println(blockingQueue.offer("a".2L, TimeUnit.SECONDS)); }}Copy the code
Block SynchronnusQueue
SynchronousQueue has no capacity. Unlike other BlockingQueues, SynchronousQueue is a BlockingQueue that does not store elements. Each PUT operation must wait for a take operation, otherwise it cannot continue to add elements, and vice versa.
Code proof:
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue blockingQueue = new SynchronousQueue();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+"\tput 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName()+"\tput 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName()+"\tput 3");
blockingQueue.put("3");
} catch(InterruptedException e) { e.printStackTrace(); }},"AAA").start();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName()+"\ttake 1");
blockingQueue.take();
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName()+"\ttake 2");
blockingQueue.take();
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName()+"\ttake 3");
blockingQueue.take();
} catch(InterruptedException e) { e.printStackTrace(); }},"BBB").start(); }}Copy the code
Console print result:
AAA put 1
BBB take 1
AAA put 2
BBB take 2
AAA put 3
BBB take 3
Copy the code
6. Application scenarios
- Producer-consumer model
- The traditional version
- Blocking queue edition
- The thread pool
- Message middleware
6.1. Producer-consumer model
6.1.1 Producer consumer Traditional edition of Thread Communication
Steps:
- Thread operation (method) resource class
- Judgment work notice
- Prevent false wake up mechanism
- The wait() method of class Object may have false wakeup
while
- When there are only two threads, use
if
When judging, there will be no false awakening; However, when the number of threads increases to 4, a false wake up occurs.
Code implementation
/** * title: a variable with an initial value of zero, two threads alternately operate on it, one increment and one subtraction, for 5 rounds */
class ShareData{
private Integer number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment(a){
// 1
lock.lock();
try {
/ / determine
while(number ! =0) {
condition.await();
}
/ / work
number++;
System.out.println(Thread.currentThread().getName()+"\t" + number);
/ / wake
condition.signalAll();
}catch (Exception e){
e.printStackTrace();
}finally{ lock.unlock(); }}public void decrement(a){
// 1
lock.lock();
try {
/ / determine
while (number == 0) {
condition.await();
}
/ / work
number--;
System.out.println(Thread.currentThread().getName()+"\t" + number);
/ / wake
condition.signalAll();
}catch (Exception e){
e.printStackTrace();
}finally{ lock.unlock(); }}}public class ProdConsumer_TraditionDemo {
public static void main(String[] args) {
ShareData shareData = new ShareData();
new Thread(()->{
for (int i = 0; i < 5; i++) { shareData.increment(); }},"A").start();
new Thread(()->{
for (int i = 0; i < 5; i++) { shareData.decrement(); }},"B").start();
new Thread(()->{
for (int i = 0; i < 5; i++) { shareData.increment(); }},"C").start();
new Thread(()->{
for (int i = 0; i < 5; i++) { shareData.decrement(); }},"D").start(); }}Copy the code
If changing while to if may result in false wake up, the console output is as follows:
A 1
B 0
A 1
B 0
A 1
B 0
A 1
B 0
A 1
C 2
B 1
D 0
Copy the code
6.1.2 Block queue version of producer consumer mode
- Producer-consumer Problem description:
- Producers and consumers share the same storage space at the same time;
- Producers add products to storage space, consumers take products from storage space;
- The consumer blocks when the storage is empty and the producer blocks when the storage is full.
- Program business Description:
- Producers produce and consumers consume in an orderly manner
- The mode can be started or stopped using variables
- When the producer has insufficient capacity, if the consumer cannot get the item from the queue when consuming the item, he/she will wait for 2s and exit the mode after timeout
- Producers produce too much, consumers consume too much, and producers choke.
- Warehouse memory is 10
Resource class:
package test;
import org.springframework.util.StringUtils;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class Resource {
// This tag variable marks whether the production-consumption mode is enabled. True indicates that the production-consumption mode is enabled, and false indicates that the production-consumption mode is disabled
private volatile boolean flag = true;
// Define variables to count production-consumed items to ensure atomicity, so use this type of variable
private AtomicInteger atomicInteger = new AtomicInteger();
// Define a blocking queue in the form of a concrete queue passed in the constructor
BlockingQueue<String> blockingQueue = null;
public Resource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
System.out.println(*** the blocking queue passed in is: + blockingQueue.getClass().getName());
}
// Define the production method
public void myProd(a) throws Exception {
String data = null;
boolean retValue;
while (flag) {
data = atomicInteger.incrementAndGet() + "";
retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
if (retValue) {
System.out.println(Thread.currentThread().getName() + "\t" + "Insert queue" + data + "Success");
} else {
System.out.println(Thread.currentThread().getName() + "\t" + "Insert queue" + data + "Failure");
}
TimeUnit.SECONDS.sleep(1);
}
System.out.println(Thread.currentThread().getName() + "\t" + "End of production action.");
}
// Define the consumption method
public void myConsume(a) throws Exception {
String result = null;
while (flag) {
result = blockingQueue.poll(2L, TimeUnit.SECONDS);
// The output fails, and the production-consumption mode will exit after the timeout
if (StringUtils.isEmpty(result)) {
flag = false;
System.out.println(Thread.currentThread().getName() + "\t" + "No more than 2s, quit.");
System.out.println();
System.out.println();
return;
}
System.out.println(Thread.currentThread().getName() + "\t" + "Consumption" + result + "Success");
TimeUnit.SECONDS.sleep(2); }}public void stop(a) throws Exception {
this.flag = false; }}Copy the code
The test class
package test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
/** * Emulated producer-consumer mode with blocking queue * producer-consumer problem description: * 1. Producers and consumers share the same storage space at the same time; * 2. Producers add products to storage space, and consumers remove products from storage space; * 3. The consumer blocks when the storage space is empty and the producer blocks when the storage space is full. * Program business description: * 1. Orderly production of producers, orderly consumption of consumers * 2. The mode can be started or stopped by variable control * 3. When the producer has insufficient capacity, if the consumer cannot get the item from the queue when consuming the item, he/she will wait for 2s and exit the mode when the time expires * 4. Producers produce too much, consumers consume too much, and producers choke. * 5. The warehouse memory is 10 */
public class Main {
public static void main(String[] args) throws Exception {
// New an array blocking queue, initialize 10
Resource resource = new Resource(new ArrayBlockingQueue<>(10));
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "Production thread started");
try {
resource.myProd();
} catch(Exception e) { e.printStackTrace(); }},"A thread." ").start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "Consumer thread start");
System.out.println();
System.out.println();
try {
resource.myConsume();
} catch(Exception e) { e.printStackTrace(); }},Thread "B").start();
try {
TimeUnit.SECONDS.sleep(5L);
} catch (InterruptedException e) {
e.printStackTrace();
}
resource.stop();
System.out.println("5s time is up, production-consumption mode is out."); }}Copy the code
The results of