Learning from Silicon Valley
concept
The queue
A queue can be thought of as an array, entering at one end and exiting at the other, waiting to buy food
Blocking queue
BlockingQueue is a queue, and the role a BlockingQueue plays in a data structure is roughly as follows:
Thread 1 adds elements to the blocking queue, while thread 2 removes elements from the blocking queue
When the blocking queue is empty, fetching elements from the queue will be blocked
- Unable to retrieve cakes from cupboards when cupboards are empty
When the blocking queue is full, adding elements from the queue will be blocked
- When the cupboards in the cupcake shop are full, it is impossible to add more cakes to the cupboards
That is, a thread trying to fetch an element from an empty blocking queue will be blocked until another thread inserts a new element into the empty queue
Similarly, a thread that tries to add new elements to a blocking queue that is already full, until another thread removes one or more elements from the full queue, or completely emptying the queue, causing the queue to become idle again and add new elements later
Why?
When we go to Haidilao for dinner, the hall is full and we need to wait in the waiting hall. However, these waiting customers can bring profits to the business, so we welcome them to block
In the multithreaded world: so-called blocking, suspends the thread under certain emptying conditions (i.e., blocking) and wakes the suspended thread up again as soon as the condition is met
Why do I need BlockingQueue
The good thing is we don’t have to worry about when we need to block or wake up threads, because BlockingQueue does it all for you, right
Prior to the concurrent package, in a multi-threaded environment, each of us programmers had to control these details ourselves, especially with regard to efficiency and thread safety, which caused considerable complexity in our programs.
When do we use blocking queues? : multi-threaded concurrent processing, thread pool with more!
architecture
// The ArrayList collection class is familiar to you. // I also used CopyOnWriteList and BlockingQueueCopy the code
BlockingQueue is a BlockingQueue that belongs to an interface and has seven implementation classes underneath
- ArrayBlockQueue: a bounded blocking queue composed of array structures
- LinkedBlockingQueue: a bounded (but default size integer.max_value) blocking queue consisting of a linked list structure
- It’s bounded, but it’s so big, it’s unbounded, you can think of it as unbounded
- PriorityBlockQueue: An unbounded blocking queue that supports priority sorting
- DelayQueue: delay unbounded blocking queue implemented using priority queues
- SynchronousQueue: A blocking queue that does not store elements, that is, a queue of individual elements
- Produce one, consume one, don’t store elements, don’t consume, don’t produce
- LinkedTransferQueue: An unbounded blocking queue consisting of a linked list structure
- LinkedBlockingDeque: A two-way blocking queue consisting of a linked list structure
The things to know are ArrayBlockQueue, LinkedBlockingQueue, SynchronousQueue
BlockingQueue core method
Four groups of API
way | An exception is thrown | Return value, no exception thrown | Block waiting for | Timeout waiting for |
---|---|---|---|---|
add | add(e) | offer(e) | put(e) | offer(e,time,unit) |
remove | remove() | poll() | take() | poll(time,unit) |
Detects the head element of the queue | element() | peek() | – | – |
An exception is thrown | When the blocking Queue is full: IIIegalStateException is thrown when adding an element to the Queue: Queue Full When the blocking Queue is empty: NoSuchException is thrown when removing an element from the Queue |
---|---|
particularity | Insert method, true on success, false on failure remove method: return queue element on success, return empty if no queue element |
Has been blocked | When the blocking queue is full, the producer continues to put elements into the queue, and the queue blocks the producer thread until the PUT data or response is interrupted and exits. When the blocking queue is empty, the consumer thread tries to take elements from the queue, and the queue blocks the consumer thread until the queue is available. |
Timeout exit | When the blocking queue is full, the queue blocks the producer thread for a certain amount of time, and then the producer thread exits |
Throw exception group
The add method, however, throws an exception when it adds elements to an already full ArrayBlockingQueue
// Block queue, need to fill in the default value
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
System.out.println(blockingQueue.add("XXX"));
Copy the code
After the operation:
true
true
true
Exception in thread "main" java.lang.IllegalStateException: Queue full
at java.util.AbstractQueue.add(AbstractQueue.java:98)
at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:312)
at com.moxi.interview.study.queue.BlockingQueueDemo.main(BlockingQueueDemo.java:25)
Copy the code
And we’re also going to throw an exception if we fetch more elements, so let’s say we only store three values, but when we fetch, we fetch four times
// Block queue, need to fill in the default value
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
Copy the code
So there’s an exception
true
true
true
a
b
c
Exception in thread "main" java.util.NoSuchElementException
at java.util.AbstractQueue.remove(AbstractQueue.java:117)
at com.moxi.interview.study.queue.BlockingQueueDemo.main(BlockingQueueDemo.java:30)
Copy the code
Boolean type group
We use the offer method, which returns false if the blocking queue is full, and true if it is not
At the same time, if the queue is empty, null is returned
BlockingQueue blockingQueue = new ArrayBlockingQueue(3);
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.offer("d"));
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
Copy the code
The results
true
true
true
false
a
b
c
null
Copy the code
Blocking queue group
We use the put method, and when we add elements, if the blocking queue is full, the thread that added the message will block until the queue is empty and it will wake up
This is usually used in messaging middleware such as RabbitMQ, where the message must be blocked to ensure that it is not lost
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
System.out.println("= = = = = = = = = = = = = = = =");
blockingQueue.take();
blockingQueue.take();
blockingQueue.take();
blockingQueue.take();
Copy the code
Also, taking a message will block if the content does not exist
Be there or be square
Offer (), poll plus time
When using offer inserts, you need to specify a time. If the offer is not inserted within 2 seconds, the insert is abandoned
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a".2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("b".2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("c".2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("d".2L, TimeUnit.SECONDS));
Copy the code
And also judge when you take it
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
Copy the code
If not retrieved within 2 seconds, null is returned
SynchronousQueue
SynchronousQueue has no capacity. Unlike other BlockingQueues, SynchronousQueue is a non-stored BlockingQueue. Each PUT operation must wait for a take operation, or no additional elements can be added
Let’s test the process of adding elements to SynchronousQueue
First we create two threads, one for production and one for consumption
The production thread has put fields A, B, and C respectively
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "\t put A ");
blockingQueue.put("A");
System.out.println(Thread.currentThread().getName() + "\t put B ");
blockingQueue.put("B");
System.out.println(Thread.currentThread().getName() + "\t put C ");
blockingQueue.put("C");
} catch(InterruptedException e) { e.printStackTrace(); }},"t1").start();
Copy the code
The consuming thread uses take to block the contents of the queue and waits five seconds each time before consuming
new Thread(() -> {
try {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
blockingQueue.take();
System.out.println(Thread.currentThread().getName() + "\t take A ");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
blockingQueue.take();
System.out.println(Thread.currentThread().getName() + "\t take B ");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
blockingQueue.take();
System.out.println(Thread.currentThread().getName() + "\t take C ");
} catch(InterruptedException e) { e.printStackTrace(); }},"t2").start();
Copy the code
The final output is:
T1 put A T2 take A 5 seconds later... T1 put B T2 take B 5 seconds later... t1 put C t2 take CCopy the code
As can be seen from the final running result, every time t1 thread adds elements to the blocking queue, t1 input thread will wait for T2 consumption thread. After T2 consumption, T2 is in the suspended state, waiting for T1 to save, and thus goes round and round, forming the state of one saving and one fetching
The usefulness of blocking queues
Producer-consumer model
For a variable with an initial value of 0, two threads alternately operate on it, one incrementing and one subtracting, for five rounds
There are a few things to remember about multi-threaded operations
- Threads manipulate resource classes
- Judgment work notice
- Prevent false wake up mechanism
Synchronized edition of producer and consumer issues
package com.company;
/** * a variable with an initial value of 0, two threads alternately operate on it, one increment, one subtraction, for 5 rounds */
/** * Thread manipulates resource classes * determines work notification * prevents false wake up mechanism */
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/** * Resource class */
class ShareData {
private int number = 0;
public synchronized void increment(a) throws Exception{
try {
/ / determine
while(number ! =0) {
// Waiting cannot produce
this.wait();
}
/ / work
number++;
System.out.println(Thread.currentThread().getName() + "\t " + number);
// Wake up notification
this.notifyAll();
} catch(Exception e) { e.printStackTrace(); }}public synchronized void decrement(a) throws Exception{
try {
/ / determine
while(number == 0) {
// Waiting cannot consume
this.wait();
}
/ / work
number--;
System.out.println(Thread.currentThread().getName() + "\t " + number);
// Wake up notification
this.notifyAll();
} catch(Exception e) { e.printStackTrace(); }}}public class ProdConsumerTraditionDemo {
public static void main(String[] args) {
// High cohesion, low coupling cohesion refers to an air conditioner with its own means of regulating temperature
ShareData shareData = new ShareData();
// t1 thread, production
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
shareData.increment();
} catch(Exception e) { e.printStackTrace(); }}},"t1").start();
// t2 thread, consumption
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
shareData.decrement();
} catch(Exception e) { e.printStackTrace(); }}},"t2").start(); }}Copy the code
Extension question? Can I change this to if?
while(number == 0) {
// Waiting cannot consume
this.wait();
}
Copy the code
There is A problem, A B C D 4 threads! False wake up, the solution is to replace if with while
Increment (); increment(); increment(); increment(); increment(); increment() If =0, a. wait() waits (wait() releases the lock), and C tries to execute the production method, but still checks number! =0, then b. wait() waits (wait() releases the lock). It happens that the consumer thread B/D consumes a product, making number=0, and then calls this.notifyall () after B/D consumes; The producer thread continues to wait(), and the consumer calls this.notifyall (). The producer thread waits () and the consumer calls this.notifyall (). Producers then continue to produce ahead of themselves, eventually leading to 'excess capacity', where number is greater than 1 if(number! = 0){// wait this.wait(); } * /
while(number ! =0) { // Note that you can't use "if" or false wake up will occur
/ / wait for
this.wait();
}
Copy the code
[img-cfjt6mq1-1613968690163] (C:\Users\shouk\OneDrive\ Note image \image-20210218184801318.png) ]
JUC edition of producer and consumer issues
Condition is found by Lock in the official documentation
Click on Condition to check
Let’s implement a simple producer-consumer pattern, starting with the resource class ShareData
/** * Resource class */
class ShareData {
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment(a) throws Exception{
// synchronize code block, lock
lock.lock();
try {
/ / determine
while(number ! =0) {
// Waiting cannot produce
condition.await();
}
/ / work
number++;
System.out.println(Thread.currentThread().getName() + "\t " + number);
// Wake up notification
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally{ lock.unlock(); }}public void decrement(a) throws Exception{
// synchronize code block, lock
lock.lock();
try {
/ / determine
while(number == 0) {
// Waiting cannot consume
condition.await();
}
/ / work
number--;
System.out.println(Thread.currentThread().getName() + "\t " + number);
// Wake up notification
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally{ lock.unlock(); }}}Copy the code
It includes a number variable and offers methods for increment and Decrement, which increase and decrease the number by one, respectively
However, in order to prevent false wake up mechanism, we should use while instead of if when making judgment
/ / determine
while(number ! =0) {
// Waiting cannot produce
condition.await();
}
Copy the code
Do not use if to judge
/ / determine
if(number ! =0) {
// Waiting cannot produce
condition.await();
}
Copy the code
The complete code
package com.company;
/** * a variable with an initial value of 0, two threads alternately operate on it, one increment, one subtraction, for 5 rounds */
/** * Thread manipulates resource classes * determines work notification * prevents false wake up mechanism */
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/** * Resource class */
class ShareData {
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment(a) throws Exception{
// synchronize code block, lock
lock.lock();
try {
/ / determine
while(number ! =0) {
// Waiting cannot produce
condition.await();
}
/ / work
number++;
System.out.println(Thread.currentThread().getName() + "\t " + number);
// Wake up notification
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally{ lock.unlock(); }}public void decrement(a) throws Exception{
// synchronize code block, lock
lock.lock();
try {
/ / determine
while(number == 0) {
// Waiting cannot consume
condition.await();
}
/ / work
number--;
System.out.println(Thread.currentThread().getName() + "\t " + number);
// Wake up notification
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally{ lock.unlock(); }}}public class ProdConsumerTraditionDemo {
public static void main(String[] args) {
// High cohesion, low coupling cohesion refers to an air conditioner with its own means of regulating temperature
ShareData shareData = new ShareData();
// t1 thread, production
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
shareData.increment();
} catch(Exception e) { e.printStackTrace(); }}},"t1").start();
// t2 thread, consumption
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
shareData.decrement();
} catch(Exception e) { e.printStackTrace(); }}},"t2").start(); }}Copy the code
Finally, after successful operation, one of us will produce and the other will consume
t1 1
t2 0
t1 1
t2 0
t1 1
t2 0
t1 1
t2 0
t1 1
t2 0
Copy the code
Any new technology does not just cover the original technology, it has its advantages and complement to the old technology!
Condition notifies and wakes up threads precisely
package com.company;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/** * Resource class */
class ShareData {
private Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
private int thisOne = 1;
public void printA(a) throws Exception{
// synchronize code block, lock
lock.lock();
try {
/ / determine
while(thisOne ! =1) {
// Waiting cannot produce
condition1.await();
}
System.out.println(Thread.currentThread().getName() + "=>AAAAAAA");
/ / work
// Wake up the specified person, B
thisOne = 2;
condition2.signal();//
} catch (Exception e) {
e.printStackTrace();
} finally{ lock.unlock(); }}public void printB(a) throws Exception{
// synchronize code block, lock
lock.lock();
try {
/ / determine
while(thisOne ! =2) {
// Waiting cannot produce
condition2.await();
}
System.out.println(Thread.currentThread().getName() + "=>BBBB");
thisOne = 3;
condition3.signal();//
} catch (Exception e) {
e.printStackTrace();
} finally{ lock.unlock(); }}public void printC(a) throws Exception{
// synchronize code block, lock
lock.lock();
try {
/ / determine
while(thisOne ! =3) {
// Waiting cannot produce
condition3.await();
}
System.out.println(Thread.currentThread().getName() + "=>CCCCC");
thisOne = 1;
condition1.signal();//
} catch (Exception e) {
e.printStackTrace();
} finally{ lock.unlock(); }}}public class ProdConsumerTraditionDemo {
public static void main(String[] args) {
// High cohesion, low coupling cohesion refers to an air conditioner with its own means of regulating temperature
ShareData shareData = new ShareData();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
shareData.printA();
} catch(Exception e) { e.printStackTrace(); }}},"t1").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
shareData.printB();
} catch(Exception e) { e.printStackTrace(); }}},"t2").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
shareData.printC();
} catch(Exception e) { e.printStackTrace(); }}},"t3").start(); }}Copy the code
// ABC is advancing in an orderly manner
t1=>AAAAAAA
t2=>BBBB
t3=>CCCCC
t1=>AAAAAAA
t2=>BBBB
t3=>CCCCC
t1=>AAAAAAA
t2=>BBBB
t3=>CCCCC
t1=>AAAAAAA
t2=>BBBB
t3=>CCCCC
t1=>AAAAAAA
t2=>BBBB
t3=>CCCCC
Copy the code
Producer and Consumer 3.0
Prior to the concurrent package release, in a multi-threaded environment, each of us programmers had to control these details ourselves, especially with regard to efficiency and thread safety, which resulted in significant time complexity for our programs
Now we use the new blocking queue version producer and consumer, using: volatile, CAS, atomicInteger, BlockQueue, thread interaction, atomic reference
/** * Producer consumer blocking queue version * uses: volatile, CAS, atomicInteger, BlockQueue, thread interaction, atomic reference */
class MyResource {
// It is enabled by default for production and consumption
// Volatile is used to keep data visible, meaning that when TLAG changes, other threads are immediately notified of the changes
private volatile boolean FLAG = true;
// use atoms to wrap classes instead of number++
private AtomicInteger atomicInteger = new AtomicInteger();
// A specific SynchronousBlockingQueue cannot be instantiated to satisfy the condition
BlockingQueue<String> blockingQueue = null;
// It should be passed in using the construct injection method in dependency injection
public MyResource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
// What class is passed in
System.out.println(blockingQueue.getClass().getName());
}
/** **@throws Exception
*/
public void myProd(a) throws Exception{
String data = null;
boolean retValue;
// Use while to avoid false wake up
// Start production when FLAG is true
while(FLAG) {
data = atomicInteger.incrementAndGet() + "";
// Store 1 data in 2 seconds
retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
if(retValue) {
System.out.println(Thread.currentThread().getName() + "\t Insert queue :" + data + "Success" );
} else {
System.out.println(Thread.currentThread().getName() + "\t Insert queue :" + data + "Failure" );
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + "\t Stop production, indicating FLAG=false, production introduction");
}
/** * consumption *@throws Exception
*/
public void myConsumer(a) throws Exception{
String retValue;
// Use while to avoid false wake up
// Start production when FLAG is true
while(FLAG) {
// Store 1 data in 2 seconds
retValue = blockingQueue.poll(2L, TimeUnit.SECONDS);
if(retValue ! =null&& retValue ! ="") {
System.out.println(Thread.currentThread().getName() + "\t consumption queue :" + retValue + "Success" );
} else {
FLAG = false;
System.out.println(Thread.currentThread().getName() + "\t consumption failed, queue is empty, exit" );
// Exit the consumption queue
return; }}}/** * Stop production judgment */
public void stop(a) {
this.FLAG = false; }}public class ProdConsumerBlockingQueueDemo {
public static void main(String[] args) {
// Pass in the concrete implementation class, ArrayBlockingQueue
MyResource myResource = new MyResource(new ArrayBlockingQueue<String>(10));
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t Production thread started");
System.out.println("");
System.out.println("");
try {
myResource.myProd();
System.out.println("");
System.out.println("");
} catch(Exception e) { e.printStackTrace(); }},"prod").start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t consume thread start");
try {
myResource.myConsumer();
} catch(Exception e) { e.printStackTrace(); }},"consumer").start();
// Stop production and consumption after 5 seconds
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("");
System.out.println("");
System.out.println("After 5 seconds, the production and consumption threads stop and the thread terminates."); myResource.stop(); }}Copy the code
Final run result
Java. Util. Concurrent. ArrayBlockingQueue prod production thread start consumer spending thread start prod inserted into the queue:1Successful Consumer Consumption queue:1Prod successfully inserted into queue:2Successful Consumer Consumption queue:2Prod successfully inserted into queue:3Successful Consumer Consumption queue:3Prod successfully inserted into queue:4Successful Consumer Consumption queue:4Prod successfully inserted into queue:5Successful Consumer Consumption queue:5successful5The production and consumption threads stop and the thread ends. Prod Stops production, indicating FLAG=false, Production introductionCopy the code