Click “like” to see — make it a habit. Search for and follow the public account [dime technology] for more original technical articles. This article has been included in GitHub org_Hejianhui/JavaStudy.

Preface

  • Concurrent programming starts with a holistic understanding of how the operating system works under the hood
  • In-depth understanding of the Java Memory Model (JMM) and the volatile keyword
  • In-depth understanding of the CPU cache coherence protocol (MESI)
  • In-depth understanding of synchronized in concurrent programming
  • Concurrent programming: the abstract queued synchronizer (AQS) and its application in Lock, in detail
  • Blocking queue – ArrayBlockingQueue source code analysis
  • Blocking queue – LinkedBlockingQueue source code analysis
  • Blocking queue – PriorityBlockingQueue source code analysis
  • Blocking queue – DelayQueue source code analysis
  • Blocking queue – SynchronousQueue source code analysis
  • Blocking queue – LinkedTransferQueue source code analysis
  • Blocking queue – LinkedBlockingDeque source code analysis
  • Blocking queue – DelayedWorkQueue source code analysis

A queue is a special kind of linear list and a FIFO (first-in, first-out) data structure. It allows removal only at the front and insertion only at the rear: the end where elements are inserted is called the tail of the queue, and the end where elements are removed is called the head. A queue with no elements is called an empty queue.

The following is an inheritance diagram of Queue:

Queue

Queue: the top-level queue interface. It provides methods for inserting, removing, and examining elements, and each operation comes in two forms. Let’s start with the insert methods:

  • add(E e): inserts an element at the tail of the queue; returns true on success and throws an IllegalStateException when no space is available.
  • offer(E e): inserts an element at the tail of the queue; returns true on success, false otherwise.

The only difference between add and offer as insert methods is what happens when the queue is full. Add throws an exception, while offer returns false.

Let’s look at the removal and retrieval methods (they follow the same pattern as the insert methods):

  • remove(): retrieves and removes the head of the queue. Unlike poll, it throws an exception if the queue is empty.
  • poll(): retrieves and removes the head of the queue, or returns null if the queue is empty.
  • element(): retrieves, but does not remove, the head of the queue. Unlike peek, it throws an exception if the queue is empty.
  • peek(): retrieves, but does not remove, the head of the queue, or returns null if the queue is empty.

In short: when the queue is empty, remove and element throw an exception, while poll and peek return null.
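To make the difference concrete, here is a minimal sketch (using an ArrayBlockingQueue with a capacity of 1, chosen purely for illustration) showing how the two method families behave on a full and on an empty queue:

import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

public class QueueApiDemo {
    public static void main(String[] args) {
        // Capacity of 1 so the "queue full" case is easy to trigger.
        Queue<String> queue = new ArrayBlockingQueue<>(1);

        System.out.println(queue.offer("a")); // true, queue is now full
        System.out.println(queue.offer("b")); // false, no exception
        try {
            queue.add("b");                   // add on a full queue
        } catch (IllegalStateException e) {
            System.out.println("add: " + e);  // IllegalStateException: Queue full
        }

        System.out.println(queue.poll());     // "a", queue is now empty
        System.out.println(queue.poll());     // null, no exception
        System.out.println(queue.peek());     // null, no exception
        try {
            queue.remove();                   // remove on an empty queue
        } catch (NoSuchElementException e) {
            System.out.println("remove: NoSuchElementException");
        }
        try {
            queue.element();                  // element on an empty queue
        } catch (NoSuchElementException e) {
            System.out.println("element: NoSuchElementException");
        }
    }
}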

Queue is a single-ended queue. To support more powerful and flexible operations, JDK 1.6 added the double-ended queue Deque.

Deque

Deque adds the following methods to Queue:

  • addFirst(E e): inserts an element at the front; throws an exception on failure, like add;
  • addLast(E e): inserts an element at the back; behaves like add;
  • offerFirst(E e): inserts an element at the front; handles failure like offer (returns false);
  • offerLast(E e): inserts an element at the back; behaves like offer;
  • removeFirst(): removes an element from the front; throws an exception if empty, like remove;
  • removeLast(): removes an element from the back; behaves like remove;
  • pollFirst(): removes an element from the front; behaves like poll (returns null if empty);
  • pollLast(): removes an element from the back; behaves like poll;
  • getFirst(): retrieves the element at the front; behaves like element;
  • getLast(): retrieves the element at the back; behaves like element;
  • peekFirst(): retrieves the element at the front; behaves like peek;
  • peekLast(): retrieves the element at the back; behaves like peek;
  • removeFirstOccurrence(Object o): removes the first occurrence of o, searching from the front;
  • removeLastOccurrence(Object o): removes the last occurrence of o (the first occurrence found when searching from the back);
  • push(E e): same effect as addFirst;
  • pop(): same effect as removeFirst.

As you can see, many of these methods have the same effect and differ only in name. For example, Deque defines push and pop to provide stack semantics.
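For instance, a Deque can be used directly as a LIFO stack via push and pop; here is a small sketch using ArrayDeque (chosen here just for illustration):

import java.util.ArrayDeque;
import java.util.Deque;

public class DequeAsStackDemo {
    public static void main(String[] args) {
        Deque<Integer> stack = new ArrayDeque<>();

        stack.push(1); // equivalent to addFirst(1)
        stack.push(2);
        stack.push(3);

        System.out.println(stack.pop());  // 3, last in, first out (same as removeFirst())
        System.out.println(stack.pop());  // 2
        System.out.println(stack.peek()); // 1, same as peekFirst()
    }
}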

BlockingQueue (blocking queue)

BlockingQueue adds blocking-wait behavior on top of Queue. It is an interface introduced in JDK 1.5. A blocking queue blocks a producer that tries to add an element when the queue is full, and blocks a consumer that tries to remove an element when the queue is empty.

BlockingQueue is the most useful tool the java.util.concurrent package provides for the concurrent producer-consumer problem: it solves the problem of transferring data between threads efficiently and safely. Its implementations make take and put thread-safe, and it provides timed variants (offer/poll with a timeout) that return false/null when the timeout expires. It shows up in many production scenarios.

Overall understanding

Which blocking queues do we commonly use? The following class diagram gives an overview. As you can see, BlockingQueue is an interface, and it is extended by two further interfaces: BlockingDeque (a double-ended blocking queue) and TransferQueue (which hands elements off directly between threads).

Blocking queue members are as follows:

| Queue | Boundedness | Lock | Data structure |
| --- | --- | --- | --- |
| ArrayBlockingQueue | bounded | lock (one global ReentrantLock, fair or unfair) | array |
| LinkedBlockingQueue | optionally bounded | lock (separate locks for add and take) | singly linked list |
| PriorityBlockingQueue | unbounded | lock (a single lock; enqueue always succeeds, dequeue can block) | array (default length 11, grows as needed), organized as a binary heap |
| DelayQueue | unbounded | lock | array (grows as needed) |
| SynchronousQueue | bounded (stores no elements) | lock-free (CAS) | queue (fair policy) or stack (unfair policy) |
| LinkedTransferQueue | unbounded | lock-free (spin + CAS) | dual queue (dual data structure) |
| LinkedBlockingDeque | optionally bounded | lock | doubly linked list |
| DelayedWorkQueue | unbounded | lock | array (initial length 16, grows as needed), organized as a binary heap |

Queue type

  1. Unbounded queue — can grow virtually without limit
  2. Bounded queue — has a defined maximum capacity

Queue data structure

A queue is essentially a structure for storing data

  • Usually implemented as linked lists or arrays
  • Queues generally have FIFO (first-in, first-out) semantics, though there are also double-ended queues (Deque) and priority queues
  • Main operations: Enqueue and Dequeue

Five common blocking queues

  • ArrayBlockingQueue: A bounded blocking queue composed of array structures.
  • LinkedBlockingQueue: An optionally bounded blocking queue backed by a linked list (unbounded by default, with capacity Integer.MAX_VALUE).
  • PriorityBlockingQueue: An unbounded blocking queue that supports priority sorting.
  • SynchronousQueue: A blocking queue that does not store elements.
  • DelayQueue: an unbounded blocking queue implemented using a priority queue.

BlockingQueue API

The core method of BlockingQueue

public interface BlockingQueue<E> extends Queue<E> {

    // Inserts the given element into the queue, returning true on success and throwing an IllegalStateException if no space is available. For a capacity-bounded queue, offer() is usually the better choice.
    boolean add(E e);

    // Inserts the given element into the queue, returning true on success and false otherwise. e must not be null, or a NullPointerException is thrown.
    boolean offer(E e);

    // Inserts the given element into the queue. If the queue has no free space, the method blocks until space becomes available.
    void put(E e) throws InterruptedException;

    // Inserts the given element into the queue, waiting up to the specified time for space to become available; returns true on success, false if it times out.
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    // Retrieves and removes the head of the queue. If the queue is empty, the calling thread blocks until an element becomes available.
    E take() throws InterruptedException;

    // Retrieves and removes the head of the queue, waiting up to the specified time for an element to become available; returns null if it times out.
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    // Returns the remaining capacity of the queue.
    int remainingCapacity();

    // Removes a single instance of the specified element from the queue, if it is present.
    boolean remove(Object o);

    // Returns true if the queue contains the specified element.
    public boolean contains(Object o);

    // Removes all available elements from the queue and adds them to the given collection.
    int drainTo(Collection<? super E> c);

    // Removes at most maxElements elements from the queue and adds them to the given collection.
    int drainTo(Collection<? super E> c, int maxElements);
}

All the methods of the BlockingQueue interface fall into two broad categories: methods that are responsible for adding elements to the queue and methods that retrieve those elements. In the full/empty queue case, each method from the two groups behaves differently.

Add elements

| Method | Description |
| --- | --- |
| add() | returns true if the insertion succeeded, otherwise throws an IllegalStateException |
| put() | inserts the specified element into the queue; if the queue is full, blocks until space becomes available |
| offer() | returns true if the insertion succeeded, false otherwise |
| offer(E e, long timeout, TimeUnit unit) | tries to insert the element, waiting up to the specified time if the queue is full |

Retrieve elements

| Method | Description |
| --- | --- |
| take() | retrieves and removes the head of the queue, blocking until an element becomes available if the queue is empty |
| poll(long timeout, TimeUnit unit) | retrieves and removes the head of the queue, waiting up to the specified time if necessary; returns null on timeout |

The most important feature of BlockingQueue is its blocking methods, which make it straightforward to implement the producer-consumer model.

ArrayBlockingQueue

ArrayBlockingQueue is a bounded blocking queue backed by an array. The queue’s capacity is fixed when the ArrayBlockingQueue object is created, and elements are ordered first-in, first-out (FIFO). Both fair and unfair locks are supported; the default is an unfair lock.

Internally, ArrayBlockingQueue uses a ReentrantLock for thread safety and Condition’s await and signal for blocking and wake-up. Its data structure is an array, or more precisely a circular array (think of it as a ring): when an index reaches the end of the array, it wraps back around to zero.
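A minimal usage sketch (the capacity of 2 is chosen only for illustration); the second constructor argument selects the fair lock mentioned above:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ArrayBlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // Capacity is fixed at construction time; 'true' requests a fair lock.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2, true);

        queue.put("first");
        queue.put("second");                      // queue is now full; another put() would block

        System.out.println(queue.offer("third")); // false: offer() returns instead of blocking
        System.out.println(queue.take());         // "first"  (FIFO order)
        System.out.println(queue.take());         // "second"
    }
}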

To learn more about ArrayBlockingQueue, read Blocking queue – ArrayBlockingQueue source code analysis.

LinkedBlockingQueue

LinkedBlockingQueue is an optionally bounded queue backed by linked nodes; it is a FIFO, list-based queue that is unbounded in practice (bounded only in theory). The difference from ArrayBlockingQueue is that if no capacity is specified, it defaults to Integer.MAX_VALUE, which effectively makes it an unbounded queue. To avoid heavy machine load or out-of-memory problems caused by an oversized queue, it is recommended to pass an explicit capacity when using it.

Internally, LinkedBlockingQueue is implemented as a singly linked list: elements can only be taken from the head and added at the tail. Adding and taking elements use separate locks, which means reads and writes can proceed in parallel. Each lock is a ReentrantLock, which keeps the queue thread-safe under concurrent access.

Any operation that adds an element to an infinite queue will never block, so it can grow to a very large capacity.

When designing a producer-consumer model around an unbounded BlockingQueue, the most important thing is that consumers can consume messages at least as quickly as producers add them to the queue. Otherwise, memory may fill up and an OutOfMemoryError will be thrown.
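A small sketch of the recommended bounded usage (the capacity of 1000 is an arbitrary value for illustration):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class LinkedBlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // Passing a capacity avoids the default Integer.MAX_VALUE (effectively unbounded) behaviour.
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1000);

        queue.put(1);                // blocks only when all 1000 slots are occupied
        Integer head = queue.take(); // blocks only when the queue is empty
        System.out.println(head);    // 1
    }
}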

For an in-depth understanding of LinkedBlockingQueue, read Blocking queue – LinkedBlockingQueue source code analysis.

PriorityBlockingQueue

PriorityBlockingQueue is a priority queue: thread-safe (both add and take acquire the lock), unbounded, and blocking only on reads. It is built on a binary heap; by default it is a min-heap, so the smallest (or, with a custom comparator, the highest-priority) element always sits at the top, and every take returns the element at the top of the heap. This gives priority-ordered dequeueing. Notably, it has only one lock, enqueueing always succeeds, and dequeueing blocks only when the queue is empty. It fits certain scenarios well: for example, tasks waiting to be executed can be given a priority weight so that the queue always hands out the highest-priority task first.
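A minimal sketch of the task-with-weight idea described above; the Task class and its priority field are hypothetical names used only for illustration:

import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityBlockingQueueDemo {

    // A hypothetical task type carrying a priority weight.
    static class Task {
        final String name;
        final int priority; // smaller value = higher priority in this sketch

        Task(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PriorityBlockingQueue<Task> queue =
                new PriorityBlockingQueue<>(11, Comparator.comparingInt((Task t) -> t.priority));

        queue.put(new Task("low", 5));      // put() never blocks: the queue is unbounded
        queue.put(new Task("urgent", 1));
        queue.put(new Task("normal", 3));

        // take() always returns the element at the top of the heap (highest priority here).
        System.out.println(queue.take().name); // urgent
        System.out.println(queue.take().name); // normal
        System.out.println(queue.take().name); // low
    }
}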

To understand PriorityBlockingQueue in depth, read Blocking queue – PriorityBlockingQueue source code analysis.

DelayQueue

DelayQueue is a time-based scheduling queue with priority support. Internally it uses a non-thread-safe PriorityQueue, which is backed by a resizable array, so the queue is unbounded. When an element is created, you specify how long must pass before it can be taken from the queue; an element cannot be taken until its delay has expired.
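A minimal sketch of a Delayed element (DelayedMessage is a hypothetical class used only for illustration): the element only becomes visible to take() after its delay expires.

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueDemo {

    // A hypothetical element that becomes available delayMs milliseconds after creation.
    static class DelayedMessage implements Delayed {
        private final String body;
        private final long expireAt;

        DelayedMessage(String body, long delayMs) {
            this.body = body;
            this.expireAt = System.currentTimeMillis() + delayMs;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireAt - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedMessage> queue = new DelayQueue<>();
        queue.put(new DelayedMessage("fires after 1 second", 1000));

        System.out.println(queue.poll());      // null: the delay has not expired yet
        System.out.println(queue.take().body); // blocks about 1 second, then returns the element
    }
}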

To understand DelayQueue in depth, read Blocking queue – DelayQueue source code analysis.

SynchronousQueue

SynchronousQueue is a blocking queue that stores no elements: every put must wait for a corresponding take, otherwise the element cannot be added. Both fair and unfair access policies are supported, and the unfair policy is the default. The fair policy is backed by a queue-like data structure, the unfair policy by a stack-like one. SynchronousQueue offers higher throughput than LinkedBlockingQueue and ArrayBlockingQueue.
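A minimal sketch of the hand-off behaviour (the 'true' constructor argument selects the fair, queue-based policy mentioned above):

import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // 'true' selects the fair (queue-based) policy; the default is the unfair (stack-based) one.
        SynchronousQueue<String> queue = new SynchronousQueue<>(true);

        Thread consumer = new Thread(() -> {
            try {
                // take() blocks until a producer hands an element over.
                System.out.println("received: " + queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        queue.put("hello"); // blocks until the consumer's take() picks the element up
        consumer.join();
    }
}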

For an in-depth understanding of SynchronousQueue, read Blocking queue – SynchronousQueue source code analysis.

LinkedTransferQueue

LinkedTransferQueue is an unbounded blocking transfer queue built on a linked structure. It combines the strengths of several queues (ConcurrentLinkedQueue, LinkedBlockingQueue, SynchronousQueue): it provides the basic blocking-queue functionality, but without using locks. The queue implements the TransferQueue interface, whose transfer and tryTransfer methods provide matching behavior similar to SynchronousQueue’s hand-off mode.
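A minimal sketch of transfer and tryTransfer alongside the ordinary non-blocking put:

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

public class LinkedTransferQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

        // tryTransfer() succeeds only if a consumer is already waiting; here nobody is,
        // so it returns false and the element is NOT enqueued.
        System.out.println(queue.tryTransfer("nobody waiting")); // false

        Thread consumer = new Thread(() -> {
            try {
                System.out.println("received: " + queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // transfer() blocks until a consumer takes the element (SynchronousQueue-like matching).
        queue.transfer("handed off directly");
        consumer.join();

        // put() never blocks, because the queue is unbounded.
        queue.put("just enqueued");
        System.out.println(queue.poll(1, TimeUnit.SECONDS)); // "just enqueued"
    }
}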

For an in-depth understanding of LinkedTransferQueue, read Blocking queue – LinkedTransferQueue source code analysis.

LinkedBlockingDeque

LinkedBlockingDeque is a double-ended blocking queue built on a doubly linked list. Elements can be added to and removed from both the head and the tail of the queue, so multiple threads can operate on it concurrently, roughly halving lock contention.
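A minimal sketch of operating on both ends:

import java.util.concurrent.LinkedBlockingDeque;

public class LinkedBlockingDequeDemo {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<>();

        deque.putFirst("head"); // insert at the front
        deque.putLast("tail");  // insert at the back

        System.out.println(deque.takeLast());  // "tail", removed from the back
        System.out.println(deque.takeFirst()); // "head", removed from the front
    }
}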

For an in-depth understanding of LinkedBlockingDeque, read Blocking queue – LinkedBlockingDeque source code analysis.

DelayedWorkQueue

DelayedWorkQueue is also a delay queue, designed specifically for scheduled tasks (it is the internal work queue of ScheduledThreadPoolExecutor). Its implementation is essentially the same as DelayQueue: the core data structure is a priority queue organized as a binary min-heap, and the backing array grows automatically when it fills up. Being purpose-built allows the method calls that scheduled tasks need to be added flexibly along the way.
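DelayedWorkQueue is internal to ScheduledThreadPoolExecutor, so it is not used directly; it comes into play whenever tasks are scheduled, as in this small sketch:

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DelayedWorkQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // ScheduledThreadPoolExecutor always uses its internal DelayedWorkQueue as the work queue.
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);

        // The queue orders tasks by trigger time (binary min-heap), so the 1-second task
        // runs before the 2-second task even though it was submitted later.
        executor.schedule(() -> System.out.println("runs second"), 2, TimeUnit.SECONDS);
        executor.schedule(() -> System.out.println("runs first"), 1, TimeUnit.SECONDS);

        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}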

To understand DelayedWorkQueue in depth, read Blocking queue – DelayedWorkQueue source code analysis.

Comparison and analysis

Differences between LinkedBlockingQueue and ArrayBlockingQueue

  • The queue sizes differ. ArrayBlockingQueue is bounded and its capacity must be specified at initialization, while LinkedBlockingQueue can be bounded or unbounded (Integer.MAX_VALUE). In the unbounded case, if elements are added faster than they are removed, problems such as memory exhaustion can occur.
  • The storage containers differ. ArrayBlockingQueue uses an array to store its data, while LinkedBlockingQueue uses linked Node objects.
  • Because ArrayBlockingQueue stores data in an array, inserting or removing an element creates or destroys no additional object instances, whereas LinkedBlockingQueue allocates an extra Node object for every element. When large volumes of data need to be processed efficiently and concurrently over long periods of time, this can have a noticeable impact on GC.
  • The locking strategies differ. ArrayBlockingQueue uses a single ReentrantLock shared by producers and consumers, while LinkedBlockingQueue uses two: putLock for adding and takeLock for removing. This greatly improves the queue’s throughput, and it also means that under high concurrency producers and consumers can operate on the queue in parallel, improving the overall concurrent performance of the queue.

Differences between LinkedTransferQueue and SynchronousQueue (fair mode)

  • In fair mode, LinkedTransferQueue and SynchronousQueue are essentially the same: both are lock-free blocking queues built on dual queues.
  • SynchronousQueue implements its fair and unfair modes through internal Transferer implementations, while LinkedTransferQueue makes no distinction between fair and unfair modes.
  • LinkedTransferQueue implements the TransferQueue interface, which defines both blocking and non-blocking operations and is richer than the Transferer inside SynchronousQueue.
  • In SynchronousQueue both put and take block: when the queue holds no operation matching the current one, the thread blocks until a matching operation arrives. LinkedTransferQueue is an unbounded queue, so inserting data never blocks, and taking data blocks only if no matching element is found; the exact behavior is selected by the internal mode parameter (NOW, ASYNC, SYNC, TIMED).

Differences between LinkedBlockingDeque and LinkedList

package com.niuh.deque;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingDeque;

/*
 * LinkedBlockingDeque is a thread-safe queue, while LinkedList is not thread-safe.
 *
 * The example below has multiple threads operating on and traversing the same queue at once:
 * (1) When queue is a LinkedBlockingDeque, the program works correctly.
 * (2) When queue is a LinkedList, the program throws a ConcurrentModificationException.
 */
public class LinkedBlockingDequeRunner {

    // NOTE: when queue is a LinkedList (the commented-out line below), the program fails.
    // private static Queue<String> queue = new LinkedList<String>();
    private static Queue<String> queue = new LinkedBlockingDeque<String>();

    public static void main(String[] args) {

        // Start two threads on queue at the same time!
        new MyThread("A").start();
        new MyThread("B").start();
    }

    private static void printAll() {
        String value;
        Iterator iter = queue.iterator();
        while (iter.hasNext()) {
            value = (String) iter.next();
            System.out.print(value + ",");
        }
        System.out.println();
    }

    private static class MyThread extends Thread {
        MyThread(String name) {
            super(name);
        }

        @Override
        public void run() {
            int i = 0;
            while (i++ < 6) {
                // "thread name" + "-" +"
                String val = Thread.currentThread().getName() + i;
                queue.add(val);
                // Iterate through the queue through "Iterator".printAll(); }}}}Copy the code

Output result:

A1, 
A1, A2, 
A1, A2, A3, 
A1, A2, A3, A4, 
A1, A2, A3, A4, A5, 
A1, A2, A3, A4, A5, A6, 
A1, A2, A3, A4, A5, A6, B1, 
A1, A2, A3, A4, A5, A6, B1, B2, 
A1, A2, A3, A4, A5, A6, B1, B2, B3, 
A1, A2, A3, A4, A5, A6, B1, B2, B3, B4, 
A1, A2, A3, A4, A5, A6, B1, B2, B3, B4, B5, 
A1, A2, A3, A4, A5, A6, B1, B2, B3, B4, B5, B6, 

The result: In the sample application, two threads (thread A and thread B) are started to act on LinkedBlockingDeque:

  • Thread A builds a string of the form "thread name" + "sequence number" and adds it to the LinkedBlockingDeque;
  • It then iterates over the LinkedBlockingDeque and prints all of its elements.
  • Thread B does the same as thread A; only the thread name differs.
  • When queue is a LinkedBlockingDeque, the program runs correctly.
  • When queue is changed to a LinkedList, the program throws a ConcurrentModificationException.

BlockingQueue application

Multithreaded producer-consumer example

Next, we create a program that is composed of two parts: Producer and Consumer.

Producer

The producer generates a random number from 0 to 100 (the number on the tonic pill) and places it in the BlockingQueue. We will create 16 producer threads (Pan Jinlian) that generate random numbers and block on put() until there is free space in the queue.

The important thing to remember is that we need to prevent our consumer threads from waiting indefinitely for elements to appear in the queue.

A good way for the producer (Pan Jinlian) to signal the consumer (Wu Dalang) that there is nothing left to process is to send a special marker message known as a poison pill. We need to put as many poison pills into the queue as there are consumers. When a consumer takes that special poison-pill message from the queue, it finishes execution gracefully.

The producer code is as follows:

package com.niuh.queue;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Producer
 */
@Slf4j
public class NumbersProducer implements Runnable {
    private BlockingQueue<Integer> numbersQueue;
    private final int poisonPill;
    private final int poisonPillPerProducer;

    public NumbersProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) {
        this.numbersQueue = numbersQueue;
        this.poisonPill = poisonPill;
        this.poisonPillPerProducer = poisonPillPerProducer;
    }

    public void run() {
        try {
            generateNumbers();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void generateNumbers() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            numbersQueue.put(ThreadLocalRandom.current().nextInt(100));
            log.info("Pan Jinlian -{}, bubble medicine for Wu Dalang!", Thread.currentThread().getId());
        }

        /*while (true) { numbersQueue.put(ThreadLocalRandom.current().nextInt(100)); if (false) { break; }} * /

        for (int j = 0; j < poisonPillPerProducer; j++) {
            numbersQueue.put(poisonPill);
            log.info("Pan Jinlian -{}, put the {} poison pill into wu Dalang's medicine!", Thread.currentThread().getId(), j + 1); }}}Copy the code

Our producer constructor takes a BlockingQueue as an argument, which coordinates processing between producer and consumer. The generateNumbers() method puts 100 elements (100 doses of medicine for Wu Dalang) into the queue. It also needs to know which poison-pill message to put into the queue when execution completes, and it enqueues that message poisonPillPerProducer times.

Consumer

Each consumer uses the take() method to get an element from the BlockingQueue, so it blocks until there is an element in the queue. After taking an Integer from the queue, it checks whether the message is a poison pill; if so, the thread finishes execution. Otherwise, it prints the result on standard output along with the name of the current thread.

package com.niuh.queue;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.BlockingQueue;

/**
 * Consumer
 */
@Slf4j
public class NumbersConsumer implements Runnable {
    private BlockingQueue<Integer> queue;
    private final int poisonPill;

    public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) {
        this.queue = queue;
        this.poisonPill = poisonPill;
    }

    public void run() {
        try {
            while (true) {
                Integer number = queue.take();
                if (number.equals(poisonPill)) {
                    return;
                }
                log.info("Wu Dalang -{} No., drink medicine - No. :{}", Thread.currentThread().getId(), number); }}catch(InterruptedException e) { Thread.currentThread().interrupt(); }}}Copy the code

The important thing to note is the use of the queue. As in the producer constructor, the queue is passed in as a constructor parameter. We can do this because a BlockingQueue can be shared between threads without any explicit synchronization.

Verification test

Now that we have a producer and a consumer, we can start our program. We define the queue size as 10 elements, create 16 producer threads, and create as many consumer threads as there are available processors:

package com.niuh.queue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** * Multithreaded producer-consumer example **/
public class Main {

    public static void main(String[] args) {
        int BOUND = 10;
        int N_PRODUCERS = 16;
        int N_CONSUMERS = Runtime.getRuntime().availableProcessors(); // = 8 on an 8-core machine
        int poisonPill = Integer.MAX_VALUE;
        int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS; // = 0
        int mod = N_CONSUMERS % N_PRODUCERS;                   // = 8, so the last producer sends 0 + 8 = 8 poison pills

        BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(BOUND);

        // Pan Jinlian makes medicine for Wu Dalang
        for (int i = 1; i < N_PRODUCERS; i++) {
            new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start();
        }

        // Wu Dalang began to drink the medicine
        for (int j = 0; j < N_CONSUMERS; j++) {
            new Thread(new NumbersConsumer(queue, poisonPill)).start();
        }

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // Pan jinlian began to poison, Wu Dalang drink poison GG
        new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start();
    }
}

The BlockingQueue is created with a capacity. We create 16 producers and as many consumers as there are processors. We specify the poison-pill message as Integer.MAX_VALUE because a producer would never send such a value under normal operating conditions. The most important thing to note here is that the BlockingQueue is used to coordinate the work between them.

Related articles

  • Concurrent programming starts with a holistic understanding of how the operating system works under the hood
  • In-depth understanding of the Java Memory Model (JMM) and the volatile keyword
  • In-depth understanding of the CPU cache coherence protocol (MESI)
  • In-depth understanding of synchronized in concurrent programming
  • Concurrent programming: the abstract queued synchronizer (AQS) and its application in Lock, in detail
  • Blocking queue – ArrayBlockingQueue source code analysis
  • Blocking queue – LinkedBlockingQueue source code analysis
  • Blocking queue – PriorityBlockingQueue source code analysis
  • Blocking queue – DelayQueue source code analysis
  • Blocking queue – SynchronousQueue source code analysis
  • Blocking queue – LinkedTransferQueue source code analysis
  • Blocking queue – LinkedBlockingDeque source code analysis
  • Blocking queue – DelayedWorkQueue source code analysis

PS: The above code is submitted to Github: github.com/Niuh-Study/…

GitHub: org_Hejianhui/JavaStudy, Hejianhui/JavaStudy