I have been studying the producer/consumer model for a long time, and today it finally clicked. Before writing any code, let me review the model.
The producer/consumer pattern is a classic process synchronization problem in operating systems. It can be solved with a semaphore mechanism.
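To make the semaphore idea concrete, here is a minimal sketch of a bounded buffer built on `java.util.concurrent.Semaphore`. The class name `SemaphoreBuffer`, the `demo` helper, and the capacity of 3 are assumptions made for illustration; they are not part of the implementations that follow.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Semaphore;

// Hypothetical class: a bounded buffer coordinated by three semaphores.
class SemaphoreBuffer<T> {
    private final Queue<T> items = new ArrayDeque<>();
    private final Semaphore emptySlots;                      // free slots left in the buffer
    private final Semaphore filledSlots = new Semaphore(0);  // items ready to be consumed
    private final Semaphore mutex = new Semaphore(1);        // guards access to the queue

    SemaphoreBuffer(int capacity) {
        emptySlots = new Semaphore(capacity);
    }

    void put(T item) throws InterruptedException {
        emptySlots.acquire();            // wait until a slot is free
        mutex.acquireUninterruptibly();  // enter the critical section
        try {
            items.add(item);
        } finally {
            mutex.release();
        }
        filledSlots.release();           // signal that one more item is available
    }

    T take() throws InterruptedException {
        filledSlots.acquire();           // wait until an item is available
        mutex.acquireUninterruptibly();
        T item;
        try {
            item = items.remove();
        } finally {
            mutex.release();
        }
        emptySlots.release();            // signal that one more slot is free
        return item;
    }

    // One producer thread sends 0..count-1; the calling thread consumes them.
    static List<Integer> demo(int capacity, int count) {
        SemaphoreBuffer<Integer> buffer = new SemaphoreBuffer<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        List<Integer> received = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                received.add(buffer.take());
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo(3, 10)); // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

With a single producer and a single consumer the items come out in the order they went in, even though the buffer holds at most three of them at a time.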
Here is what needs to be done before writing the code:
- Define a cache queue and choose a collection to back it. The cache does only two things: store data and fetch data
- Define a producer thread to produce data and store it in the cache
- Define a consumer thread to fetch data from the cache
Way 1
Use LinkedHashMap as the cache and synchronized as the lock
Define a cache queue
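import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

public class PublicQueue<T> {
    // The index at which the next item is inserted
    private int putIndex = 0;
    // Maximum capacity
    private final int maxCount = 50;
    // The buffer; LinkedHashMap keeps entries in insertion order
    private final LinkedHashMap<Integer, T> linkedHashMap = new LinkedHashMap<>();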
    /**
     * Add data to the blocking queue.
     * @param msg the item to store
     */
    public synchronized void put(T msg) {
        // While the cache is full, block. The loop re-checks the condition after every wake-up.
        while (linkedHashMap.size() == maxCount) {
            try {
                wait();
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up on this item
                Thread.currentThread().interrupt();
                return;
            }
        }
        // Store data in the cache
        linkedHashMap.put(putIndex, msg);
        System.out.println("Produced a product, index: " + putIndex + " === text: " + msg + " === cache size: " + linkedHashMap.size());
        // Update putIndex, wrapping around at maxCount
        putIndex = (putIndex + 1) % maxCount;
        // The cache is no longer empty, so wake up any waiting consumers
        notifyAll();
    }
    /**
     * Fetch data from the blocking queue.
     * @return the oldest item, or null if the thread was interrupted while waiting
     */
    public synchronized T get() {
        // While the blocking queue is empty, the consumer is blocked
        while (linkedHashMap.isEmpty()) {
            try {
                wait();
            } catch (InterruptedException e) {
                // Restore the interrupt flag and return without an item
                Thread.currentThread().interrupt();
                return null;
            }
        }
        // Fetch data through an Iterator so that items come out in insertion order
        Iterator<Map.Entry<Integer, T>> iterator = linkedHashMap.entrySet().iterator();
        T t = null;
        if (iterator.hasNext()) {
            Map.Entry<Integer, T> entry = iterator.next();
            t = entry.getValue();
            int index = entry.getKey();
            iterator.remove();
            System.out.println("Consumed a product, index: " + index + " === text: " + t + " === cache size: " + linkedHashMap.size());
        }
        // The cache is no longer full, so wake up any waiting producers
        notifyAll();
        return t;
    }
}
Define a producer thread
public class Provider extends Thread {
    private final PublicQueue<String> publicQueue;

    public Provider(PublicQueue<String> publicQueue) {
        this.publicQueue = publicQueue;
    }

    @Override
    public void run() {
        // Produce 60 items; put() blocks whenever the cache is full
        for (int i = 0; i < 60; i++) {
            publicQueue.put(String.valueOf(i));
        }
    }
}
Define a consumer thread
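public class Consumer extends Thread {
    private final PublicQueue<String> publicQueue;

    public Consumer(PublicQueue<String> publicQueue) {
        this.publicQueue = publicQueue;
    }

    @Override
    public void run() {
        // Keep consuming until the thread is interrupted; get() blocks while the cache is empty
        while (!isInterrupted()) {
            publicQueue.get();
        }
    }
}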
Test
public class ProviderConsumerTest {
    public static void main(String[] args) {
        PublicQueue<String> publicQueue = new PublicQueue<>();
        Provider provider = new Provider(publicQueue);
        Consumer consumer = new Consumer(publicQueue);
        // The consumer loops indefinitely, so the program keeps running after all 60 items are consumed
        provider.start();
        consumer.start();
    }
}
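The test above runs one producer and one consumer. As a rough sketch of how the same wait/notifyAll idea behaves with several threads on each side, the following uses a hypothetical `GuardedBuffer` class (not the PublicQueue above) that tests its condition in a `while` loop, so a woken thread re-checks the buffer before it proceeds. The capacity of 5 and the thread counts are arbitrary assumptions.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class: a small bounded buffer built on wait/notifyAll.
class GuardedBuffer {
    private final Queue<Integer> items = new ArrayDeque<>();
    private final int capacity;

    GuardedBuffer(int capacity) {
        this.capacity = capacity;
    }

    synchronized void put(int value) throws InterruptedException {
        // Re-check after every wake-up: another producer may have filled the buffer again
        while (items.size() == capacity) {
            wait();
        }
        items.add(value);
        notifyAll(); // the state changed, so let waiting threads re-check
    }

    synchronized int take() throws InterruptedException {
        // Re-check after every wake-up: another consumer may have emptied the buffer again
        while (items.isEmpty()) {
            wait();
        }
        int value = items.remove();
        notifyAll();
        return value;
    }

    // Starts `threads` producers and `threads` consumers, each handling
    // `perThread` items, and returns the total number of items consumed.
    static int run(int threads, int perThread) {
        GuardedBuffer buffer = new GuardedBuffer(5);
        AtomicInteger consumed = new AtomicInteger();
        Thread[] workers = new Thread[threads * 2];
        for (int i = 0; i < threads; i++) {
            workers[2 * i] = new Thread(() -> {
                try {
                    for (int n = 0; n < perThread; n++) {
                        buffer.put(n);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[2 * i + 1] = new Thread(() -> {
                try {
                    for (int n = 0; n < perThread; n++) {
                        buffer.take();
                        consumed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        for (Thread t : workers) {
            t.start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(3, 100)); // prints 300
    }
}
```

Because every producer and consumer handles a fixed number of items, all threads finish and the count of consumed items equals the count produced.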
Way 2
This approach is even simpler: it uses the blocking queue that Java provides in java.util.concurrent, which handles the locking and waiting internally
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

public class PublicQueue<T> {
    // The buffer, bounded to 50 items
    private final BlockingDeque<T> blockingDeque = new LinkedBlockingDeque<>(50);

    public void add(T msg) {
        try {
            // put() blocks while the buffer is full
            blockingDeque.put(msg);
        } catch (InterruptedException e) {
            // Restore the interrupt flag; nothing was stored
            Thread.currentThread().interrupt();
            return;
        }
        System.out.println("Produced a product === text: " + msg);
    }

    public T get() {
        T t;
        try {
            // take() blocks while the buffer is empty
            t = blockingDeque.take();
        } catch (InterruptedException e) {
            // Restore the interrupt flag; nothing was fetched
            Thread.currentThread().interrupt();
            return null;
        }
        System.out.println("Consumed a product === text: " + t);
        return t;
    }
}
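Way 2 is shown without a test, so here is a minimal, self-contained sketch of one. It uses `LinkedBlockingDeque` directly instead of the PublicQueue wrapper, and it adds a "poison pill" marker so the consumer can stop once the producer is done. The class name `BlockingQueueDemo` and the marker value are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical demo class for the blocking-queue approach.
class BlockingQueueDemo {
    // Hypothetical end-of-stream marker; any value the producer never sends would do
    private static final String POISON_PILL = "__STOP__";

    // Produces `count` strings on one thread, consumes them on another,
    // and returns the consumed items in the order they were received.
    static List<String> run(int count) {
        BlockingQueue<String> queue = new LinkedBlockingDeque<>(50);
        List<String> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put(String.valueOf(i)); // blocks while the queue is full
                }
                queue.put(POISON_PILL);           // tell the consumer to stop
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String msg = queue.take();    // blocks while the queue is empty
                    if (POISON_PILL.equals(msg)) {
                        break;
                    }
                    consumed.add(msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        List<String> result = run(60);
        // prints "60 items, first=0, last=59"
        System.out.println(result.size() + " items, first=" + result.get(0) + ", last=" + result.get(59));
    }
}
```

The marker is one common way to end a consumer loop; interrupting the consumer thread from outside is another.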