Java geek
Related reading:
Introduction to Java Concurrent Programming (1): Knowledge map
Introduction to Java Concurrent Programming (2): Atomicity
Introduction to Java Concurrent Programming (3): Visibility
Introduction to Java Concurrent Programming (4): Ordering
Introduction to Java Concurrent Programming (5): Creating threads
Introduction to Java Concurrent Programming (6): synchronized usage
Introduction to Java Concurrent Programming (7): Easy-to-understand wait and notify, with usage scenarios
Introduction to Java Concurrent Programming (8): Thread lifecycle
Introduction to Java Concurrent Programming (9): Deadlock and locating deadlocks
Introduction to Java Concurrent Programming (10): Lock optimization
Introduction to Java Concurrent Programming (11): Rate-limiting scenarios and the Spring rate limiter
Introduction to Java Concurrent Programming (13): Read/write locks and a cache template
Introduction to Java Concurrent Programming (14): CountDownLatch application scenarios
Introduction to Java Concurrent Programming (15): CyclicBarrier application scenarios
Introduction to Java Concurrent Programming (16): Understanding the differences between thread pools in seconds
Introduction to Java Concurrent Programming (17): Common thread classes and interfaces in one picture
Introduction to Java Concurrent Programming (18): Thread safety revisited
Introduction to Java Concurrent Programming (19): Asynchronous task scheduling tool CompleteFeature
Introduction to Java Concurrent Programming (20): Common locking scenarios and locking tools
I. Application scenarios
The producer-consumer pattern is used for asynchronous processing. Its advantage is that producers and consumers are decoupled and do not depend on each other: a producer does not wait for a consumer to finish processing a task, so it can keep producing while consumers work through the queue, which improves throughput.
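To make the decoupling concrete, here is a minimal sketch that uses the JDK's ArrayBlockingQueue rather than the BlockedQueue class from this article. The class name AsyncHandoffSketch, the task names, and the 10 ms sleep are assumptions made for illustration only.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo class; not part of the article's code.
class AsyncHandoffSketch {

    /** Hands taskCount tasks to a background consumer and returns what it processed. */
    static List<String> run(int taskCount) throws InterruptedException {
        // The queue is sized to hold every task, so put() does not block in this demo.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(taskCount);
        List<String> processed = new CopyOnWriteArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < taskCount; i++) {
                    String task = queue.take();   // blocks while the queue is empty
                    Thread.sleep(10);             // simulate slow processing
                    processed.add(task);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");
        consumer.start();

        // The producer only pays the cost of enqueueing, not of processing.
        for (int i = 0; i < taskCount; i++) {
            queue.put("task-" + i);
        }
        System.out.println("producer done, processed so far: " + processed.size());

        consumer.join();
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(5));
    }
}
```

The producer loop usually finishes while the consumer is still working, which is the point of the pattern: the two sides only share the queue.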
II. Code class structure
The producer and consumer classes are structured as follows:
1. BlockedQueue is the blocking task queue. When it has reached its maximum capacity, enqueueing blocks until space is free.
2. Producer is the producer. Here it is an abstract class, and subclasses implement the generateTask method.
3. Consumer is the consumer. Here it is an abstract class, and subclasses implement the exec method.
4. Producer and Consumer are only abstract code templates with simple logic. Write whatever template suits your actual needs.
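As one illustration of "a template that suits your needs", the sketch below swaps inheritance for composition: the producer takes a Supplier and the consumer takes a handler function. This is an assumption-laden variant, not the article's design. The class names SupplierProducer and HandlerConsumer are hypothetical, the queue is the JDK's BlockingQueue instead of BlockedQueue, and treating a null task as "no more work" is a convention invented for this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical composition-based producer; names are illustrative only.
class SupplierProducer<T> implements Runnable {
    private final BlockingQueue<T> queue;
    private final Supplier<T> source;

    SupplierProducer(BlockingQueue<T> queue, Supplier<T> source) {
        this.queue = queue;
        this.source = source;
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                T task = source.get();
                if (task == null) {
                    return;          // assumption: null means "no more tasks"
                }
                queue.put(task);     // blocks while the queue is full
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// Hypothetical composition-based consumer; runs until its thread is interrupted.
class HandlerConsumer<T> implements Runnable {
    private final BlockingQueue<T> queue;
    private final Consumer<T> handler;

    HandlerConsumer(BlockingQueue<T> queue, Consumer<T> handler) {
        this.queue = queue;
        this.handler = handler;
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                handler.accept(queue.take());   // blocks while the queue is empty
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With this shape, callers pass lambdas instead of writing subclasses, at the cost of losing a shared base class to hang common behavior on.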
III. Show me the code
I. BlockedQueue.java
import java.util.Vector;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * @ClassName BlockedQueue
 * @Description Blocking task queue. When the queue has reached its maximum capacity, enqueueing blocks and waits.
 * @Author Sonorous leaf
 * @Date 2019/10/5 11:32
 * @Version 1.0
 * javashizhan.com
 **/
public class BlockedQueue<T> {

    // Lock guarding the queue
    private final Lock lock = new ReentrantLock();

    // Condition variable: the queue is not full
    private final Condition notFull = lock.newCondition();

    // Condition variable: the queue is not empty
    private final Condition notEmpty = lock.newCondition();

    // Task collection
    private final Vector<T> taskQueue = new Vector<T>();

    // Queue capacity
    private final int capacity;

    /**
     * Constructor
     * @param capacity queue capacity
     */
    public BlockedQueue(int capacity) {
        this.capacity = capacity;
    }
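
    /**
     * Enqueue operation. Blocks while the queue is full.
     * If the thread is interrupted while waiting, the task is not enqueued.
     * @param t the task to add
     */
    public void enq(T t) {
        lock.lock();
        try {
            System.out.println("size: " + taskQueue.size() + " capacity: " + capacity);
            while (taskQueue.size() == this.capacity) {
                // The queue is full: wait until it is not full
                notFull.await();
            }
            System.out.println(Thread.currentThread().getName() + " add task: " + t.toString());
            taskQueue.add(t);
            // After enqueueing, the queue is not empty: wake a waiting consumer
            notEmpty.signal();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can observe it
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Dequeue operation. Blocks while the queue is empty.
     * @return the task at the head of the queue
     */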
    public T deq() {
        lock.lock();
        try {
            while (taskQueue.size() == 0) {
                // The queue is empty: wait until it is not empty
                notEmpty.await();
            }
            T t = taskQueue.remove(0);
            // After dequeueing, the queue is not full: wake a waiting producer
            notFull.signal();
            return t;
        } catch (InterruptedException e) {
            // Restore the interrupt flag and return null instead of removing from an empty queue
            Thread.currentThread().interrupt();
            return null;
        } finally {
            lock.unlock();
        }
    }
}
II. Producer.java
/**
 * @ClassName Producer
 * @Description Producer. This class is fairly simple, and inheritance does not save much code; you can extend it or write your own implementation.
 * @Author Sonorous leaf
 * @Date 2019/10/5
 * @Version 1.0
 * javashizhan.com
 **/
public abstract class Producer<T> implements Runnable {

    private final BlockedQueue<T> taskQueue;

    public Producer(BlockedQueue<T> taskQueue) {
        this.taskQueue = taskQueue;
    }

    @Override
    public void run() {
        // Keep producing until the thread is interrupted
        while (!Thread.currentThread().isInterrupted()) {
            T[] tasks = generateTask();
            if (null != tasks && tasks.length > 0) {
                for (T task : tasks) {
                    if (null != task) {
                        this.taskQueue.enq(task);
                    }
                }
            }
        }
    }

    /**
     * Generates tasks. This uses the "template method" design pattern: subclasses only need to implement this method.
     * @return the generated tasks
     */
    public abstract T[] generateTask();
}
III. Consumer.java
/**
 * @ClassName Consumer
 * @Description Consumer. This class is fairly simple, and inheritance does not save much code; you can extend it or write your own implementation.
 * @Author Sonorous leaf
 * @Date 2019/10/5
 * @Version 1.0
 * javashizhan.com
 **/
public abstract class Consumer<T> implements Runnable {

    private final BlockedQueue<T> taskQueue;

    public Consumer(BlockedQueue<T> taskQueue) {
        this.taskQueue = taskQueue;
    }

    @Override
    public void run() {
        // Keep consuming until the thread is interrupted
        while (!Thread.currentThread().isInterrupted()) {
            T task = taskQueue.deq();
            if (null != task) {
                exec(task);
            }
        }
    }

    /**
     * Executes a task. This uses the "template method" design pattern: subclasses only need to implement this method.
     * @param task the task to execute
     */
    public abstract void exec(T task);
}
IV. Usage example
/**
 * @ClassName LockTest
 * @Description Starts 3 producers and 5 consumers that share one BlockedQueue.
 * @Author Sonorous leaf
 * @Date 2019/10/5
 * @Version 1.0
 * javashizhan.com
 **/
public class LockTest {

    public static void main(String[] args) {
        BlockedQueue<String> taskQueue = new BlockedQueue<String>(10);

        // Start 3 producers
        for (int i = 0; i < 3; i++) {
            String producerName = "Producder-" + i;
            Thread producer = new Thread(new Producer<String>(taskQueue) {
                @Override
                public String[] generateTask() {
                    String[] tasks = new String[20];
                    for (int j = 0; j < tasks.length; j++) {
                        long timestamp = System.currentTimeMillis();
                        tasks[j] = "Task_" + timestamp + "_" + j;
                    }
                    return tasks;
                }
            }, producerName);
            producer.start();
        }

        // Start 5 consumers
        for (int i = 0; i < 5; i++) {
            String consumerName = "Consumer-" + i;
            Thread consumer = new Thread(new Consumer<String>(taskQueue) {
                @Override
                public void exec(String task) {
                    System.out.println(Thread.currentThread().getName() + " do task [" + task + "]");
                    // Sleep for a while to simulate the task execution time
                    sleep(2000);
                }

                private void sleep(long millis) {
                    try {
                        Thread.sleep(millis);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, consumerName);
            consumer.start();
        }
    }
}
Output log:
size: 0 capacity: 10
Producder-1 add task: Task_1570250409102_0
size: 1 capacity: 10
Producder-1 add task: Task_1570250409103_1
size: 2 capacity: 10
Producder-1 add task: Task_1570250409103_2
size: 3 capacity: 10
Producder-1 add task: Task_1570250409103_3
size: 4 capacity: 10
Producder-1 add task: Task_1570250409103_4
size: 5 capacity: 10
Producder-1 add task: Task_1570250409103_5
size: 6 capacity: 10
Producder-1 add task: Task_1570250409103_6
size: 7 capacity: 10
Producder-1 add task: Task_1570250409103_7
size: 8 capacity: 10
Producder-1 add task: Task_1570250409103_8
size: 9 capacity: 10
Producder-1 add task: Task_1570250409103_9
size: 10 capacity: 10
size: 10 capacity: 10
size: 10 capacity: 10
Consumer-0 do task [Task_1570250409102_0]
Consumer-4 do task [Task_1570250409103_1]
Consumer-3 do task [Task_1570250409103_2]
Producder-1 add task: Task_1570250409103_10
Consumer-1 do task [Task_1570250409103_3]
Producder-0 add task: Task_1570250409102_0
size: 8 capacity: 10
Producder-0 add task: Task_1570250409103_1
size: 9 capacity: 10
Producder-0 add task: Task_1570250409103_2
size: 10 capacity: 10
size: 10 capacity: 10
Consumer-2 do task [Task_1570250409103_4]
Producder-0 add task: Task_1570250409103_3
size: 10 capacity: 10
Consumer-3 do task [Task_1570250409103_6]
Producder-2 add task: Task_1570250409103_0
Consumer-1 do task [Task_1570250409103_5]
size: 9 capacity: 10
Producder-2 add task: Task_1570250409103_1
size: 10 capacity: 10
Consumer-4 do task [Task_1570250409103_7]
Consumer-0 do task [Task_1570250409103_8]
Producder-1 add task: Task_1570250409103_11
size: 9 capacity: 10
Producder-1 add task: Task_1570250409103_12
size: 10 capacity: 10
Consumer-2 do task [Task_1570250409103_9]
Producder-1 add task: Task_1570250409103_13
size: 10 capacity: 10
IV. Other notes
1. Locking here uses Lock, which is more flexible than the synchronized keyword and easy to adapt if special requirements come up.
2. Introduction to Java Concurrent Programming (7) has an example that implements the producer-consumer pattern with synchronized. That code is not generic enough and can be reworked to be more generic.
3. For this example, using Lock is not much different from using synchronized with wait and notify.
4. A bounded blocking queue is appropriate when the rate at which producers generate tasks is under your control. If tasks come from third-party calls that you do not control and production is much faster than consumption, callers may block for a long time, so that too many threads end up waiting or calls fail with timeouts. In that case blocking is not appropriate, and an exception should be thrown when the queue is full to tell the caller not to wait.
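Notes 1 and 4 can be sketched together: because the queue is built on Lock and Condition, a fail-fast enqueue and a timed enqueue are small additions. The class below is a hypothetical variant, not the article's BlockedQueue. The names FailFastQueue and enqOrThrow are invented for illustration, IllegalStateException is one possible choice of exception, and ArrayDeque replaces Vector.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical variant of the article's queue; class and method names are illustrative.
class FailFastQueue<T> {
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    private final Deque<T> tasks = new ArrayDeque<>();
    private final int capacity;

    FailFastQueue(int capacity) {
        this.capacity = capacity;
    }

    /** Adds the task, or throws immediately when the queue is full; never waits for space. */
    void enqOrThrow(T task) {
        lock.lock();
        try {
            if (tasks.size() >= capacity) {
                throw new IllegalStateException("queue full, capacity " + capacity);
            }
            tasks.addLast(task);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /** Waits at most the given time for free space; returns false on timeout. */
    boolean enq(T task, long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            while (tasks.size() >= capacity) {
                if (nanos <= 0L) {
                    return false;                  // timed out while still full
                }
                nanos = notFull.awaitNanos(nanos); // returns the remaining wait time
            }
            tasks.addLast(task);
            notEmpty.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Blocks until a task is available, then removes and returns it. */
    T deq() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (tasks.isEmpty()) {
                notEmpty.await();
            }
            T task = tasks.removeFirst();
            notFull.signal();
            return task;
        } finally {
            lock.unlock();
        }
    }
}
```

A caller that cannot afford to wait would use enqOrThrow and translate the exception into a "busy, try again later" response; a caller that can tolerate a short wait would use the timed enq and check its return value.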
end.