The best way to learn is to write

Welcome to my personal den: wsfss.top

Today we are going to build a circular queue, use it in a multi-producer, single-consumer model, and simulate what happens when the queue is full.

Here is how the circular queue works:

  • The queue has a fixed length and two pointers: front (head) and rear (tail)
  • Each time an element is inserted, the tail pointer moves forward one position
  • Each time an element is fetched, the head pointer moves forward one position
  • When the tail pointer reaches the end of the array, it returns to zero, so once a full queue has had an element removed, the next element is added in the freed slot
  • When the head pointer reaches the end of the array, it returns to zero as well
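
The pointer movement described above can be sketched in a few lines. This is only an illustration: the class name `RingIndexDemo` and the helper `advance` are made up for this example, and the capacity of 6 is just a sample value. It uses the modulo operator to wrap the index; an explicit comparison against the capacity followed by a reset to zero does the same job.

```java
class RingIndexDemo {
    /** Moves an index forward one position, wrapping to 0 at the end of the array. */
    static int advance(int index, int capacity) {
        return (index + 1) % capacity;
    }

    public static void main(String[] args) {
        int capacity = 6; // sample capacity, chosen for the example
        int rear = 4;
        rear = advance(rear, capacity); // rear is now 5, the last slot
        rear = advance(rear, capacity); // rear wraps around to 0
        System.out.println(rear);       // prints 0
    }
}
```

The same helper works for the head pointer, since both pointers walk the array in the same direction.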

1. Now for the code. First, an interface

```java
package com.fss.util.queue;

public interface Queue<T> {
    /** Add an element. */
    boolean push(T t);

    /** Remove and return the element at the head of the queue. */
    T pop();

    /** Check whether the queue is full. */
    boolean isFull();

    /** Check whether the queue is empty. */
    boolean isEmpty();
}
```

2. An abstract class whose defaults throw, so that subclasses have to implement the interface methods themselves

```java
package com.fss.util.queue;

public abstract class AbstractQueue<T> implements Queue<T> {
    @Override
    public boolean push(T t) {
        throw new UnsupportedOperationException("method not supported");
    }

    @Override
    public T pop() {
        throw new UnsupportedOperationException("method not supported");
    }

    @Override
    public boolean isFull() {
        throw new UnsupportedOperationException("method not supported");
    }

    @Override
    public boolean isEmpty() {
        throw new UnsupportedOperationException("method not supported");
    }
}
```

3. The queue implementation class

```java
package com.fss.util.queue;

public class CycleQueue<T> extends AbstractQueue<T> implements Queue<T> {
    private static final int CAPACITY = 6;
    // Head of the queue: index of the next element to fetch
    volatile int front;
    // Tail of the queue: index of the next free slot
    volatile int rear;
    // Backing array; a null slot means "empty", so null elements cannot be stored
    volatile T[] arrays;

    @SuppressWarnings("unchecked")
    public CycleQueue() {
        this.arrays = (T[]) new Object[CAPACITY];
    }

    /** Add an element; returns false if the queue is full. */
    @Override
    public boolean push(T t) {
        synchronized (this) {
            if (!isFull()) {
                arrays[rear] = t;
                rear++;
                // Wrap the tail pointer back to zero at the end of the array
                if (rear == CAPACITY) {
                    rear = 0;
                }
                return true;
            }
            return false;
        }
    }

    /** Remove and return the head element; returns null if the queue is empty. */
    @Override
    public T pop() {
        synchronized (this) {
            if (!isEmpty()) {
                System.out.print("current array: ");
                for (int i = 0; i < arrays.length; i++) {
                    System.out.print(arrays[i] + ",");
                }
                System.out.print(" - ");
                final T cur = arrays[front];
                arrays[front] = null;
                front++;
                // Wrap the head pointer back to zero at the end of the array
                if (front == CAPACITY) {
                    front = 0;
                }
                return cur;
            }
            return null;
        }
    }

    /** Full: the pointers meet and the slot they point at is occupied. */
    @Override
    public boolean isFull() {
        synchronized (this) {
            return front == rear && arrays[rear] != null;
        }
    }

    /** Empty: the pointers meet and the slot they point at is free. */
    @Override
    public boolean isEmpty() {
        synchronized (this) {
            return front == rear && arrays[rear] == null;
        }
    }
}
```
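
Because `front == rear` holds both when the queue is empty and when it is full, the class above looks at the slot itself to tell the two states apart. Here is a small, self-contained sketch of that check on a plain array. The names `SlotCheckDemo`, `slots` and the static helpers are invented for this illustration and are not part of the classes in this post.

```java
class SlotCheckDemo {
    /** Full: the pointers meet and the slot they point at is occupied. */
    static boolean isFull(Object[] slots, int front, int rear) {
        return front == rear && slots[rear] != null;
    }

    /** Empty: the pointers meet and the slot they point at is free. */
    static boolean isEmpty(Object[] slots, int front, int rear) {
        return front == rear && slots[rear] == null;
    }

    public static void main(String[] args) {
        Object[] slots = new Object[2]; // tiny capacity, chosen for the example
        int front = 0;
        int rear = 0;
        System.out.println(isEmpty(slots, front, rear)); // prints true

        slots[rear] = "a";
        rear = (rear + 1) % slots.length; // rear is now 1
        slots[rear] = "b";
        rear = (rear + 1) % slots.length; // rear wraps to 0 and meets front again
        System.out.println(isFull(slots, front, rear));  // prints true
    }
}
```

One consequence of this design is that `null` cannot be stored as an element, because a `null` slot is what marks the queue as empty. Keeping a separate element counter would be another way to tell the two states apart.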

4. With the queue finished, start 3 threads: 2 producers and 1 consumer, so that production is faster than consumption

  • Producer (waits when the queue is full and continues to produce when an element is fetched)
```java
package com.fss.util.thread;

import com.fss.util.queue.CycleQueue;
import java.util.Random;

public class PushQueueThread extends Thread {
    private CycleQueue<Integer> queue;
    private String threadName;

    public PushQueueThread(CycleQueue<Integer> queue, String threadName) {
        this.queue = queue;
        this.threadName = threadName;
    }

    @Override
    public void run() {
        synchronized (queue) {
            while (true) {
                int i = new Random().nextInt(1000);
                // Wait while the queue is full; wait() releases the lock on the queue
                while (queue.isFull()) {
                    try {
                        System.out.print(threadName + " - queue is full, waiting...\n");
                        queue.wait();
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and stop producing
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                queue.push(i);
                System.out.print(threadName + " - insert element: " + i + "\n");
                try {
                    // Pause for up to 100 ms, letting other threads take the lock
                    queue.wait(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}
```
  • Consumer (notifies producer to produce after fetching element)
```java
package com.fss.util.thread;

import com.fss.util.queue.CycleQueue;

public class PopQueueThread extends Thread {
    private CycleQueue<?> queue;
    private String threadName;

    public PopQueueThread(CycleQueue<?> queue, String threadName) {
        this.queue = queue;
        this.threadName = threadName;
    }

    @Override
    public void run() {
        synchronized (queue) {
            while (true) {
                if (!queue.isEmpty()) {
                    Object o = queue.pop();
                    System.out.print(threadName + " - fetch element: " + o + "\n");
                    try {
                        // Consume slowly; wait() releases the lock so producers can run
                        queue.wait(100);
                        System.out.println(threadName + " - a slot is free, next one up...");
                        // Wake any producer waiting on a full queue
                        queue.notifyAll();
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and stop consuming
                        Thread.currentThread().interrupt();
                        return;
                    }
                } else {
                    try {
                        System.out.print(threadName + " - queue is empty, waiting...\n");
                        queue.wait(200);
                        queue.notifyAll();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }
    }
}
```
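
The wait-and-notify handshake between producer and consumer can be seen in isolation in the sketch below. It is a simplified stand-in, not the classes from this post: `GuardedSlotDemo`, `put`, `take` and `run` are hypothetical names, the "queue" holds a single element, and the waiting lives inside the buffer itself instead of in the thread classes. The point it shows is the loop around `wait()`: a thread re-checks the condition every time it wakes up, and the other side calls `notifyAll()` after changing the state.

```java
class GuardedSlotDemo {
    private Integer slot; // one-element buffer; null means empty

    /** Blocks while the slot is occupied, then stores the value. */
    synchronized void put(int value) throws InterruptedException {
        while (slot != null) {
            wait(); // releases the lock until another thread calls notifyAll()
        }
        slot = value;
        notifyAll(); // wake a consumer waiting for an element
    }

    /** Blocks while the slot is empty, then removes and returns the value. */
    synchronized int take() throws InterruptedException {
        while (slot == null) {
            wait();
        }
        int value = slot;
        slot = null;
        notifyAll(); // wake a producer waiting for free space
        return value;
    }

    /** One producer puts 1, 2, 3; the calling thread takes three values and sums them. */
    static int run() {
        GuardedSlotDemo buffer = new GuardedSlotDemo();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 3; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            int sum = 0;
            for (int i = 0; i < 3; i++) {
                sum += buffer.take();
            }
            producer.join();
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while consuming", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 6
    }
}
```

With a capacity of one, the producer is forced to wait after every `put` until the consumer has taken the element, which is the "queue is full" situation in miniature.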

5. Start everything from a main method

```java
package test;

import com.fss.util.queue.CycleQueue;
import com.fss.util.thread.PopQueueThread;
import com.fss.util.thread.PushQueueThread;

public class CycleQueueTest {
    public static void main(String[] args) {
        // One queue shared by all three threads
        CycleQueue<Integer> queue = new CycleQueue<>();
        PushQueueThread pushThread1 = new PushQueueThread(queue, "Push1");
        PushQueueThread pushThread2 = new PushQueueThread(queue, "Push2");
        PopQueueThread popThread = new PopQueueThread(queue, "Pop1");
        pushThread1.start();
        pushThread2.start();
        popThread.start();
    }
}
```

The output:

```
Push1 - insert element: 148
Push2 - insert element: 694
current array: 148,694,null,null,null,null, - Pop1 - fetch element: 148
Push1 - insert element: 797
Push2 - insert element: 781
Pop1 - a slot is free, next one up...
current array: null,694,797,781,null,null, - Pop1 - fetch element:
Push2 - insert element: 390
Push2 - insert element: 633
Pop1 - a slot is free, next one up...
current array: 633,null,797,781,639,390, - Pop1 - fetch element: 797
Push1 - insert element: 843
Push2 - insert element: 945
Pop1 - a slot is free, next one up...
current array: 633,843,945,781,639,390, - Pop1 - fetch element: 781
Push2 - insert element: 33
Push1 - queue is full, waiting...
Pop1 - a slot is free, next one up...
```