Blocking queue

A blocking queue (BlockingQueue) is a queue that supports two additional operations: blocking insertion and blocking removal.

  1. Blocking insertion: when the queue is full, a thread that inserts an element is blocked until the queue is no longer full.
  2. Blocking removal: when the queue is empty, a thread that removes an element waits until the queue becomes non-empty.

Blocking queues are often used in producer-consumer scenarios, where the producer is the thread that adds elements to the queue and the consumer is the thread that takes elements from it. The blocking queue is the container in which producers store elements and from which consumers retrieve them. When an operation cannot be satisfied immediately (the queue is full on insert, or empty on remove), a blocking queue offers four ways to handle it:

Operation   Throws exception   Returns special value   Blocks           Times out
Insert      add(e)             offer(e)                put(e)           offer(e, time, unit)
Remove      remove()           poll()                  take()           poll(time, unit)
Examine     element()          peek()                  not applicable   not applicable
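
The insert row of the table can be checked on a single thread. The following is a minimal sketch, assuming a queue of capacity 1 that is already full; the class name FullQueueModes and the helper tryInserts are made up for this illustration. put(e) is deliberately left out, because on a full queue with no consumer it would block forever.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class FullQueueModes {
    // Reports what add, offer and timed offer do when q is already full.
    static String tryInserts(BlockingQueue<Integer> q) throws InterruptedException {
        StringBuilder out = new StringBuilder();
        try {
            q.add(99);
            out.append("add=ok ");
        } catch (IllegalStateException e) {
            out.append("add=IllegalStateException ");
        }
        // offer(e) gives up immediately and returns false.
        out.append("offer=").append(q.offer(99)).append(' ');
        // offer(e, time, unit) waits up to 100 ms for space, then returns false.
        out.append("timedOffer=").append(q.offer(99, 100, TimeUnit.MILLISECONDS));
        return out.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> q = new ArrayBlockingQueue<>(1);
        q.put(1); // fills the queue; a second put would block
        System.out.println(tryInserts(q));
        // prints: add=IllegalStateException offer=false timedOffer=false
    }
}
```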

An exception is thrown

When the queue is full, inserting an element with add(e) throws IllegalStateException("Queue full"). When the queue is empty, removing an element with remove() throws NoSuchElementException.

import java.util.concurrent.ArrayBlockingQueue;

public class MyBlockQueue {
    public static void main(String[] args) {
        ArrayBlockingQueue<Integer> q = new ArrayBlockingQueue<Integer>(3);

        // Producer: add() throws IllegalStateException if the queue is full.
        new Thread(()->{
            q.add(1);
            System.out.println(Thread.currentThread().getName()+" put in an element");
            q.add(2);
            System.out.println(Thread.currentThread().getName()+" put in an element");
            q.add(3);
            System.out.println(Thread.currentThread().getName()+" put in an element");
        },"Thread 1").start();

        // Consumer: remove() throws NoSuchElementException if the queue is empty.
        // Only three elements are ever added, so remove() throws by the fourth call at the latest.
        new Thread(()->{
            q.remove();
            System.out.println(Thread.currentThread().getName()+" took away an element");
            q.remove();
            System.out.println(Thread.currentThread().getName()+" took away an element");
            q.remove();
            System.out.println(Thread.currentThread().getName()+" took away an element");
            q.remove();
            System.out.println(Thread.currentThread().getName()+" took away an element");
        },"Thread 2").start();
    }
}

Return special value

offer(e) reports whether the insertion succeeded: it returns true on success and false if the queue is full. poll() removes and returns the head of the queue, or returns null if the queue is empty.

import java.util.concurrent.ArrayBlockingQueue;

public class MyBlockQueue {
    public static void main(String[] args) {
        ArrayBlockingQueue<Integer> q = new ArrayBlockingQueue<Integer>(3);

        // Producer: offer() returns false instead of throwing if the queue is full.
        new Thread(()->{
            q.offer(1);
            System.out.println(Thread.currentThread().getName()+" put in an element");
            q.offer(2);
            System.out.println(Thread.currentThread().getName()+" put in an element");
            q.offer(3);
            System.out.println(Thread.currentThread().getName()+" put in an element");
        },"Thread 1").start();

        // Consumer: poll() returns null instead of throwing if the queue is empty.
        new Thread(()->{
            Integer res = q.poll();
            System.out.println(Thread.currentThread().getName()+" took away an element, return value is "+res);
            res = q.poll();
            System.out.println(Thread.currentThread().getName()+" took away an element, return value is "+res);
            res = q.poll();
            System.out.println(Thread.currentThread().getName()+" took away an element, return value is "+res);
            res = q.poll();
            System.out.println(Thread.currentThread().getName()+" took away an element, return value is "+res);
        },"Thread 2").start();
    }
}

Blocks

When the queue is full, a producer thread that calls put(e) is blocked until space becomes available or the thread is interrupted. When the queue is empty, a consumer thread that calls take() is blocked until the queue becomes non-empty or the thread is interrupted.

import java.util.concurrent.ArrayBlockingQueue;

public class MyBlockQueue {
    public static void main(String[] args) {
        ArrayBlockingQueue<Integer> q = new ArrayBlockingQueue<Integer>(3);

        // Producer: put() blocks while the queue is full.
        new Thread(()->{
            try {
                q.put(1);
                System.out.println(Thread.currentThread().getName()+" put in an element");
                Thread.sleep(1000);
                q.put(2);
                System.out.println(Thread.currentThread().getName()+" put in an element");
                Thread.sleep(1000);
                q.put(3);
                System.out.println(Thread.currentThread().getName()+" put in an element");
                System.out.println("Rest for 4 seconds before inserting the next value.");
                Thread.sleep(4000);
                q.put(4);
                System.out.println(Thread.currentThread().getName()+" put in an element");
            } catch(InterruptedException e) {
                e.printStackTrace();
            }
        },"Thread 1").start();

        // Consumer: take() blocks while the queue is empty.
        new Thread(()->{
            try {
                q.take();
                System.out.println(Thread.currentThread().getName()+" took away an element");
                q.take();
                System.out.println(Thread.currentThread().getName()+" took away an element");
                q.take();
                System.out.println(Thread.currentThread().getName()+" took away an element");
                q.take();
                System.out.println(Thread.currentThread().getName()+" took away an element");
            } catch(InterruptedException e) {
                e.printStackTrace();
            }
        },"Thread 2").start();
    }
}

In this example, thread 1 sleeps for 4 seconds before inserting the fourth element, so thread 2 blocks in its fourth take() until thread 1 inserts that value.
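
A blocked take() does not have to wait forever: it also ends when the waiting thread is interrupted. The following is a minimal sketch of that case, assuming an empty queue that no producer ever fills; the class name InterruptBlockedTake and its helper method are made up for this illustration, and the 100 ms sleep is only a rough way to let the consumer reach take() first.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptBlockedTake {
    // Starts a consumer that blocks in take() on an empty queue, interrupts it,
    // and reports whether take() ended with InterruptedException.
    static boolean interruptWaitingConsumer() throws InterruptedException {
        BlockingQueue<Integer> q = new ArrayBlockingQueue<>(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);

        Thread consumer = new Thread(() -> {
            try {
                q.take(); // blocks: nothing is ever put into q
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        }, "consumer");

        consumer.start();
        Thread.sleep(100);    // give the consumer time to block in take()
        consumer.interrupt(); // wakes the consumer with InterruptedException
        consumer.join();
        return interrupted.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(interruptWaitingConsumer()); // prints: true
    }
}
```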

Times out

When the queue is full, a producer thread that calls offer(e, time, unit) is blocked for at most the specified time; if no space becomes available within that time, the call gives up and returns false. Likewise, poll(time, unit) waits on an empty queue for at most the specified time and then returns null.

The two methods offer(e, time, unit) and poll(time, unit) share two parameters: time is the maximum time to wait, and unit is the unit of that value, given as one of the TimeUnit constants:

  • TimeUnit.NANOSECONDS (nanoseconds)
  • TimeUnit.MICROSECONDS (microseconds)
  • TimeUnit.MILLISECONDS (milliseconds)
  • TimeUnit.SECONDS (seconds)
  • TimeUnit.MINUTES (minutes)
  • TimeUnit.HOURS (hours)
  • TimeUnit.DAYS (days)

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class MyBlockQueue {
    public static void main(String[] args) {
        ArrayBlockingQueue<Integer> q = new ArrayBlockingQueue<Integer>(3);

        // Producer: inserts three values one second apart, then pauses for 4 seconds.
        new Thread(()->{
            try {
                q.offer(1);
                System.out.println(Thread.currentThread().getName()+" put in an element");
                Thread.sleep(1000);
                q.offer(2);
                System.out.println(Thread.currentThread().getName()+" put in an element");
                Thread.sleep(1000);
                q.offer(3);
                System.out.println(Thread.currentThread().getName()+" put in an element");
                System.out.println("Rest for 4 seconds before inserting the next value.");
                Thread.sleep(4000);
                q.offer(4);
                System.out.println(Thread.currentThread().getName()+" put in an element");
                System.out.println("---------- "+Thread.currentThread().getName()+" exit ----------");
            } catch(InterruptedException e) {
                e.printStackTrace();
            }
        },"Thread 1").start();

        // Consumer: each poll waits at most 1 second and returns null on timeout.
        new Thread(()->{
            try {
                Integer res = q.poll(1000,TimeUnit.MILLISECONDS);
                System.out.println(Thread.currentThread().getName()+" took away an element: "+res);
                res = q.poll(1000,TimeUnit.MILLISECONDS);
                System.out.println(Thread.currentThread().getName()+" took away an element: "+res);
                res = q.poll(1000,TimeUnit.MILLISECONDS);
                System.out.println(Thread.currentThread().getName()+" took away an element: "+res);
                res = q.poll(1000,TimeUnit.MILLISECONDS);
                System.out.println(Thread.currentThread().getName()+" took away an element: "+res);
                System.out.println("---------- "+Thread.currentThread().getName()+" exit ----------");
            } catch(InterruptedException e) {
                e.printStackTrace();
            }
        },"Thread 2").start();
    }
}

In this example, each poll in thread 2 waits at most 1 second for a value. During the 4 seconds that thread 1 sleeps, thread 2's poll therefore times out and returns null, and thread 2 exits without receiving the fourth value.
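
The example above exercises only the timed poll. The producer side works the same way with offer(e, time, unit). The following is a minimal single-threaded sketch, assuming a queue of capacity 1 and a 200 ms timeout; the class name TimedOfferDemo and the helper offerWithTimeout are made up for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class TimedOfferDemo {
    // Tries to insert value into q, waiting at most timeoutMillis for free space.
    static boolean offerWithTimeout(BlockingQueue<Integer> q, int value, long timeoutMillis)
            throws InterruptedException {
        return q.offer(value, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> q = new ArrayBlockingQueue<>(1);

        // Space is available, so the call returns true without waiting.
        System.out.println(offerWithTimeout(q, 1, 200)); // true

        // The queue is full and nobody consumes, so the call gives up after about 200 ms.
        System.out.println(offerWithTimeout(q, 2, 200)); // false

        // The timed poll mirrors this: a value if present, null after the timeout.
        System.out.println(q.poll(200, TimeUnit.MILLISECONDS)); // 1
        System.out.println(q.poll(200, TimeUnit.MILLISECONDS)); // null
    }
}
```

Checking the boolean result matters in real producers: a false return means the element was not enqueued and has to be retried or dropped explicitly.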

Blocking queues in Java

  • ArrayBlockingQueue: a bounded blocking queue backed by an array
  • LinkedBlockingQueue: an optionally bounded blocking queue backed by a linked list (the capacity defaults to Integer.MAX_VALUE)
  • PriorityBlockingQueue: an unbounded blocking queue that orders its elements by priority
  • DelayQueue: an unbounded blocking queue implemented with a priority queue; an element can be taken only after its delay has expired
  • SynchronousQueue: a blocking queue that does not store elements
  • LinkedTransferQueue: an unbounded blocking queue backed by a linked list
  • LinkedBlockingDeque: a double-ended blocking queue backed by a linked list
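
Two of the differences in this list, capacity and ordering, can be observed directly. The following is a minimal sketch; the class name QueueKinds and the helper pollInPriorityOrder are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

class QueueKinds {
    // Inserts the values into a PriorityBlockingQueue and polls them all back.
    static List<Integer> pollInPriorityOrder(int... values) {
        PriorityBlockingQueue<Integer> pq = new PriorityBlockingQueue<>();
        for (int v : values) {
            pq.offer(v); // never blocks: the queue is unbounded
        }
        List<Integer> out = new ArrayList<>();
        Integer head;
        while ((head = pq.poll()) != null) {
            out.add(head);
        }
        return out;
    }

    public static void main(String[] args) {
        // ArrayBlockingQueue always requires an explicit capacity.
        System.out.println(new ArrayBlockingQueue<Integer>(3).remainingCapacity());  // 3

        // LinkedBlockingQueue is bounded only when a capacity is passed in.
        System.out.println(new LinkedBlockingQueue<Integer>(2).remainingCapacity()); // 2
        System.out.println(
            new LinkedBlockingQueue<Integer>().remainingCapacity() == Integer.MAX_VALUE); // true

        // Elements come out in natural (ascending) order, not insertion order.
        System.out.println(pollInPriorityOrder(3, 1, 2)); // [1, 2, 3]
    }
}
```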

SynchronousQueue

SynchronousQueue is a blocking queue that does not store elements. Each put operation must wait for a matching take operation; until then, no further element can be added.

SynchronousQueue can be thought of as a hand-off point that passes data from the producer thread directly to the consumer thread. It also supports a fair access policy for waiting threads; by default the access policy is non-fair.

import java.util.concurrent.SynchronousQueue;

public class MyQueue {

    public static void main(String[] args) {
        // Stores no elements: each put must be matched by a take.
        SynchronousQueue<Integer> queue = new SynchronousQueue<>();

        // Producer: each put() blocks until Thread B takes the element.
        new Thread(()->{
            for (int i = 0; i < 3; i++) {
                try {
                    queue.put(i);
                    System.out.println(Thread.currentThread().getName()+" put in element "+i);
                } catch(InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"Thread A").start();

        // Consumer: takes one element every 2 seconds.
        new Thread(()->{
            for (int i = 0; i < 3; i++) {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    Integer value = queue.take();
                    System.out.println(Thread.currentThread().getName()+" took out element "+value);
                } catch(InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"Thread B").start();
    }
}
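
The "stores nothing" property and the fair mode can be illustrated with a short sketch. The class name SynchronousQueueFacts and the helper handOff are made up for this illustration; the boolean constructor argument is the JDK's fairness flag.

```java
import java.util.concurrent.SynchronousQueue;

class SynchronousQueueFacts {
    // Hands value to a freshly started consumer thread and returns what it received.
    static int handOff(SynchronousQueue<Integer> q, int value) throws InterruptedException {
        int[] received = new int[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = q.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");
        consumer.start();
        q.put(value);    // blocks until the consumer's take() accepts the element
        consumer.join(); // join makes the consumer's write to received visible here
        return received[0];
    }

    public static void main(String[] args) throws InterruptedException {
        // true selects the fair policy: waiting threads are served in FIFO order.
        SynchronousQueue<Integer> fair = new SynchronousQueue<>(true);

        System.out.println(fair.offer(1)); // false: no consumer is waiting
        System.out.println(fair.size());   // 0: the queue never holds elements
        System.out.println(fair.poll());   // null: no producer is waiting
        System.out.println(handOff(fair, 42)); // 42
    }
}
```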

Implementation principle of blocking queue

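When the queue is full, producers wait; when a consumer removes an element, it notifies a waiting producer that space is available again. In the same way, a producer that adds an element notifies a waiting consumer that the queue is no longer empty. ArrayBlockingQueue implements this notification pattern with Condition. For more information on Condition, see the blog post "Java Multithreading (8): Producer-Consumer, Condition and Precise Wake-up" on Juejin (juejin.cn).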
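
The notification pattern can be sketched with one ReentrantLock and two Condition objects. This is a simplified illustration, not the JDK source: SimpleBlockingQueue is a hypothetical class, and it stores its elements in an ArrayDeque for brevity rather than in an array of its own.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SimpleBlockingQueue<E> {
    private final Queue<E> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // producers wait here
    private final Condition notEmpty = lock.newCondition(); // consumers wait here

    SimpleBlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (items.size() == capacity) {
                notFull.await(); // releases the lock while waiting
            }
            items.add(e);
            notEmpty.signal();   // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            E e = items.remove();
            notFull.signal();    // wake one waiting producer
            return e;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleBlockingQueue<Integer> q = new SimpleBlockingQueue<>(1);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    q.put(i); // blocks whenever the single slot is occupied
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "producer");
        producer.start();
        for (int i = 0; i < 3; i++) {
            System.out.println(q.take()); // prints 0, 1, 2 on separate lines
        }
        producer.join();
    }
}
```

The wait conditions are checked in a while loop rather than an if, because a thread woken from await() must re-check the state before proceeding. Using two separate conditions lets a consumer wake only producers and a producer wake only consumers.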