preface

In Java’s thread pool, newly submitted tasks are stored in a blocking queue once the number of running threads has reached the core pool size. So what is a blocking queue?

A blocking queue is, first of all, a queue; on top of the ordinary queue operations it supports two additional blocking behaviors:

  • When the queue is empty, a thread that tries to take an element waits until the queue becomes non-empty
  • When the queue is full, a thread that tries to add an element waits until space becomes available in the queue

So how does a blocking queue block?

Implement a blocking queue yourself

Blocking queue implemented with synchronized, wait, and notifyAll
/**
 * A bounded blocking queue of strings backed by a circular array and guarded by
 * this object's intrinsic lock ({@code synchronized} / {@code wait} / {@code notifyAll}).
 *
 * <p>{@link #put(String)} waits while the queue is full and {@link #take()} waits
 * while it is empty. Producers and consumers all wait on the same monitor, so
 * {@code notifyAll} is used instead of {@code notify}: a single {@code notify}
 * could wake a thread of the same kind as the caller, which would go straight
 * back to waiting while the thread that could actually proceed stays asleep.
 */
public class BlockingQueue {
    // Index at which the next element will be stored
    private int inputIndex;
    // Index from which the next element will be taken
    private int takeIndex;
    // Circular buffer holding the queued elements
    private final String[] elements;
    // Number of elements currently stored in the buffer
    private int count;

    /**
     * Creates a queue that holds at most {@code capacity} elements.
     *
     * @param capacity maximum number of elements; must be positive
     * @throws IllegalArgumentException if {@code capacity} is not positive
     *         (a zero-capacity buffer would make every {@code put} wait forever)
     */
    public BlockingQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        elements = new String[capacity];
    }

    /**
     * Removes and returns the oldest element, waiting while the queue is empty.
     *
     * @return the element that was removed
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public Object take() throws InterruptedException {
        synchronized (this) {
            // Loop rather than "if": after waking up, the thread must re-check
            // that an element is really available before proceeding.
            while (count == 0) {
                this.wait();
            }
            Object e = dequeue();
            // A slot was freed; wake every waiter so that a blocked producer can run.
            this.notifyAll();
            System.out.println("take method: " + Arrays.toString(elements));
            return e;
        }
    }

    /**
     * Appends an element, waiting while the queue is full.
     *
     * @param str the element to add
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void put(String str) throws InterruptedException {
        synchronized (this) {
            // Loop rather than "if": after waking up, the thread must re-check
            // that a free slot is really available before proceeding.
            while (count == elements.length) {
                this.wait();
            }
            enqueue(str);
            System.out.println("put method: " + Arrays.toString(elements));
            // An element was added; wake every waiter so that a blocked consumer can run.
            this.notifyAll();
        }
    }

    /**
     * Stores an element at {@code inputIndex} and advances the index.
     * Must be called with this object's monitor held and with a free slot available.
     *
     * @param e the element to store
     */
    private void enqueue(String e) {
        elements[inputIndex] = e;

        // Wrap around to the beginning once the end of the array is reached
        if (++inputIndex == elements.length) {
            inputIndex = 0;
        }

        count++;
    }

    /**
     * Removes the element at {@code takeIndex} and advances the index.
     * Must be called with this object's monitor held and with at least one element stored.
     *
     * @return the element that was removed
     */
    private Object dequeue() {
        Object e = elements[takeIndex];
        // Clear the slot so the array no longer references the removed element
        elements[takeIndex] = null;
        // Wrap around to the beginning once the end of the array is reached
        if (++takeIndex == elements.length) {
            takeIndex = 0;
        }
        count--;
        return e;
    }
}
public static void main(String[] args) {

        BlockingQueue queue = new BlockingQueue(10);
        // 10 threads continuously place elements
        IntStream.range(0.10).forEach(i -> {
          Thread a = new Thread(() -> {
            try {
              queue.put("element");
            } catch(InterruptedException e) { e.printStackTrace(); }}); a.start(); });// 10 threads fetch the element
        IntStream.range(0.10).forEach(i -> {
          Thread b = new Thread(() -> {
            try {
              queue.take();
            } catch(InterruptedException e) { e.printStackTrace(); }}); b.start(); }); }Copy the code

Blocking queue implemented with Condition, await, and signal
/**
 * A bounded blocking queue of strings backed by a circular array and guarded by a
 * {@link ReentrantLock} with two {@link Condition}s.
 *
 * <p>Consumers wait on {@code notEmpty} and producers wait on {@code notFull}, so
 * each {@code signal} wakes a thread of the kind that can actually make progress.
 */
public class BlockingQueueWithCondition {

    // Index at which the next element will be stored
    private int inputIndex;
    // Index from which the next element will be taken
    private int takeIndex;
    // Circular buffer holding the queued elements
    private final String[] elements;
    // Number of elements currently stored in the buffer
    private int count;

    // Single lock guarding all of the state above
    final ReentrantLock lock = new ReentrantLock();
    // Consumers wait here while the queue is empty
    final Condition notEmpty = lock.newCondition();
    // Producers wait here while the queue is full
    final Condition notFull = lock.newCondition();

    /**
     * Creates a queue that holds at most {@code capacity} elements.
     *
     * @param capacity maximum number of elements; must be positive
     * @throws IllegalArgumentException if {@code capacity} is not positive
     *         (a zero-capacity buffer would make every {@code put} wait forever)
     */
    public BlockingQueueWithCondition(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        elements = new String[capacity];
    }

    /**
     * Removes and returns the oldest element, waiting while the queue is empty.
     *
     * @return the element that was removed
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public String take() throws InterruptedException {
        lock.lock();

        try {
            // Loop rather than "if": re-check the condition after every wakeup
            while (count == 0) {
                notEmpty.await();
            }
            String str = elements[takeIndex];
            // Clear the slot so the array no longer references the removed element
            elements[takeIndex] = null;
            // Wrap around to the beginning once the end of the array is reached
            if (++takeIndex == elements.length) {
                takeIndex = 0;
            }
            count--;
            // A slot was freed: wake one producer waiting for space
            notFull.signal();
            System.out.println("take method: " + Arrays.toString(elements));
            return str;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends an element, waiting while the queue is full.
     *
     * @param str the element to add
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void put(String str) throws InterruptedException {
        lock.lock();

        try {
            // Loop rather than "if": re-check the condition after every wakeup
            while (count == elements.length) {
                notFull.await();
            }
            elements[inputIndex] = str;
            // Wrap around to the beginning once the end of the array is reached
            if (++inputIndex == elements.length) {
                inputIndex = 0;
            }
            count++;
            // An element was added: wake one consumer waiting for data
            notEmpty.signal();
            System.out.println("put method: " + Arrays.toString(elements));
        } finally {
            lock.unlock();
        }
    }
}
public static void main(String[] args) {

        BlockingQueueWithCondition queue = new BlockingQueueWithCondition(10);
        // 10 threads continuously place elements
        IntStream.range(0.10).forEach(i -> {
            Thread a = new Thread(() -> {
                try {
                    queue.put("element");
                } catch(InterruptedException e) { e.printStackTrace(); }}); a.start(); });// 10 threads fetch the element
        IntStream.range(0.10).forEach(i -> {
            Thread b = new Thread(() -> {
                try {
                    queue.take();
                } catch(InterruptedException e) { e.printStackTrace(); }}); b.start(); }); }Copy the code

ReentrantLock, Condition (await and signal) are very similar to synchronized, wait, and notify, so what’s the difference?

  1. When we call wait, the calling thread must already hold the lock (monitor) of the object. After wait is called, the thread releases that lock and enters the object’s wait set.
  2. When notify is called, the system wakes up one arbitrary thread in the object’s wait set. Once woken, that thread competes with other threads for the object’s lock.
  3. synchronized acquires and releases the lock at the JVM level; the developer does not have to manage it explicitly.
  4. With ReentrantLock, the developer acquires and releases the lock explicitly, which is more flexible. A thread that calls await enters the wait queue of that particular Condition, and a call to signal wakes up one thread waiting on that same Condition, so producers and consumers can be woken selectively.

Blocking queues in the JDK

There are a lot of blocking queues available in the JDK. Here we’ll just parse LinkedBlockingQueue. If you understand the blocking queue above, you’ll quickly understand the JDK blocking queue source code

Some important parameters
// Elements in the blocking queue are wrapped as a node. There must be nodes in a linked list
static class Node<E> { E item; Node<E> next; Node(E x) { item = x; }}// Block queue size,
private final int capacity;

// Block the current number of elements in the queue
private final AtomicInteger count = new AtomicInteger();

// Block the queue head node
transient Node<E> head;

// Block the last node of the queue
private transient Node<E> last;

// LinkedBlockingQueue uses two locks, and access is mutually exclusive
/ / take the lock
private final ReentrantLock takeLock = new ReentrantLock();

// When there are no elements in the queue, the take lock blocks until another thread wakes it up
private final Condition notEmpty = takeLock.newCondition();

/ / put the lock
private final ReentrantLock putLock = new ReentrantLock();

// When the queue is full, the PUT lock blocks until it is woken up by another thread
private final Condition notFull = putLock.newCondition();
Copy the code
Put method
public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
  	// put Lock Lock
        putLock.lockInterruptibly();
        try {
            // Block on the notFull condition if queue elements are full
            while (count.get() == capacity) {
                notFull.await();
            }
            / / team
            enqueue(node);
            // Select * from queue where queue length = 1
            c = count.getAndIncrement();
            // If the current queue element has not been added to the queue element online, then wake up another thread, because there may be many threads blocked at notFull
            / / condition
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            // put Unlocked the lock
            putLock.unlock();
        }
  	// After adding an element, wake up the thread blocked on the notEmpty condition to fetch the element
        if (c == 0)
            signalNotEmpty();
}
Copy the code
Take method
public E take(a) throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
  	// takeLock locks
        takeLock.lockInterruptibly();
        try {
            // Block on notEmpty if the list element is empty
            while (count.get() == 0) {
                notEmpty.await();
            }
            // Element out of team
            x = dequeue();
            // Select * from queue where queue length = 1
            c = count.getAndDecrement();
            // If the list element > 1, wake up the blocking task in the waiting queue for the specified object
            if (c > 1)
                notEmpty.signal();
        } finally {
            / / releases the lock
            takeLock.unlock();
        }
  	// If the queue length was full before exiting, now wake up the thread blocking on the notFull condition after subtracting one element
        if (c == capacity)
            signalNotFull();
        return x;
}
Copy the code