ArrayBlockingQueue vs. LinkedBlockingQueue

1. ArrayBlockingQueue

1.1 Class members and methods

ArrayBlockingQueue is an array-based implementation. It maintains a single internal array, guarded by one lock that has two conditions.

    /** The array of queue elements */
    final Object[] items;
    /** Index of the next element to take */
    int takeIndex;
    /** Index of the next slot to put into */
    int putIndex;
    /** Number of elements currently in the queue */
    int count;
    /** The single lock guarding all access */
    final ReentrantLock lock;
    /** Condition that taking threads wait on (signalled when the queue becomes non-empty) */
    private final Condition notEmpty;
    /** Condition that putting threads wait on (signalled when the queue becomes non-full) */
    private final Condition notFull;

The offer method. It does not block: if the queue is full, it returns false immediately.

    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
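
The non-blocking behaviour of offer can be checked with a small sketch. The class and method names here (OfferDemo, offerThree) are made up for illustration; only ArrayBlockingQueue and its offer method come from the JDK.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo class, not part of the JDK.
class OfferDemo {
    // Offers three elements to a capacity-2 queue and records each result.
    static boolean[] offerThree() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        boolean first = queue.offer("a");  // space available
        boolean second = queue.offer("b"); // fills the queue
        boolean third = queue.offer("c");  // queue is full: returns false at once
        return new boolean[] { first, second, third };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(offerThree()));
    }
}
```

The third call returns without waiting, so the caller has to decide what to do with the rejected element.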

The enqueue method

   private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        // Wake up a thread waiting in notEmpty.await()
        notEmpty.signal();
    }

The put method. If the queue is full on entry, it blocks in notFull.await() until a dequeue operation wakes it up. If this were implemented as a spin loop instead, the CPU would busy-wait, reducing performance.

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
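
A rough way to observe put blocking is to start a producer against a full queue and wait until its thread is parked. This is only a sketch: PutBlocksDemo and run are hypothetical names, and polling Thread.getState() is a demonstration device, not something to rely on in production code.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo class, not part of the JDK.
class PutBlocksDemo {
    static String run() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        try {
            queue.put("a"); // the queue is now full

            Thread producer = new Thread(() -> {
                try {
                    queue.put("b"); // waits in notFull.await() until a slot is free
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producer.start();

            // Wait until the producer is parked, i.e. blocked inside put.
            while (producer.getState() != Thread.State.WAITING) {
                Thread.sleep(1);
            }
            String taken = queue.take(); // frees a slot and signals notFull
            producer.join();             // the blocked put can now finish
            return taken + "," + queue.peek();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```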

The poll method. It returns null without blocking if the queue is empty.

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }

The take method. If the queue is empty, it blocks in notEmpty.await() until an enqueue operation calls notEmpty.signal() to wake it up.

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
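
The difference between poll and take on an empty queue can be sketched as follows. TakeBlocksDemo and run are hypothetical names; the timed poll(long, TimeUnit) overload is a standard BlockingQueue method that is not discussed above and is included only for comparison.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the JDK.
class TakeBlocksDemo {
    static String run() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        try {
            String polled = queue.poll();                         // empty: null at once
            String timed = queue.poll(10, TimeUnit.MILLISECONDS); // null after the timeout

            AtomicReference<String> received = new AtomicReference<>();
            Thread consumer = new Thread(() -> {
                try {
                    received.set(queue.take()); // waits until an element arrives
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumer.start();
            queue.put("x"); // enqueue signals notEmpty and releases the consumer
            consumer.join();
            return polled + "," + timed + "," + received.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```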

The dequeue method

    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        // Wake up a thread waiting in notFull.await()
        notFull.signal();
        return x;
    }

Consider a put that acquires the lock while the queue is full. It might seem that take cannot acquire the lock until put releases it, while put is blocked waiting for take to wake it up, which would be a deadlock. That is not what happens. To see why, look at the await and signal methods of Condition.

1.2 The await and signal methods

Look at the await method. It first adds the current thread to the condition's wait queue and then fully releases the lock. It then uses isOnSyncQueue to check whether the node is on the AQS sync queue; while it is not, the thread stays in the loop and parks, waiting to be woken up. A signal transfers the node to the sync queue, so when the thread wakes up, the isOnSyncQueue check succeeds and it leaves the loop. It then calls acquireQueued to reacquire the lock. If the lock is not available, the thread waits in the AQS sync queue until it is woken up to try again.

    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // Add the current thread to the condition's wait queue.
        // This queue is linked through nextWaiter and is a singly linked list.
        Node node = addConditionWaiter();
        // Fully release the lock (state is set to 0) so that other threads can acquire it
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // Check whether the node is on the sync queue, i.e. the AQS doubly linked queue
        while (!isOnSyncQueue(node)) {
            // Block
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // Try to reacquire the lock
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
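
The key point for the deadlock question is that await gives up the lock while the thread is parked. A minimal sketch of that, using only the public ReentrantLock and Condition API (AwaitReleasesLockDemo, run, ready and done are hypothetical names):

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the JDK.
class AwaitReleasesLockDemo {
    static boolean run() {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        boolean[] done = { false }; // guarded by lock

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!done[0]) {
                    ready.awaitUninterruptibly(); // releases the lock while parked
                }
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        boolean lockedWhileWaiting = false;
        try {
            while (!lockedWhileWaiting) {
                lock.lock(); // can succeed while the waiter is still inside await
                try {
                    if (lock.hasWaiters(ready)) {
                        lockedWhileWaiting = true; // we hold the lock and the waiter is parked
                        done[0] = true;
                        ready.signal();            // move the waiter to the sync queue
                    }
                } finally {
                    lock.unlock();                 // only now can the waiter reacquire
                }
                if (!lockedWhileWaiting) Thread.sleep(1);
            }
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return lockedWhileWaiting && !lock.isLocked();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the main thread can take the lock while the other thread sits in await, a take can always get in to free a slot for a blocked put.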

Take a look at the signal method. Starting from the first node in the condition's wait queue, it walks the queue until it successfully transfers one node to the sync queue, skipping nodes that have been cancelled.

    public final void signal() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        // Start from the first waiting node
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }

    private void doSignal(Node first) {
        // Transfer the first waiter that has not been cancelled to the sync queue
        do {
            if ((firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) && (first = firstWaiter) != null);
    }
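
That signal moves exactly one waiter off the condition queue can be observed through ReentrantLock.getWaitQueueLength, which the JDK documents as an estimate intended for monitoring. The sketch below uses hypothetical names (SignalDemo, run, cond, go) and contrasts signal with signalAll.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the JDK.
class SignalDemo {
    // Returns the condition queue length before signal, after signal, after signalAll.
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] go = { false }; // guarded by lock

        Runnable waitTask = () -> {
            lock.lock();
            try {
                while (!go[0]) cond.awaitUninterruptibly();
            } finally {
                lock.unlock();
            }
        };
        Thread t1 = new Thread(waitTask);
        Thread t2 = new Thread(waitTask);
        t1.start();
        t2.start();

        int before = -1, afterSignal = -1, afterSignalAll = -1;
        try {
            while (before != 2) {
                lock.lock();
                try {
                    if (lock.getWaitQueueLength(cond) == 2) { // both threads are waiting
                        before = 2;
                        go[0] = true;
                        cond.signal();    // transfers the first waiter only
                        afterSignal = lock.getWaitQueueLength(cond);
                        cond.signalAll(); // transfers every remaining waiter
                        afterSignalAll = lock.getWaitQueueLength(cond);
                    }
                } finally {
                    lock.unlock();
                }
                if (before != 2) Thread.sleep(1);
            }
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] { before, afterSignal, afterSignalAll };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```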

1.3 Summary

ArrayBlockingQueue is an array-based bounded queue that maintains only one lock, so every operation locks the entire queue. The lock is a ReentrantLock with two conditions, notEmpty and notFull; using Condition gives finer control over which threads are woken up. If the queue is empty, the node of a taking thread is added to the condition's singly linked wait queue. When a new element is enqueued, the first waiting node is transferred to the AQS doubly linked sync queue and the thread competes for the ReentrantLock. If it fails to acquire the lock, it stays parked in the sync queue until it is woken up to compete again. Note that remove holds the lock and, in the worst case, shifts the entire array, which has a significant performance impact.
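
The design summarized above (one array, one lock, two conditions) can be condensed into a small sketch. MiniArrayQueue is a hypothetical, simplified class written for illustration; it is not the JDK source and leaves out null checks, iterators, timed operations and remove.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical simplified queue, loosely modelled on the listings above.
class MiniArrayQueue<E> {
    private final Object[] items;
    private int takeIndex, putIndex, count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    MiniArrayQueue(int capacity) {
        items = new Object[capacity];
    }

    void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length) notFull.await(); // full: wait, releasing the lock
            items[putIndex] = e;
            if (++putIndex == items.length) putIndex = 0;  // wrap around
            count++;
            notEmpty.signal();                             // wake one waiting taker
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) notEmpty.await();           // empty: wait, releasing the lock
            E x = (E) items[takeIndex];
            items[takeIndex] = null;
            if (++takeIndex == items.length) takeIndex = 0;
            count--;
            notFull.signal();                              // wake one waiting putter
            return x;
        } finally {
            lock.unlock();
        }
    }

    // Demo: one producer pushes 1..n through a capacity-4 queue; returns the sum received.
    static long demo(int n) {
        MiniArrayQueue<Integer> queue = new MiniArrayQueue<>(4);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) queue.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        long sum = 0;
        try {
            for (int i = 0; i < n; i++) sum += queue.take();
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }
}
```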

2. LinkedBlockingQueue

2.1 Class members and methods

LinkedBlockingQueue is a linked-list-based implementation built from singly linked Node objects. Unlike ArrayBlockingQueue, it maintains two internal ReentrantLocks, so enqueue and dequeue operations are locked separately. It also maintains two conditions, one per lock.

    /** The capacity bound. If no capacity is set, it defaults to Integer.MAX_VALUE */
    private final int capacity;
    /**
     * The number of elements in the queue. Unlike ArrayBlockingQueue, which uses a plain int,
     * an atomic type is used here because enqueue and dequeue hold different locks.
     */
    private final AtomicInteger count = new AtomicInteger();
    /** Head of the linked list */
    transient Node<E> head;
    /** Tail of the linked list */
    private transient Node<E> last;
    /** Lock held by dequeue operations (take, poll) */
    private final ReentrantLock takeLock = new ReentrantLock();
    /** Condition that taking threads wait on (signalled when the queue becomes non-empty) */
    private final Condition notEmpty = takeLock.newCondition();
    /** Lock held by enqueue operations (put, offer) */
    private final ReentrantLock putLock = new ReentrantLock();
    /** Condition that putting threads wait on (signalled when the queue becomes non-full) */
    private final Condition notFull = putLock.newCondition();
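
The default capacity can be confirmed through remainingCapacity, which on an empty queue equals the capacity. CapacityDemo and its method names are hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the JDK.
class CapacityDemo {
    // No-argument constructor: the capacity defaults to Integer.MAX_VALUE.
    static int defaultCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    // Explicit bound passed to the constructor.
    static int boundedCapacity(int bound) {
        return new LinkedBlockingQueue<String>(bound).remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println(defaultCapacity() == Integer.MAX_VALUE);
        System.out.println(boundedCapacity(16));
    }
}
```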

The offer method. It returns false without blocking if the queue has already reached its capacity.

    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        // If the queue has already reached its capacity, return false
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            // Check again under the lock that there is room
            if (count.get() < capacity) {
                // Enqueue
                enqueue(node);
                // Increment the count; c is the value before the increment
                c = count.getAndIncrement();
                // If there is still room after this insert, wake another waiting put thread
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            // The queue was empty before this insert: wake a waiting take thread
            signalNotEmpty();
        return c >= 0;
    }
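
The bookkeeping around c is easy to misread, because getAndIncrement returns the value before the increment. The sketch below isolates just that arithmetic; OfferCountDemo and afterInsert are hypothetical names, and the booleans only describe which signal the real method would send.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class that mirrors only the counter logic of offer.
class OfferCountDemo {
    // Returns "c:stillRoom:wasEmpty" for one simulated insert.
    static String afterInsert(AtomicInteger count, int capacity) {
        int c = count.getAndIncrement();      // value BEFORE this insert
        boolean stillRoom = c + 1 < capacity; // offer would call notFull.signal()
        boolean wasEmpty = c == 0;            // offer would call signalNotEmpty()
        return c + ":" + stillRoom + ":" + wasEmpty;
    }

    public static void main(String[] args) {
        AtomicInteger count = new AtomicInteger();
        System.out.println(afterInsert(count, 2)); // first insert into an empty queue
        System.out.println(afterInsert(count, 2)); // second insert fills the queue
    }
}
```

Only the insert that finds the queue empty needs to wake a consumer, because consumers can only be waiting when the count is zero.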

The put method. It is basically the same as offer, except that it blocks and waits when the queue is full.

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            // Queue is full, blocking waiting
            while (count.get() == capacity) {
                notFull.await();
            }
            // Enqueue
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

The enqueue method

   private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

The poll method. It returns null without blocking if there are no elements in the queue.

    public E poll() {
        final AtomicInteger count = this.count;
        // No elements: return null directly
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            // Check the current number of elements again under the lock
            if (count.get() > 0) {
                // Dequeue
                x = dequeue();
                // Decrement the count; c is the value before the decrement
                c = count.getAndDecrement();
                // If elements remain after this removal, wake another waiting take thread
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        // c is the count before the removal. If it equalled the capacity,
        // the queue was full and now has room, so wake a waiting put thread
        if (c == capacity)
            signalNotFull();
        return x;
    }

The take method. It is basically the same as poll, except that it blocks and waits when the queue is empty.

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            // If the queue is empty, the wait is blocked
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
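
Put and take together give the usual producer and consumer pattern. A minimal sketch, assuming a small bound of 8 chosen arbitrarily for the example (ProducerConsumerDemo and run are hypothetical names):

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the JDK.
class ProducerConsumerDemo {
    // One producer puts 1..n, the calling thread takes n elements and sums them.
    static long run(int n) {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(8);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) queue.put(i); // blocks while the queue is full
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        long sum = 0;
        try {
            for (int i = 0; i < n; i++) sum += queue.take(); // blocks while the queue is empty
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(run(1000));
    }
}
```

Here the producer works under putLock and the consumer under takeLock, so they contend with each other only through the atomic count.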

The dequeue method

    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

2.2 Summary

LinkedBlockingQueue is implemented on a singly linked list. Unlike ArrayBlockingQueue, it maintains two ReentrantLocks, one controlling dequeue operations and one controlling enqueue operations, which improves concurrency. It also maintains notEmpty and notFull conditions, using Condition for finer control over which threads are woken up. LinkedBlockingQueue is a bounded queue whose capacity defaults to Integer.MAX_VALUE if none is specified.
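
The two-lock idea can be sketched without the blocking parts. MiniTwoLockQueue below is a hypothetical, simplified class for illustration only: it supports just non-blocking offer and poll, and it is not the JDK implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical simplified queue: separate locks for the two ends of the list.
class MiniTwoLockQueue<E> {
    private static final class Node<E> {
        E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    private final int capacity;
    private final AtomicInteger count = new AtomicInteger();
    private Node<E> head = new Node<>(null); // dummy node; head.item is always null
    private Node<E> last = head;
    private final ReentrantLock putLock = new ReentrantLock();
    private final ReentrantLock takeLock = new ReentrantLock();

    MiniTwoLockQueue(int capacity) {
        this.capacity = capacity;
    }

    boolean offer(E e) {
        putLock.lock(); // only the tail is touched here
        try {
            if (count.get() >= capacity) return false;
            last = last.next = new Node<>(e);
            count.getAndIncrement(); // makes the new node visible to pollers
            return true;
        } finally {
            putLock.unlock();
        }
    }

    E poll() {
        takeLock.lock(); // only the head is touched here
        try {
            if (count.get() == 0) return null;
            Node<E> first = head.next;
            head = first;      // the removed node becomes the new dummy
            E x = first.item;
            first.item = null;
            count.getAndDecrement();
            return x;
        } finally {
            takeLock.unlock();
        }
    }

    // Demo: fills a capacity-2 queue, overflows it, then drains it.
    static String demo() {
        MiniTwoLockQueue<Integer> q = new MiniTwoLockQueue<>(2);
        return q.offer(1) + "," + q.offer(2) + "," + q.offer(3) + ","
                + q.poll() + "," + q.poll() + "," + q.poll();
    }
}
```

The atomic count is what lets the two sides cooperate safely: a poller reads the list only after it has seen a non-zero count, which happens after the corresponding offer has linked its node.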

3. Differences between the two

ArrayBlockingQueue is implemented on an array. LinkedBlockingQueue is implemented on a singly linked list.

ArrayBlockingQueue uses the same lock for enqueue and dequeue operations. LinkedBlockingQueue uses a separate lock for each, so enqueue and dequeue do not interfere with each other, which improves concurrency.

ArrayBlockingQueue requires a capacity to be specified. LinkedBlockingQueue accepts an optional capacity and otherwise defaults to Integer.MAX_VALUE, which can lead to out-of-memory problems if producers outpace consumers.

When removing an element, ArrayBlockingQueue has to shift the elements that follow it in the array; LinkedBlockingQueue only has to find the target node and unlink it.
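
One way to avoid the memory risk mentioned above is to give LinkedBlockingQueue an explicit bound, so that offer starts rejecting elements instead of letting the list grow. A minimal sketch with hypothetical names (BoundedLinkedDemo, acceptedOutOf):

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the JDK.
class BoundedLinkedDemo {
    // Offers `attempts` elements to a queue with the given bound and counts the accepted ones.
    static int acceptedOutOf(int attempts, int capacity) {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            if (queue.offer(i)) accepted++; // false once the bound is reached
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(acceptedOutOf(10, 3));
    }
}
```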
