What is the producer-consumer model

In simple terms, producer threads and consumer threads share a common buffer, and two rules apply:

  1. When the shared area is full, the producer thread must wait (block);
  2. When the shared area is empty, the consumer thread must wait (block).

For example, in a restaurant the chef is the producer, the guest is the consumer, and the serving counter is the shared area. When the counter is full of dishes nobody has picked up, the chef must wait before cooking more. When the counter is empty, the guests must wait for the next dish.

Implementation

wait/notify of Object

  • wait: Releases the object's monitor and suspends the current thread until it is notified or interrupted.
  • notify: Selects one thread from the wait queue and moves it to the synchronization queue.
  • notifyAll: Wakes up all waiting threads and moves them to the synchronization queue.
import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class Test2 {
    public static void main(String[] args) {
        LinkedList<Integer> linkedList = new LinkedList<>();
        int size = 10;
        // 5 producers + 10 consumers, each looping forever, need 15 threads.
        ExecutorService executorService = Executors.newFixedThreadPool(15);
        for (int i = 0; i < 5; i++) {
            executorService.submit(new Product(linkedList, size));
        }
        for (int i = 0; i < 10; i++) {
            executorService.submit(new Customer(linkedList));
        }
        // Stops accepting new tasks; the submitted tasks keep running.
        executorService.shutdown();
    }

    static class Product implements Runnable {
        private final LinkedList<Integer> linkedList;
        private final int size;

        Product(LinkedList<Integer> linkedList, int size) {
            this.linkedList = linkedList;
            this.size = size;
        }

        @Override
        public void run() {
            while (true) {
                // The list itself is the monitor shared by all producers and consumers.
                synchronized (linkedList) {
                    try {
                        // Re-check the condition in a loop after every wake-up.
                        while (linkedList.size() >= size) {
                            System.out.println(Thread.currentThread().getName() + ": the shared queue is full, producer is waiting...");
                            linkedList.wait();
                            System.out.println(Thread.currentThread().getName() + ": producer woke up and rechecks the queue.");
                        }
                        int idx = new Random().nextInt();
                        System.out.println(Thread.currentThread().getName() + ": produced product ID " + idx);
                        linkedList.add(idx);
                        // Wake every waiting thread so that consumers can proceed.
                        linkedList.notifyAll();
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and stop this producer.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }
    }

    static class Customer implements Runnable {
        private final LinkedList<Integer> linkedList;

        Customer(LinkedList<Integer> linkedList) {
            this.linkedList = linkedList;
        }

        @Override
        public void run() {
            while (true) {
                synchronized (linkedList) {
                    try {
                        while (linkedList.isEmpty()) {
                            System.out.println(Thread.currentThread().getName() + ": the shared queue is empty, consumer is waiting...");
                            linkedList.wait();
                            System.out.println(Thread.currentThread().getName() + ": consumer woke up and rechecks the queue.");
                        }
                        // add() appends at the tail, so removeFirst() consumes in FIFO order.
                        int tmp = linkedList.removeFirst();
                        System.out.println(Thread.currentThread().getName() + ": consumed product ID " + tmp);
                        // Wake every waiting thread so that producers can proceed.
                        linkedList.notifyAll();
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and stop this consumer.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }
    }
}

await/signal of Lock's Condition

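Usage mirrors wait/notify of Object. The difference is that, instead of entering a synchronized block, the thread must explicitly acquire the Lock before using its Condition and release it in a finally block.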
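
As a small illustration of that rule, the sketch below calls await on a Condition without holding its lock, and wait on an Object without holding its monitor. Both calls throw IllegalMonitorStateException. The class name ConditionNeedsLock and its helper methods are hypothetical names chosen for this example only.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the original examples.
class ConditionNeedsLock {

    // Returns true if Condition.await() rejects a caller that does not hold the lock.
    static boolean awaitWithoutLockFails() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        try {
            condition.await(); // lock.lock() was never called
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Returns true if Object.wait() rejects a caller outside a synchronized block.
    static boolean waitWithoutMonitorFails() {
        Object monitor = new Object();
        try {
            monitor.wait(); // not inside synchronized (monitor)
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("await without lock fails: " + awaitWithoutLockFails());
        System.out.println("wait without monitor fails: " + waitWithoutMonitorFails());
    }
}
```

The same check applies to signal and signalAll, which is why the example that follows wraps every Condition call between lock.lock() and lock.unlock().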

import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Test3 {
    static ReentrantLock lock = new ReentrantLock();
    // Producers wait on "full" while the queue is full.
    static Condition full = lock.newCondition();
    // Consumers wait on "empty" while the queue is empty.
    static Condition empty = lock.newCondition();

    public static void main(String[] args) {
        LinkedList<Integer> linkedList = new LinkedList<>();
        // 8 producers + 12 consumers, each looping forever, need 20 threads.
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        for (int i = 0; i < 8; i++) {
            executorService.submit(new Product(linkedList, 10, lock));
        }
        for (int i = 0; i < 12; i++) {
            executorService.submit(new Customer(linkedList, lock));
        }
        executorService.shutdown();
    }

    static class Product implements Runnable {
        private final LinkedList<Integer> linkedList;
        private final int size;
        private final Lock lock;

        Product(LinkedList<Integer> linkedList, int size, Lock lock) {
            this.linkedList = linkedList;
            this.size = size;
            this.lock = lock;
        }

        @Override
        public void run() {
            while (true) {
                lock.lock();
                try {
                    while (linkedList.size() >= size) {
                        System.out.println(Thread.currentThread().getName() + ": the shared queue is full, producer is waiting...");
                        full.await();
                        System.out.println(Thread.currentThread().getName() + ": producer woke up and rechecks the queue.");
                    }
                    int idx = new Random().nextInt();
                    linkedList.add(idx);
                    System.out.println(Thread.currentThread().getName() + ": produced product ID " + idx);
                    // Only consumers wait on "empty", so only consumers are woken.
                    empty.signalAll();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    static class Customer implements Runnable {
        private final LinkedList<Integer> linkedList;
        private final Lock lock;

        Customer(LinkedList<Integer> linkedList, Lock lock) {
            this.linkedList = linkedList;
            this.lock = lock;
        }

        @Override
        public void run() {
            while (true) {
                lock.lock();
                try {
                    while (linkedList.isEmpty()) {
                        System.out.println(Thread.currentThread().getName() + ": the shared queue is empty, consumer is waiting...");
                        empty.await();
                        System.out.println(Thread.currentThread().getName() + ": consumer woke up and rechecks the queue.");
                    }
                    // add() appends at the tail, so removeFirst() consumes in FIFO order.
                    int idx = linkedList.removeFirst();
                    System.out.println(Thread.currentThread().getName() + ": consumed product ID " + idx);
                    // Only producers wait on "full", so only producers are woken.
                    full.signalAll();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } finally {
                    lock.unlock();
                }
            }
        }
    }
}

Based on BlockingQueue

A BlockingQueue builds this Condition-based locking into its put and take methods, so the caller needs no explicit synchronization.

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

class Test4 {
    // Bounded to 10 elements so that put() blocks when the queue is full;
    // the no-argument constructor creates an effectively unbounded queue.
    static LinkedBlockingQueue<Integer> linkedBlockingQueue = new LinkedBlockingQueue<>(10);

    public static void main(String[] args) {
        // 8 producers + 12 consumers, each looping forever, need 20 threads.
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        for (int i = 0; i < 8; i++) {
            executorService.submit(new Product(linkedBlockingQueue));
        }
        for (int i = 0; i < 12; i++) {
            executorService.submit(new Customer(linkedBlockingQueue));
        }
        executorService.shutdown();
    }

    static class Product implements Runnable {
        private final LinkedBlockingQueue<Integer> blockingQueue;

        Product(LinkedBlockingQueue<Integer> blockingQueue) {
            this.blockingQueue = blockingQueue;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    int idx = new Random().nextInt();
                    // Blocks while the queue is full.
                    blockingQueue.put(idx);
                    System.out.println(Thread.currentThread().getName() + ": produced product ID " + idx);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    static class Customer implements Runnable {
        private final LinkedBlockingQueue<Integer> blockingQueue;

        Customer(LinkedBlockingQueue<Integer> blockingQueue) {
            this.blockingQueue = blockingQueue;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    // Blocks while the queue is empty.
                    int idx = blockingQueue.take();
                    System.out.println(Thread.currentThread().getName() + ": consumed product ID " + idx);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}
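
To make the idea of put and take wrapping Condition-based locking more concrete, here is a simplified sketch of a bounded queue built on one ReentrantLock and two Conditions. It is not the JDK source: SimpleBoundedQueue and demoSum are hypothetical names for this illustration. The single-lock layout resembles ArrayBlockingQueue, while the real LinkedBlockingQueue uses separate locks for put and take.

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified bounded queue; for illustration only.
class SimpleBoundedQueue<T> {
    private final ArrayDeque<T> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // producers wait here
    private final Condition notEmpty = lock.newCondition(); // consumers wait here

    SimpleBoundedQueue(int capacity) {
        this.capacity = capacity;
    }

    // Blocks while the queue is full, then appends the item.
    void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity) {
                notFull.await();
            }
            items.addLast(item);
            notEmpty.signal(); // one more item: wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    // Blocks while the queue is empty, then removes the oldest item.
    T take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            T item = items.removeFirst();
            notFull.signal(); // one more free slot: wake one waiting producer
            return item;
        } finally {
            lock.unlock();
        }
    }

    // Demo: one producer puts 1..n through a 2-slot queue; the caller sums what it takes.
    static int demoSum(int n) {
        SimpleBoundedQueue<Integer> queue = new SimpleBoundedQueue<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < n; i++) {
                sum += queue.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(demoSum(100));
    }
}
```

Because the queue holds only two items, the producer in demoSum blocks repeatedly in put until the consumer frees a slot, yet every value is still delivered exactly once.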

Reference

Operation mechanism of synchronous queue and wait queue

  1. Each Condition object contains its own wait queue.
  2. In the monitor model of Object, an object has one synchronization queue and one wait queue, whereas AQS (AbstractQueuedSynchronizer) has one synchronization queue and can have multiple wait queues, one per Condition.
  • Calling await on a Condition adds the current thread to that Condition's wait queue and then releases the lock; the thread's state changes to waiting.
  • Calling signal on a Condition moves the first node of the wait queue to the tail of the synchronization queue, where its thread becomes eligible to wake up. Being woken does not mean that the thread returns from await immediately, nor that it holds the lock. It must still compete for the lock through the acquireQueued method, and it returns from await only after it has acquired the lock.
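
The transfer described in the last bullet can be observed from outside with ReentrantLock's monitoring methods hasWaiters and hasQueuedThread. The sketch below is a hedged illustration; SignalTransferDemo and its event strings are hypothetical names. A waiter thread awaits on a Condition, the main thread signals it while still holding the lock, and the recorded events show that the waiter has left the wait queue, sits in the synchronization queue, and returns from await only after the signaling thread unlocks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; for illustration only.
class SignalTransferDemo {
    static List<String> run() {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        boolean[] signalled = {false};            // guarded by lock
        List<String> events = new ArrayList<>();  // guarded by lock

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                events.add("waiter awaits");
                while (!signalled[0]) {
                    ready.awaitUninterruptibly(); // releases the lock while waiting
                }
                // Reached only after the lock has been reacquired.
                events.add("waiter returned from await");
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        boolean done = false;
        while (!done) {
            lock.lock();
            try {
                // hasWaiters may only be called while holding the lock.
                if (lock.hasWaiters(ready)) {
                    signalled[0] = true;
                    ready.signal();
                    // Still holding the lock: inspect where the waiter is now.
                    events.add("after signal: onConditionQueue=" + lock.hasWaiters(ready)
                            + ", onSyncQueue=" + lock.hasQueuedThread(waiter));
                    done = true;
                }
            } finally {
                lock.unlock();
            }
            if (!done) {
                Thread.yield(); // the waiter has not reached await yet; retry
            }
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The middle event is recorded between signal and unlock, which is why the waiter's final event always comes last.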