An overview of the

  • A BlockingQueue is a queue whose operations can wait: put() waits for free space when the queue is full, and take() waits for an element when the queue is empty.
  • BlockingQueue extends Queue, adding the blocking methods put() and take().
  • BlockingQueue is an interface; the blocking methods are provided by its implementation classes, for example ArrayBlockingQueue, SynchronousQueue, LinkedTransferQueue (through the TransferQueue sub-interface), LinkedBlockingQueue, PriorityBlockingQueue and so on.
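As a small illustration of the blocking put() and take() methods, here is a sketch that passes a few integers from the main thread to a consumer thread through a queue of capacity 2. The class name PutTakeDemo and the helper transfer are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class PutTakeDemo {
    /** Sends 1..n through a queue of capacity 2 and returns what the consumer received. */
    static List<Integer> transfer(int n) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        List<Integer> received = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    received.add(queue.take()); // waits while the queue is empty
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // restore the flag and stop
            }
        });
        consumer.start();

        for (int i = 1; i <= n; i++) {
            queue.put(i); // waits while the queue is full
        }
        consumer.join(); // after join, the consumer's writes to 'received' are visible
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(transfer(5)); // prints [1, 2, 3, 4, 5]
    }
}
```

With one producer and one consumer the elements arrive in insertion order, because ArrayBlockingQueue is FIFO.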

ArrayBlockingQueue

Internally, the elements are stored in an array. A single ReentrantLock guards every operation, and two separate conditions are used for waiting: notFull for put and notEmpty for take.

/** Array that holds the queued elements */
final Object[] items;
/** Condition that put waits on while the queue is full */
private final Condition notFull;
/** Condition that take waits on while the queue is empty */
private final Condition notEmpty;

public void put(E e) throws InterruptedException {
    checkNotNull(e); // null elements are rejected
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // acquire the lock; can be interrupted while waiting
    try {
        while (count == items.length) // queue is full: wait for free space
            notFull.await();
        enqueue(e); // add to the array
    } finally {
        lock.unlock();
    }
}

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // acquire the lock; can be interrupted while waiting
    try {
        while (count == 0) // queue is empty: wait for an element
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}
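Because put acquires the lock with lockInterruptibly() and waits on a condition, a thread blocked in put can be woken by an interrupt. The following sketch shows this from the caller's side; the class InterruptiblePutDemo and the method putWasInterrupted are hypothetical names for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptiblePutDemo {
    /** Returns true if a put blocked on a full queue ended with InterruptedException. */
    static boolean putWasInterrupted() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.put("first"); // the single slot is now taken

        AtomicBoolean interrupted = new AtomicBoolean(false);
        Thread producer = new Thread(() -> {
            try {
                queue.put("second"); // no free slot, so this call waits
            } catch (InterruptedException ex) {
                interrupted.set(true);
            }
        });
        producer.start();

        // Wait until the producer is parked inside put before interrupting it.
        while (producer.getState() != Thread.State.WAITING) {
            Thread.sleep(10);
        }
        producer.interrupt();
        producer.join();
        return interrupted.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(putWasInterrupted()); // prints true
    }
}
```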

LinkedBlockingQueue

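Internally, the elements are stored in a linked list. take removes the node at head.next and put appends at the tail, so the two ends are guarded by separate locks (takeLock and putLock) and reads and writes can proceed concurrently.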

private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException(); // null elements are rejected
    int c = -1;
    Node<E> node = new Node<E>(e); // new list node
    final ReentrantLock putLock = this.putLock; // write lock
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly(); // acquire the write lock
    try {
        while (count.get() == capacity) { // queue is full: wait for free space
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement(); // c is the count before this insert
        if (c + 1 < capacity) // still room: wake another waiting put
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0) // queue was empty before: wake a waiting take
        signalNotEmpty();
}

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock; // read lock
    takeLock.lockInterruptibly(); // acquire the read lock
    try {
        while (count.get() == 0) { // queue is empty: wait for an element
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement(); // c is the count before this removal
        if (c > 1) // elements remain: wake another waiting take
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity) // queue was full before: wake a waiting put
        signalNotFull();
    return x;
}
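A short usage sketch of the two-lock design, seen from outside: several producer threads call put while the main thread calls take on the same bounded LinkedBlockingQueue. The class TwoLockDemo, the method run and the capacity of 16 are choices made for this example only.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class TwoLockDemo {
    /** Each producer puts 0..perProducer-1; returns the sum of everything taken. */
    static long run(int producers, int perProducer) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(16); // bounded to 16 elements
        Thread[] threads = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            threads[p] = new Thread(() -> {
                try {
                    for (int i = 0; i < perProducer; i++) {
                        queue.put(i); // producers contend only on the put side
                    }
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[p].start();
        }

        long sum = 0;
        for (int i = 0; i < producers * perProducer; i++) {
            sum += queue.take(); // the consumer works on the take side
        }
        for (Thread t : threads) {
            t.join();
        }
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(2, 1000)); // prints 999000
    }
}
```

The order in which elements from different producers interleave is not fixed, which is why the example checks the sum rather than the sequence.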

PriorityBlockingQueue

A priority queue backed by a binary heap. By default it is a min-heap (the smallest element is at the head); the ordering can be customized by passing a comparator.

public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}

public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}

public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator; // null means natural ordering
    this.queue = new Object[initialCapacity];
}

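The put method never blocks, because the queue is unbounded: when the backing array is full, it is grown first (roughly doubling while the capacity is below 64, and growing to 1.5 times the old capacity after that). The new element is then sifted up the heap. If no comparator is set, natural ordering is used (min-heap); if a comparator is set, the custom one is used.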

public void put(E e) {
    offer(e); // never blocks
}

public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap); // array is full: grow it
    try {
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftUpComparable(n, e, array); // natural ordering
        else
            siftUpUsingComparator(n, e, array, cmp); // custom comparator
        size = n + 1;
        notEmpty.signal(); // wake a waiting take
    } finally {
        lock.unlock();
    }
    return true;
}
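To see both orderings from the caller's side, the sketch below fills two queues with the same values, one with natural ordering and one with a reversed comparator, and drains them with poll(). The initial capacity of 2 is deliberately smaller than the number of elements so that put has to grow the array. The class PriorityOrderDemo and its methods are names invented for this example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

class PriorityOrderDemo {
    /** Removes elements with poll() until the queue is empty and returns them in removal order. */
    static List<Integer> drain(PriorityBlockingQueue<Integer> queue) {
        List<Integer> out = new ArrayList<>();
        Integer x;
        while ((x = queue.poll()) != null) {
            out.add(x);
        }
        return out;
    }

    /** No comparator: natural ordering, smallest element first. */
    static List<Integer> minFirst() {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(2);
        for (int v : new int[] {5, 1, 4, 2, 3}) {
            queue.put(v); // does not block; the array grows past the initial capacity
        }
        return drain(queue);
    }

    /** Reversed comparator: largest element first. */
    static List<Integer> maxFirst() {
        PriorityBlockingQueue<Integer> queue =
                new PriorityBlockingQueue<>(2, Comparator.<Integer>reverseOrder());
        for (int v : new int[] {5, 1, 4, 2, 3}) {
            queue.put(v);
        }
        return drain(queue);
    }

    public static void main(String[] args) {
        System.out.println(minFirst()); // prints [1, 2, 3, 4, 5]
        System.out.println(maxFirst()); // prints [5, 4, 3, 2, 1]
    }
}
```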

Take method

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }

    /**
     * Elements are removed with the same comparator that was used to store them.
     */
    private E dequeue() {
        int n = size - 1;
        if (n < 0)
            return null;
        else {
            Object[] array = queue;
            E result = (E) array[0];
            E x = (E) array[n];
            array[n] = null;
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftDownComparable(0, x, array, n);
            else
                siftDownUsingComparator(0, x, array, n, cmp);
            size = n;
            return result;
        }
    }
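The dequeue method relies on siftDownComparable and siftDownUsingComparator, which are not shown above. As a rough idea of what a sift-down does, here is a simplified sketch on a plain int array with natural ordering. It is an illustration written for this article, not the JDK source, and the names SiftDownSketch, siftDown, removeMin and drain are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class SiftDownSketch {
    /** Places x at index k of a min-heap holding n elements, moving it down until heap order holds. */
    static void siftDown(int[] heap, int k, int x, int n) {
        int half = n >>> 1; // nodes at index >= half have no children
        while (k < half) {
            int child = 2 * k + 1; // left child
            int right = child + 1;
            if (right < n && heap[right] < heap[child]) {
                child = right; // pick the smaller child
            }
            if (x <= heap[child]) {
                break; // x is already in the right place
            }
            heap[k] = heap[child]; // pull the smaller child up
            k = child;
        }
        heap[k] = x;
    }

    /** Mirrors the dequeue steps: take the root, then re-insert the last element from the top. */
    static int removeMin(int[] heap, int size) {
        int n = size - 1;
        int result = heap[0];
        int last = heap[n];
        if (n > 0) {
            siftDown(heap, 0, last, n);
        }
        return result;
    }

    /** Repeatedly removes the minimum; the input must already be a valid min-heap. */
    static List<Integer> drain(int[] heap) {
        List<Integer> out = new ArrayList<>();
        for (int size = heap.length; size > 0; size--) {
            out.add(removeMin(heap, size));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drain(new int[] {1, 3, 2, 7, 4})); // prints [1, 2, 3, 4, 7]
    }
}
```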