One, foreword

In the Concurrent package, BlockingQueue addresses the problem of efficiently and securely transferring data in multiple threads. These efficient and thread-safe queue classes bring great convenience for us to quickly build high quality multithreaded programs. This article details all the members of the BlockingQueue family, including their respective functions and common usage scenarios.

Get to know BlockingQueue

A blocking queue, as its name implies, is first and foremost a queue, and the role of a queue in a data structure is roughly as follows:

It is clear from the figure above that a shared queue allows data to be input from one end of the queue and output from the other.

There are two main types of queues in common use :(of course, there are many different types of queues that can be extended in different implementations, DelayQueue being one of them)

First in, first out (FIFO) : The element of the queue inserted first is also the first out of the queue, similar to queuing. To some extent, this queue also reflects a kind of fairness.

Last in, first out (LIFO) : The last element to be queued is the first to exit the queue. This queue takes precedence over the most recent events.

In multithreaded environments, it is easy to share data through queues, such as in the classic “producer” and “consumer” model. Suppose we have producer threads, and then we have consumer threads. If the producer thread needs to share the prepared data with the consumer thread, it can easily solve the data sharing problem by passing the data in a queue. But what if there’s a mismatch in data processing speed between producers and consumers at some point in time? Ideally, if the producer is producing data faster than the consumer is consuming it, and the amount of data produced has accumulated to a point, the producer must pause (block the producer thread) while the consumer thread processes the accumulated data, and vice versa. However, prior to the release of the Concurrent package, in a multi-threaded environment, each of us programmers had to control these details ourselves, especially with regard to efficiency and thread safety, which added considerable complexity to our programs. Fortunately, the powerful Concurrent package came out of nowhere, and it also brought us the powerful BlockingQueue. (In the multithreaded world: blocking, in some cases suspends the thread (that is, blocks), and the suspended thread is automatically woken up once the condition is met).

The following two graphs illustrate two common blocking scenarios for BlockingQueue:

As shown above: When there is no data in the queue, all threads on the consumer side are automatically blocked (suspended) until data is put into the queue.

As shown in the figure above: When the queue is filled with data, all threads on the producer side are automatically blocked (suspended) until the thread is automatically woken up at an empty place in the queue.

This is why we need BlockingQueue in a multi-threaded environment. As a user of BlockingQueue, you no longer have to worry about when you need to block or wake up threads, because BlockingQueue does everything for you. Since BlockingQueue is so powerful, let’s take a look at how it works in common:

The core method of BlockingQueue

  1. In the data

(1) Offer (anObject): if possible, add anObject to BlockingQueue. Return true if the BlockingQueue can hold anObject, false otherwise. This method does not block the thread currently executing the method.

(2) Offer (E O, long timeout, TimeUnit Unit) : You can set the waiting time. If you cannot join BlockingQueue within the specified time, you will return the failure.

(3) Put (anObject): Add anObject to BlockingQueue. If the BlockQueue runs out of space, the thread calling this block is blocked until there is space in the BlockingQueue.

  1. To get the data

(1) the poll (time) : take BlockingQueue row in the first place in the object, if not immediately, then you can time parameters, such as the stipulated time, fails to take returns null.

(2) the poll (long timeout, TimeUnit unit) : a team of the first object removed from a BlockingQueue, if within a specified time, queue once data, then immediately return to the data in the queue. Otherwise, no data is retrieved until time out, and failure is returned.

(3) Take (): Remove the first object in the BlockingQueue. If the BlockingQueue is empty, block the queue until new data is added to the BlockingQueue.

(4) drainTo(): Get all available data objects from BlockingQueue at once (you can also specify the number of data objects to get). There is no need to batch lock or release lock multiple times.

Common BlockingQueue

Now that you know what BlockingQueue does, let’s take a look at the basic members of the BlockingQueue family.

  1. ArrayBlockingQueue

ArrayBlockingQueue (ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue Identifies the position of the head and tail of the queue in the array.

ArrayBlockingQueue, in particular, differs from LinkedBlockingQueue in that it shares the same lock object where producers put data and consumers get data, meaning that they cannot really run in parallel. ArrayBlockingQueue can use separate locks to run producer and consumer operations in full parallel. Doug Lea probably didn’t do this because the data write and fetch operations of ArrayBlockingQueue are already light enough that introducing a separate locking mechanism would have no performance benefit other than adding additional complexity to the code. Another significant difference between ArrayBlockingQueue and LinkedBlockingQueue is that the ArrayBlockingQueue does not create or destroy any additional object instances when inserting or deleting elements, while the ArrayBlockingQueue generates an additional Node object. This has a different impact on GC in systems that need to process large volumes of data efficiently and concurrently over long periods of time. When creating ArrayBlockingQueue, we can also control whether the internal lock of the object is fair, which is not fair by default.

  1. LinkedBlockingQueue

A list-based blocking queue, like an ArrayListBlockingQueue, maintains a data buffer queue (consisting of a linked list). When a producer puts data into the queue, the queue takes it from the producer and caches it inside the queue. The producer immediately returns the data. Only when the queue buffer reaches its maximum cache capacity (which can be specified by the constructor) will the producer queue be blocked until the consumer consumes a piece of data from the queue and the producer thread is woken up, and the reverse is also true for the consumer. LinkedBlockingQueue can process concurrent data efficiently because it uses separate locks on the producer side and the consumer side to control data synchronization. This means that in high concurrency cases, producers and consumers can operate data in the queue in parallel to improve the concurrency performance of the entire queue.

As developers, we need to be aware that if we construct a LinkedBlockingQueue object without specifying its capacity, LinkedBlockingQueue defaults to a capacity of something like infinite size (integer.max_value), If the speed of the producer is greater than the speed of the consumer, the system may run out of memory before the queue is fully blocked.

ArrayBlockingQueue and LinkedBlockingQueue are two of the most common and commonly used blocking queues that are sufficient to handle producer-consumer issues between multiple threads.

The following code demonstrates how to use BlockingQueue:

(1) the test class

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueTest {

    public static void main(String[] args) throws InterruptedException {
        // Declare a cache queue of size 10
        BlockingQueue<String> queue = new LinkedBlockingQueue<String> (10);

        // There are three producers and one consumer
        Producer producer1 = new Producer(queue);
        Producer producer2 = new Producer(queue);
        Producer producer3 = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        / / with the aid of Executors
        ExecutorService service = Executors.newCachedThreadPool();
        // Start the thread
        service.execute(producer1);
        service.execute(producer2);
        service.execute(producer3);
        service.execute(consumer);

        // 执行10s
        Thread.sleep(10 * 1000);
        producer1.stop();
        producer2.stop();
        producer3.stop();

        Thread.sleep(2000);
        / / from the Executorservice.shutdown(); }}Copy the code

(2) Producers

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** * Producer thread **@author wuhy* /
public class Producer implements Runnable {

    private volatile boolean  isRunning = true;// Whether the flag is running
    private BlockingQueue queue;// block the queue
    private static AtomicInteger count = new AtomicInteger();// Automatically updated value
    private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;

    // constructor
    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }

    public void run() {
        String data = null;
        Random r = new Random();

        System.out.println("Start producer thread!");
        try {
            while (isRunning) {
                System.out.println("Producing data...");
                Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));// A random number from 0 to DEFAULT_RANGE_FOR_SLEEP

                data = "data:" + count.incrementAndGet();// Atomically increments the current value of count by 1
                System.out.println("Add data:" + data + "Put in queue...");
                if(! queue.offer(data,2, TimeUnit.SECONDS)) {// Set the wait time to 2s, and return true if more than 2s is not added
                    System.out.println("Failed to insert data:"+ data); }}}catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            System.out.println("Exit producer thread!");
        }
    }

    public void stop() {
        isRunning = false; }}Copy the code

(3) Consumers

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** ** Consumer thread **@author wuhy* /
public class Consumer implements Runnable {

    private BlockingQueue<String> queue;
    private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;

    // constructor
    public Consumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    public void run() {
        System.out.println("Start the consumer thread!");
        Random r = new Random();
        boolean isRunning = true;
        try {
            while (isRunning) {
                System.out.println("Getting data from queue...");
                String data = queue.poll(2, TimeUnit.SECONDS);If there is no data in the queue, the queue will be blocked. If there is no data in the queue, the queue will be blocked
                if (null! = data) { System.out.println("Get the data:" + data);
                    System.out.println("Ongoing consumption data:" + data);
                    Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
                } else {
                    // If no data exceeds 2s, all production threads are considered to have exited, and the consuming thread is automatically exited.
                    isRunning = false; }}}catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            System.out.println("Exit the consumer thread!); }}}Copy the code
  1. DelayQueue

An element in a DelayQueue can only be fetched from the queue when the specified delay time is up. DelayQueue is a queue with no size limit, so the operations that insert data into the queue (producers) are never blocked, but only the operations that fetch data (consumers) are blocked.

  1. PriorityBlockingQueue

A priority-based blocking queue (the priority is determined by the Compator object passed in by the constructor), but note that PriorityBlockingQueue does not block the data producer, but only the consumer of the data if there is no data to consume. It is important to note that producers must not produce data faster than consumers can consume it, or over time they will eventually exhaust all available heap memory space. When implementing PriorityBlockingQueue, the internal control thread synchronization lock is a fair lock.

  1. SynchronousQueue

A unbuffered waiting queue, similar to deal directly without mediation, a bit like a primitive society of producers and consumers, producers and their products to market products sell to the final consumers, and consumers must go to market find to commodity producers directly, if one side failed to find a suitable target, so I’m sorry, everyone’s in the market waiting for. Relative to a buffer BlockingQueue, less a middle dealer link (buffer), if there is a distributor, the products directly to the wholesale to distributors, producers and dealers don’t need to care about will eventually to sell these products to the customer, the dealer can part inventory goods, so relative to the direct trading patterns, Generally speaking, adopting the mode of intermediary dealer will have higher throughput (it can buy and sell in batches). On the other hand, the timely response performance of individual products may be reduced due to the introduction of distributors, which add additional transaction links from producer to consumer.

There are two different ways to declare a SynchronousQueue, and they have different behaviors. The difference between fair mode and unfair mode:

In fairness mode, SynchronousQueue uses fair locking and a FIFO queue to block redundant producers and consumers.

But if the mode is unfair (SynchronousQueue defaults) : SynchronousQueue uses an unfair lock and a LIFO queue to manage redundant producers and consumers. In the SynchronousQueue, if there is a gap in the processing speed of producers and consumers, it is likely that some producer or consumer data will never be processed.

Five, the summary

BlockingQueue not only implements the basic functionality of a full queue, but also manages the automatic wake-up between multiple lines in a multi-threaded environment, allowing programmers to ignore these details and focus on more advanced functions.

The blogger has opened a new wechat public account: Welcome to pay attention to ‘Balding Record’, smart enough to go black.

Reference wsmajunfeng.iteye.com/blog/162935…