Learning from Silicon Valley

concept

The queue

A queue can be thought of as an array, entering at one end and exiting at the other, waiting to buy food

Blocking queue

BlockingQueue is a queue, and the role a BlockingQueue plays in a data structure is roughly as follows:

Thread 1 adds elements to the blocking queue, while thread 2 removes elements from the blocking queue

  • When the blocking queue is empty, fetching elements from the queue will be blocked
    • Unable to retrieve cakes from cupboards when cupboards are empty
  • When the blocking queue is full, adding elements from the queue will be blocked
    • When the cupboards in the cupcake shop are full, it is impossible to add more cakes to the cupboards

That is, a thread trying to fetch an element from an empty blocking queue will be blocked until another thread inserts a new element into the empty queue

Similarly, a thread that tries to add new elements to a blocking queue that is already full, until another thread removes one or more elements from the full queue, or completely emptying the queue, causing the queue to become idle again and add new elements later

Why?

When we go to Haidilao for dinner, the hall is full and we need to wait in the waiting hall. However, these waiting customers can bring profits to the business, so we welcome them to block

In the multithreaded world: so-called blocking, suspends the thread under certain emptying conditions (i.e., blocking) and wakes the suspended thread up again as soon as the condition is met

Why do I need BlockingQueue

The good thing is we don’t have to worry about when we need to block or wake up threads, because BlockingQueue does it all for you, right

Prior to the concurrent package, in a multi-threaded environment, each of us programmers had to control these details ourselves, especially with regard to efficiency and thread safety, which caused considerable complexity in our programs.

When do we use blocking queues? : multi-threaded concurrent processing, thread pool with more!

architecture

// The ArrayList collection class is familiar to you. // I also used CopyOnWriteList and BlockingQueueCopy the code

BlockingQueue is a BlockingQueue that belongs to an interface and has seven implementation classes underneath

  • ArrayBlockQueue: a bounded blocking queue composed of array structures
  • LinkedBlockingQueue: a bounded (but default size integer.max_value) blocking queue consisting of a linked list structure
    • It’s bounded, but it’s so big, it’s unbounded, you can think of it as unbounded
  • PriorityBlockQueue: An unbounded blocking queue that supports priority sorting
  • DelayQueue: delay unbounded blocking queue implemented using priority queues
  • SynchronousQueue: A blocking queue that does not store elements, that is, a queue of individual elements
    • Produce one, consume one, don’t store elements, don’t consume, don’t produce
  • LinkedTransferQueue: An unbounded blocking queue consisting of a linked list structure
  • LinkedBlockingDeque: A two-way blocking queue consisting of a linked list structure

The things to know are ArrayBlockQueue, LinkedBlockingQueue, SynchronousQueue

BlockingQueue core method

Four groups of API

way An exception is thrown Return value, no exception thrown Block waiting for Timeout waiting for
add add(e) offer(e) put(e) offer(e,time,unit)
remove remove() poll() take() poll(time,unit)
Detects the head element of the queue element() peek()
An exception is thrown When the blocking Queue is full: IIIegalStateException is thrown when adding an element to the Queue: Queue Full When the blocking Queue is empty: NoSuchException is thrown when removing an element from the Queue
particularity Insert method, true on success, false on failure remove method: return queue element on success, return empty if no queue element
Has been blocked When the blocking queue is full, the producer continues to put elements into the queue, and the queue blocks the producer thread until the PUT data or response is interrupted and exits. When the blocking queue is empty, the consumer thread tries to take elements from the queue, and the queue blocks the consumer thread until the queue is available.
Timeout exit When the blocking queue is full, the queue blocks the producer thread for a certain amount of time, and then the producer thread exits

Throw exception group

The add method, however, throws an exception when it adds elements to an already full ArrayBlockingQueue

// Block queue, need to fill in the default value
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);

System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));

System.out.println(blockingQueue.add("XXX"));
Copy the code

After the operation:

true
true
true
Exception in thread "main" java.lang.IllegalStateException: Queue full
	at java.util.AbstractQueue.add(AbstractQueue.java:98)
	at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:312)
	at com.moxi.interview.study.queue.BlockingQueueDemo.main(BlockingQueueDemo.java:25)
Copy the code

And we’re also going to throw an exception if we fetch more elements, so let’s say we only store three values, but when we fetch, we fetch four times

// Block queue, need to fill in the default value
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));

System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
Copy the code

So there’s an exception

true
true
true
a
b
c
Exception in thread "main" java.util.NoSuchElementException
	at java.util.AbstractQueue.remove(AbstractQueue.java:117)
	at com.moxi.interview.study.queue.BlockingQueueDemo.main(BlockingQueueDemo.java:30)
Copy the code

Boolean type group

We use the offer method, which returns false if the blocking queue is full, and true if it is not

At the same time, if the queue is empty, null is returned

BlockingQueue blockingQueue = new ArrayBlockingQueue(3);

System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.offer("d"));

System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
Copy the code

The results

true
true
true
false
a
b
c
null
Copy the code

Blocking queue group

We use the put method, and when we add elements, if the blocking queue is full, the thread that added the message will block until the queue is empty and it will wake up

This is usually used in messaging middleware such as RabbitMQ, where the message must be blocked to ensure that it is not lost

BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
System.out.println("= = = = = = = = = = = = = = = =");

blockingQueue.take();
blockingQueue.take();
blockingQueue.take();
blockingQueue.take();
Copy the code

Also, taking a message will block if the content does not exist

Be there or be square

Offer (), poll plus time

When using offer inserts, you need to specify a time. If the offer is not inserted within 2 seconds, the insert is abandoned

BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a".2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("b".2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("c".2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("d".2L, TimeUnit.SECONDS));
Copy the code

And also judge when you take it

System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
Copy the code

If not retrieved within 2 seconds, null is returned

SynchronousQueue

SynchronousQueue has no capacity. Unlike other BlockingQueues, SynchronousQueue is a non-stored BlockingQueue. Each PUT operation must wait for a take operation, or no additional elements can be added

Let’s test the process of adding elements to SynchronousQueue

First we create two threads, one for production and one for consumption

The production thread has put fields A, B, and C respectively

BlockingQueue<String> blockingQueue = new SynchronousQueue<>();

new Thread(() -> {
    try {       
        System.out.println(Thread.currentThread().getName() + "\t put A ");
        blockingQueue.put("A");
       
        System.out.println(Thread.currentThread().getName() + "\t put B ");
        blockingQueue.put("B");        
        
        System.out.println(Thread.currentThread().getName() + "\t put C ");
        blockingQueue.put("C");        
        
    } catch(InterruptedException e) { e.printStackTrace(); }},"t1").start();
Copy the code

The consuming thread uses take to block the contents of the queue and waits five seconds each time before consuming

        new Thread(() -> {
            try {

                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                blockingQueue.take();
                System.out.println(Thread.currentThread().getName() + "\t take A ");

                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                blockingQueue.take();
                System.out.println(Thread.currentThread().getName() + "\t take B ");

                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                blockingQueue.take();
                System.out.println(Thread.currentThread().getName() + "\t take C ");

            } catch(InterruptedException e) { e.printStackTrace(); }},"t2").start();
Copy the code

The final output is:

T1 put A T2 take A 5 seconds later... T1 put B T2 take B 5 seconds later... t1 put C t2 take CCopy the code

As can be seen from the final running result, every time t1 thread adds elements to the blocking queue, t1 input thread will wait for T2 consumption thread. After T2 consumption, T2 is in the suspended state, waiting for T1 to save, and thus goes round and round, forming the state of one saving and one fetching

The usefulness of blocking queues

Producer-consumer model

For a variable with an initial value of 0, two threads alternately operate on it, one incrementing and one subtracting, for five rounds

There are a few things to remember about multi-threaded operations

  • Threads manipulate resource classes
  • Judgment work notice
  • Prevent false wake up mechanism

Synchronized edition of producer and consumer issues

package com.company;
/** * a variable with an initial value of 0, two threads alternately operate on it, one increment, one subtraction, for 5 rounds */
/** * Thread manipulates resource classes * determines work notification * prevents false wake up mechanism */

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** * Resource class */
class ShareData {

    private int number = 0;

    public synchronized void increment(a) throws Exception{

        try {
            / / determine
            while(number ! =0) {
                // Waiting cannot produce
                this.wait();
            }

            / / work
            number++;

            System.out.println(Thread.currentThread().getName() + "\t " + number);

            // Wake up notification
            this.notifyAll();
        } catch(Exception e) { e.printStackTrace(); }}public synchronized void decrement(a) throws Exception{

        try {
            / / determine
            while(number == 0) {
                // Waiting cannot consume
                this.wait();
            }

            / / work
            number--;

            System.out.println(Thread.currentThread().getName() + "\t " + number);

            // Wake up notification
            this.notifyAll();
        } catch(Exception e) { e.printStackTrace(); }}}public class ProdConsumerTraditionDemo {

    public static void main(String[] args) {

        // High cohesion, low coupling cohesion refers to an air conditioner with its own means of regulating temperature

        ShareData shareData = new ShareData();

        // t1 thread, production
        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    shareData.increment();
                } catch(Exception e) { e.printStackTrace(); }}},"t1").start();

        // t2 thread, consumption
        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    shareData.decrement();
                } catch(Exception e) { e.printStackTrace(); }}},"t2").start(); }}Copy the code

Extension question? Can I change this to if?

            while(number == 0) {
                // Waiting cannot consume
                this.wait();
            }
Copy the code

There is A problem, A B C D 4 threads! False wake up, the solution is to replace if with while

        Increment (); increment(); increment(); increment(); increment(); increment() If =0, a. wait() waits (wait() releases the lock), and C tries to execute the production method, but still checks number! =0, then b. wait() waits (wait() releases the lock). It happens that the consumer thread B/D consumes a product, making number=0, and then calls this.notifyall () after B/D consumes; The producer thread continues to wait(), and the consumer calls this.notifyall (). The producer thread waits () and the consumer calls this.notifyall (). Producers then continue to produce ahead of themselves, eventually leading to 'excess capacity', where number is greater than 1 if(number! = 0){// wait this.wait(); } * /
        while(number ! =0) { // Note that you can't use "if" or false wake up will occur
            / / wait for
            this.wait();
        }
Copy the code

[img-cfjt6mq1-1613968690163] (C:\Users\shouk\OneDrive\ Note image \image-20210218184801318.png) ]

JUC edition of producer and consumer issues

Condition is found by Lock in the official documentation

Click on Condition to check

Let’s implement a simple producer-consumer pattern, starting with the resource class ShareData

/** * Resource class */
class ShareData {

    private int number = 0;

    private Lock lock = new ReentrantLock();

    private Condition condition = lock.newCondition();

    public void increment(a) throws Exception{
        // synchronize code block, lock
        lock.lock();
        try {
            / / determine
            while(number ! =0) {
                // Waiting cannot produce
                condition.await();
            }

            / / work
            number++;

            System.out.println(Thread.currentThread().getName() + "\t " + number);

            // Wake up notification
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally{ lock.unlock(); }}public void decrement(a) throws Exception{
        // synchronize code block, lock
        lock.lock();
        try {
            / / determine
            while(number == 0) {
                // Waiting cannot consume
                condition.await();
            }

            / / work
            number--;

            System.out.println(Thread.currentThread().getName() + "\t " + number);

            // Wake up notification
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally{ lock.unlock(); }}}Copy the code

It includes a number variable and offers methods for increment and Decrement, which increase and decrease the number by one, respectively

However, in order to prevent false wake up mechanism, we should use while instead of if when making judgment

/ / determine
while(number ! =0) {
    // Waiting cannot produce
    condition.await();
}
Copy the code

Do not use if to judge

/ / determine
if(number ! =0) {
    // Waiting cannot produce
    condition.await();
}
Copy the code

The complete code

package com.company;
/** * a variable with an initial value of 0, two threads alternately operate on it, one increment, one subtraction, for 5 rounds */
/** * Thread manipulates resource classes * determines work notification * prevents false wake up mechanism */

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** * Resource class */
class ShareData {

    private int number = 0;

    private Lock lock = new ReentrantLock();

    private Condition condition = lock.newCondition();

    public void increment(a) throws Exception{
        // synchronize code block, lock
        lock.lock();
        try {
            / / determine
            while(number ! =0) {
                // Waiting cannot produce
                condition.await();
            }

            / / work
            number++;

            System.out.println(Thread.currentThread().getName() + "\t " + number);

            // Wake up notification
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally{ lock.unlock(); }}public void decrement(a) throws Exception{
        // synchronize code block, lock
        lock.lock();
        try {
            / / determine
            while(number == 0) {
                // Waiting cannot consume
                condition.await();
            }

            / / work
            number--;

            System.out.println(Thread.currentThread().getName() + "\t " + number);

            // Wake up notification
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally{ lock.unlock(); }}}public class ProdConsumerTraditionDemo {

    public static void main(String[] args) {

        // High cohesion, low coupling cohesion refers to an air conditioner with its own means of regulating temperature

        ShareData shareData = new ShareData();

        // t1 thread, production
        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    shareData.increment();
                } catch(Exception e) { e.printStackTrace(); }}},"t1").start();

        // t2 thread, consumption
        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    shareData.decrement();
                } catch(Exception e) { e.printStackTrace(); }}},"t2").start(); }}Copy the code

Finally, after successful operation, one of us will produce and the other will consume

t1	 1
t2	 0
t1	 1
t2	 0
t1	 1
t2	 0
t1	 1
t2	 0
t1	 1
t2	 0
Copy the code

Any new technology does not just cover the original technology, it has its advantages and complement to the old technology!

Condition notifies and wakes up threads precisely

package com.company;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** * Resource class */
class ShareData {


    private Lock lock = new ReentrantLock();

    private Condition condition1 = lock.newCondition();
    private Condition condition2 = lock.newCondition();
    private Condition condition3 = lock.newCondition();
    private int thisOne = 1;

    public void printA(a) throws Exception{
        // synchronize code block, lock
        lock.lock();
        try {
            / / determine
            while(thisOne ! =1) {
                // Waiting cannot produce
                condition1.await();
            }
            System.out.println(Thread.currentThread().getName() + "=>AAAAAAA");
            / / work
            // Wake up the specified person, B
            thisOne = 2;
            condition2.signal();//
        } catch (Exception e) {
            e.printStackTrace();
        } finally{ lock.unlock(); }}public void printB(a) throws Exception{
        // synchronize code block, lock
        lock.lock();
        try {
            / / determine
            while(thisOne ! =2) {
                // Waiting cannot produce
                condition2.await();
            }
            System.out.println(Thread.currentThread().getName() + "=>BBBB");
            thisOne = 3;
            condition3.signal();//
        } catch (Exception e) {
            e.printStackTrace();
        } finally{ lock.unlock(); }}public void printC(a) throws Exception{
        // synchronize code block, lock
        lock.lock();
        try {
            / / determine
            while(thisOne ! =3) {
                // Waiting cannot produce
                condition3.await();
            }
            System.out.println(Thread.currentThread().getName() + "=>CCCCC");
            thisOne = 1;
            condition1.signal();//
        } catch (Exception e) {
            e.printStackTrace();
        } finally{ lock.unlock(); }}}public class ProdConsumerTraditionDemo {

    public static void main(String[] args) {

        // High cohesion, low coupling cohesion refers to an air conditioner with its own means of regulating temperature

        ShareData shareData = new ShareData();

        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    shareData.printA();
                } catch(Exception e) { e.printStackTrace(); }}},"t1").start();

        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    shareData.printB();
                } catch(Exception e) { e.printStackTrace(); }}},"t2").start();

        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    shareData.printC();
                } catch(Exception e) { e.printStackTrace(); }}},"t3").start(); }}Copy the code
// ABC is advancing in an orderly manner
t1=>AAAAAAA
t2=>BBBB
t3=>CCCCC
t1=>AAAAAAA
t2=>BBBB
t3=>CCCCC
t1=>AAAAAAA
t2=>BBBB
t3=>CCCCC
t1=>AAAAAAA
t2=>BBBB
t3=>CCCCC
t1=>AAAAAAA
t2=>BBBB
t3=>CCCCC

Copy the code

Producer and Consumer 3.0

Prior to the concurrent package release, in a multi-threaded environment, each of us programmers had to control these details ourselves, especially with regard to efficiency and thread safety, which resulted in significant time complexity for our programs

Now we use the new blocking queue version producer and consumer, using: volatile, CAS, atomicInteger, BlockQueue, thread interaction, atomic reference

/** * Producer consumer blocking queue version * uses: volatile, CAS, atomicInteger, BlockQueue, thread interaction, atomic reference */

class MyResource {
    // It is enabled by default for production and consumption
    // Volatile is used to keep data visible, meaning that when TLAG changes, other threads are immediately notified of the changes
    private volatile boolean FLAG = true;

    // use atoms to wrap classes instead of number++
    private AtomicInteger atomicInteger = new AtomicInteger();

    // A specific SynchronousBlockingQueue cannot be instantiated to satisfy the condition
    BlockingQueue<String> blockingQueue = null;

    // It should be passed in using the construct injection method in dependency injection
    public MyResource(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
        // What class is passed in
        System.out.println(blockingQueue.getClass().getName());
    }

    /** **@throws Exception
     */
    public void myProd(a) throws Exception{
        String data = null;
        boolean retValue;
        // Use while to avoid false wake up
        // Start production when FLAG is true
        while(FLAG) {
            data = atomicInteger.incrementAndGet() + "";

            // Store 1 data in 2 seconds
            retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
            if(retValue) {
                System.out.println(Thread.currentThread().getName() + "\t Insert queue :" + data  + "Success" );
            } else {
                System.out.println(Thread.currentThread().getName() + "\t Insert queue :" + data  + "Failure" );
            }

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        System.out.println(Thread.currentThread().getName() + "\t Stop production, indicating FLAG=false, production introduction");
    }

    /** * consumption *@throws Exception
     */
    public void myConsumer(a) throws Exception{
        String retValue;
        // Use while to avoid false wake up
        // Start production when FLAG is true
        while(FLAG) {
            // Store 1 data in 2 seconds
            retValue = blockingQueue.poll(2L, TimeUnit.SECONDS);
            if(retValue ! =null&& retValue ! ="") {
                System.out.println(Thread.currentThread().getName() + "\t consumption queue :" + retValue  + "Success" );
            } else {
                FLAG = false;
                System.out.println(Thread.currentThread().getName() + "\t consumption failed, queue is empty, exit" );

                // Exit the consumption queue
                return; }}}/** * Stop production judgment */
    public void stop(a) {
        this.FLAG = false; }}public class ProdConsumerBlockingQueueDemo {

    public static void main(String[] args) {
        // Pass in the concrete implementation class, ArrayBlockingQueue
        MyResource myResource = new MyResource(new ArrayBlockingQueue<String>(10));

        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "\t Production thread started");
            System.out.println("");
            System.out.println("");
            try {
                myResource.myProd();
                System.out.println("");
                System.out.println("");
            } catch(Exception e) { e.printStackTrace(); }},"prod").start();


        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "\t consume thread start");

            try {
                myResource.myConsumer();
            } catch(Exception e) { e.printStackTrace(); }},"consumer").start();

        // Stop production and consumption after 5 seconds
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("");
        System.out.println("");
        System.out.println("After 5 seconds, the production and consumption threads stop and the thread terminates."); myResource.stop(); }}Copy the code

Final run result

Java. Util. Concurrent. ArrayBlockingQueue prod production thread start consumer spending thread start prod inserted into the queue:1Successful Consumer Consumption queue:1Prod successfully inserted into queue:2Successful Consumer Consumption queue:2Prod successfully inserted into queue:3Successful Consumer Consumption queue:3Prod successfully inserted into queue:4Successful Consumer Consumption queue:4Prod successfully inserted into queue:5Successful Consumer Consumption queue:5successful5The production and consumption threads stop and the thread ends. Prod Stops production, indicating FLAG=false, Production introductionCopy the code