ArrayBlockingQueue vs. LinkedBlockingQueue
1. ArrayBlockingQueue
1.1 Class members and methods
ArrayBlockingQueue is backed by an array. Internally it maintains that array, a single lock, and two conditions.
/** Array of queue elements */
final Object[] items;
/** Index of the next element to take */
int takeIndex;
/** Index of the next slot to put into */
int putIndex;
/** Current number of elements in the queue */
int count;
/** The single lock guarding all access */
final ReentrantLock lock;
/** Condition that consumers wait on until the queue is non-empty */
private final Condition notEmpty;
/** Condition that producers wait on until the queue is non-full */
private final Condition notFull;
The offer method. It does not block: if the queue is full, it returns false immediately.
public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}
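As a small illustration of the non-blocking behaviour described above, the following sketch offers three elements to a queue of capacity 2. The class and method names (OfferDemo, offerAll) are made up for this example; only ArrayBlockingQueue and its offer and remainingCapacity methods come from the JDK.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;

class OfferDemo {
    /** Offers each value in turn and records which offers were accepted. */
    static boolean[] offerAll(ArrayBlockingQueue<String> queue, String... values) {
        boolean[] accepted = new boolean[values.length];
        for (int i = 0; i < values.length; i++) {
            accepted[i] = queue.offer(values[i]); // returns at once, even when full
        }
        return accepted;
    }

    public static void main(String[] args) {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        boolean[] accepted = offerAll(queue, "a", "b", "c");
        System.out.println(Arrays.toString(accepted));   // [true, true, false]
        System.out.println(queue.remainingCapacity());   // 0
    }
}
```

The third offer is rejected rather than queued, so a caller that cannot afford to lose the element has to check the return value.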
The enqueue method
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    // Wake up one thread blocked in notEmpty.await()
    notEmpty.signal();
}
The put method. If the queue is full, it calls notFull.await() and blocks until a dequeue operation wakes it. If this were implemented with spinning instead, the CPU would busy-wait and performance would suffer.
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
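To make the "blocks instead of spinning" point concrete, here is a minimal sketch that starts a producer against a full queue and observes its thread state. PutBlocksDemo and producerParksUntilTake are hypothetical names for this example, and polling Thread.getState() is only a convenient way to observe parking in a demo, not something production code should rely on.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class PutBlocksDemo {
    /**
     * Returns true if a put() on a full queue parked the producer thread
     * and then completed once a take() freed a slot.
     */
    static boolean producerParksUntilTake() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        queue.offer(1); // the queue is now full
        Thread producer = new Thread(() -> {
            try {
                queue.put(2); // waits on the queue's notFull condition
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            // Poll until the producer is parked (state WAITING, not RUNNABLE).
            while (producer.getState() != Thread.State.WAITING) {
                Thread.sleep(1);
            }
            boolean stillFull = queue.size() == 1;
            queue.take();    // frees a slot and signals notFull
            producer.join(); // the blocked put() can now complete
            return stillFull && queue.peek() == 2;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(producerParksUntilTake()); // true
    }
}
```

A parked thread consumes no CPU while it waits, which is the advantage over a spin loop.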
The poll method. It returns null without blocking if the queue is empty.
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}
The take method. If the queue is empty, it blocks in notEmpty.await() until an enqueue operation calls notEmpty.signal() to wake it.
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}
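The difference between poll and take on an empty queue can be seen in a short sketch. TakeDemo and pollThenTake are illustrative names, not part of the JDK.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class TakeDemo {
    /** Polls an empty queue, then takes the element a producer supplies later. */
    static String pollThenTake() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        String polled = queue.poll(); // empty queue: returns null immediately
        Thread producer = new Thread(() -> queue.offer("job"));
        producer.start();
        try {
            String taken = queue.take(); // waits, if necessary, for the producer's offer
            producer.join();
            return polled + "/" + taken;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(pollThenTake()); // null/job
    }
}
```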
The dequeue method
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}
At first glance this looks like a deadlock: put acquires the lock and finds the queue full, so it blocks; take then cannot acquire the lock to remove an element, while put is waiting for take to wake it. In practice this does not happen, because await releases the lock while the thread waits. To see how, look at the await and signal methods of Condition.
1.2 The await and signal methods
The await method first adds the current thread to the condition queue and then fully releases the lock. It then loops, using isOnSyncQueue to check whether the node has been moved to the AQS sync queue; while it has not, the thread is parked and waits to be woken. After a signal moves the node to the sync queue and the thread is woken, the isOnSyncQueue check succeeds and the loop exits. The thread then calls acquireQueued to reacquire the lock; if the lock is not available, it stays in the AQS sync queue until it is its turn.
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // Add the current thread to the condition queue,
    // a singly linked list chained through nextWaiter
    Node node = addConditionWaiter();
    // Fully release the lock (state becomes 0) so that other threads can acquire it
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // Loop until the node has been moved to the sync queue, the doubly linked AQS queue
    while (!isOnSyncQueue(node)) {
        // Block the current thread
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // Reacquire the lock, restoring the saved state
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
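The key property of await, that the lock is released while the thread waits, can be checked from the outside with a small experiment. The sketch below uses only public ReentrantLock and Condition methods; the class name AwaitReleasesLockDemo and the flag variable are invented for this example.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class AwaitReleasesLockDemo {
    /** Returns true once the main thread has held the lock while another thread was waiting. */
    static boolean lockIsFreeWhileWaiting() {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        boolean[] flag = {false}; // guarded by lock

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!flag[0]) {
                    ready.awaitUninterruptibly(); // gives up the lock while parked
                }
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        boolean sawWaiter = false;
        while (!sawWaiter) {
            lock.lock(); // would hang forever if await kept the lock
            try {
                if (lock.hasWaiters(ready)) { // the waiter is parked on the condition
                    sawWaiter = true;
                    flag[0] = true;
                    ready.signal();
                }
            } finally {
                lock.unlock();
            }
            Thread.yield();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sawWaiter;
    }

    public static void main(String[] args) {
        System.out.println(lockIsFreeWhileWaiting()); // true
    }
}
```

Because hasWaiters may only be called by the thread that holds the lock, observing a waiter at all proves that the waiting thread no longer owns it. This is why a blocked put cannot lock out take.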
Now look at the signal method. It starts at the first node of the condition queue and walks forward until it successfully transfers one node to the sync queue, where that node's thread can be woken.
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // Wake up the first node
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
private void doSignal(Node first) {
    // Walk the condition queue until one node is successfully transferred
    do {
        if ((firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
}
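That a single signal moves exactly one waiter off the condition queue can be observed with ReentrantLock.getWaitQueueLength. This is a sketch under the assumption that no waiter is interrupted or times out; SignalOneDemo and the permits counter are hypothetical names.

```java
import java.util.Arrays;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SignalOneDemo {
    /** Returns the condition-queue length just before and just after one signal(). */
    static int[] waitersBeforeAndAfterSignal() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        int[] permits = {0}; // guarded by lock

        Runnable waitForPermit = () -> {
            lock.lock();
            try {
                while (permits[0] == 0) {
                    cond.awaitUninterruptibly();
                }
                permits[0]--;
            } finally {
                lock.unlock();
            }
        };
        Thread t1 = new Thread(waitForPermit);
        Thread t2 = new Thread(waitForPermit);
        t1.start();
        t2.start();

        int[] lengths = new int[2];
        boolean signalled = false;
        while (!signalled) {
            lock.lock();
            try {
                if (lock.getWaitQueueLength(cond) == 2) { // both threads are waiting
                    lengths[0] = 2;
                    permits[0] = 1;
                    cond.signal(); // transfers only the longest-waiting node
                    lengths[1] = lock.getWaitQueueLength(cond);
                    signalled = true;
                }
            } finally {
                lock.unlock();
            }
            Thread.yield();
        }
        // Release the remaining waiter so that both threads terminate.
        lock.lock();
        try {
            permits[0]++;
            cond.signalAll();
        } finally {
            lock.unlock();
        }
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return lengths;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(waitersBeforeAndAfterSignal())); // [2, 1]
    }
}
```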
1.3 Summary
ArrayBlockingQueue is an array-based bounded queue that maintains a single lock, so every operation locks the whole queue. The lock is a ReentrantLock with two conditions, notEmpty and notFull, which give finer control over which threads are woken. If the queue is empty, a consumer's node is added to the singly linked condition queue. When a new element is enqueued, the first waiting node is signalled and moved to the doubly linked AQS sync queue, where it competes for the ReentrantLock; if it cannot get the lock immediately, it waits there until the lock is released. Note that remove has to hold the lock while shifting array elements (the whole array in the worst case), which has a significant performance impact.
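The cost of removing an interior element from a circular array can be sketched as follows. This is a simplified illustration of the shifting idea, not the JDK's internal code: the class CircularRemoveSketch and the removeAt signature are assumptions made for this example, and locking and count bookkeeping are left out.

```java
import java.util.Arrays;

class CircularRemoveSketch {
    /**
     * Removes the element at removeIndex by sliding every later element
     * back one slot, wrapping around the end of the array.
     * Returns the new putIndex.
     */
    static int removeAt(Object[] items, int removeIndex, int putIndex) {
        int i = removeIndex;
        while (true) {
            int next = (i + 1 == items.length) ? 0 : i + 1;
            if (next == putIndex) {
                items[i] = null; // last occupied slot is now free
                return i;        // and becomes the next slot to put into
            }
            items[i] = items[next];
            i = next;
        }
    }

    public static void main(String[] args) {
        Object[] items = {"a", "b", "c", "d", null}; // takeIndex = 0, putIndex = 4
        int newPutIndex = removeAt(items, 1, 4);     // remove "b"
        System.out.println(Arrays.toString(items));  // [a, c, d, null, null]
        System.out.println(newPutIndex);             // 3
    }
}
```

The loop touches every element between the removed slot and putIndex, which is why an interior remove is linear in the queue size and blocks all other operations while it runs under the single lock.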
2. LinkedBlockingQueue
2.1 Class members and methods
LinkedBlockingQueue is a linked-list-based implementation built from singly linked Node objects. Unlike ArrayBlockingQueue, it maintains two ReentrantLocks, one for enqueue and one for dequeue, each with its own condition.
/** The capacity bound; defaults to Integer.MAX_VALUE if not specified */
private final int capacity;
/**
 * The number of elements in the queue. Unlike ArrayBlockingQueue, which uses a plain int,
 * this is an AtomicInteger because enqueue and dequeue hold different locks.
 */
private final AtomicInteger count = new AtomicInteger();
/** Head of the linked list */
transient Node<E> head;
/** Tail of the linked list */
private transient Node<E> last;
/** Lock held by dequeue operations (take, poll) */
private final ReentrantLock takeLock = new ReentrantLock();
/** Condition that consumers wait on until the queue is non-empty */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by enqueue operations (put, offer) */
private final ReentrantLock putLock = new ReentrantLock();
/** Condition that producers wait on until the queue is non-full */
private final Condition notFull = putLock.newCondition();
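The default capacity is easy to confirm from the public API. In this sketch, CapacityDemo and its two helper methods are illustrative names; the constructors and remainingCapacity are standard LinkedBlockingQueue members.

```java
import java.util.concurrent.LinkedBlockingQueue;

class CapacityDemo {
    /** Capacity of a LinkedBlockingQueue created without an explicit bound. */
    static int defaultCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    /** Whether a third offer succeeds on a queue bounded to two elements. */
    static boolean thirdOfferAccepted() {
        LinkedBlockingQueue<String> bounded = new LinkedBlockingQueue<>(2);
        bounded.offer("a");
        bounded.offer("b");
        return bounded.offer("c");
    }

    public static void main(String[] args) {
        System.out.println(defaultCapacity() == Integer.MAX_VALUE); // true
        System.out.println(thirdOfferAccepted());                   // false
    }
}
```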
The offer method. It returns false without blocking if the current count has already reached the specified capacity.
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    // If the queue is already at capacity, return false without locking
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        // Recheck under the lock that there is still room
        if (count.get() < capacity) {
            // Enqueue
            enqueue(node);
            // Increment the count; c is the value before the increment
            c = count.getAndIncrement();
            // If there is still room after this insert, wake another waiting producer
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        // The queue was empty before this insert, so wake a waiting consumer
        signalNotEmpty();
    return c >= 0;
}
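The role of the local variable c is easier to follow in isolation. The sketch below reproduces only the counting decisions made after an insert; CountSignalSketch and signalsAfterInsert are hypothetical names, and no locks or conditions are involved.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CountSignalSketch {
    /** Reports which conditions an insert would signal, given the shared count. */
    static String signalsAfterInsert(AtomicInteger count, int capacity) {
        int c = count.getAndIncrement();         // value BEFORE the increment
        boolean wakeProducer = c + 1 < capacity; // room remains after this insert
        boolean wakeConsumer = c == 0;           // the queue was empty before it
        return "notFull=" + wakeProducer + ", notEmpty=" + wakeConsumer;
    }

    public static void main(String[] args) {
        AtomicInteger count = new AtomicInteger();
        System.out.println(signalsAfterInsert(count, 2)); // notFull=true, notEmpty=true
        System.out.println(signalsAfterInsert(count, 2)); // notFull=false, notEmpty=false
    }
}
```

Signalling notEmpty only on the empty-to-non-empty transition means a producer rarely needs to touch takeLock, which keeps the two sides largely independent.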
The put method is basically the same as offer, except that it blocks and waits when the queue is full.
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        // Queue is full: block until signalled
        while (count.get() == capacity) {
            notFull.await();
        }
        // Enqueue
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}
The enqueue method
private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}
The poll method. It returns null without blocking if there are no elements in the queue.
public E poll() {
    final AtomicInteger count = this.count;
    // Empty queue: return null without locking
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        // Recheck the element count under the lock
        if (count.get() > 0) {
            // Dequeue
            x = dequeue();
            // Decrement the count; c is the value before the decrement
            c = count.getAndDecrement();
            // If elements remain after this removal, wake another waiting consumer
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    // c is the count before the dequeue. If it equalled capacity, the queue
    // was full and now has room, so wake a waiting producer
    if (c == capacity)
        signalNotFull();
    return x;
}
The take method is basically the same as poll, except that it blocks and waits when the queue is empty.
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // Queue is empty: block until signalled
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
The dequeue method
private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}
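The enqueue and dequeue methods rely on a dummy head node whose item is always null. A single-threaded sketch of that structure, with no locks and with an extra empty check that the JDK method does not need, might look like this. SentinelQueue is a made-up class for illustration.

```java
class SentinelQueue<E> {
    private static final class Node<E> {
        E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    private Node<E> head = new Node<>(null); // dummy node; head.item is always null
    private Node<E> last = head;             // equals head when the queue is empty

    /** Links a new node after the current tail. */
    void enqueue(E e) {
        last = last.next = new Node<>(e);
    }

    /** Removes and returns the first element, or null if the queue is empty. */
    E dequeue() {
        Node<E> first = head.next;
        if (first == null) {
            return null; // empty; the real method is only called when count > 0
        }
        head.next = head; // self-link the old dummy to help GC
        head = first;     // the first real node becomes the new dummy
        E x = first.item;
        first.item = null;
        return x;
    }

    public static void main(String[] args) {
        SentinelQueue<Integer> queue = new SentinelQueue<>();
        queue.enqueue(1);
        queue.enqueue(2);
        System.out.println(queue.dequeue()); // 1
        System.out.println(queue.dequeue()); // 2
        System.out.println(queue.dequeue()); // null
    }
}
```

Because enqueue only touches last and dequeue only touches head, the two ends can be guarded by different locks, which is what makes the two-lock design possible.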
2.2 Summary
LinkedBlockingQueue is based on a singly linked list. Unlike ArrayBlockingQueue, it maintains two ReentrantLocks, one for dequeue and one for enqueue, which improves concurrency. It also maintains two conditions, notEmpty and notFull, which give finer control over which threads are woken. LinkedBlockingQueue is a bounded queue whose capacity defaults to Integer.MAX_VALUE if none is specified.
3. Differences between the two
ArrayBlockingQueue is implemented on top of an array. LinkedBlockingQueue is implemented on top of a singly linked list.
ArrayBlockingQueue uses the same lock for enqueue and dequeue. LinkedBlockingQueue uses a separate lock for each, so enqueue and dequeue do not interfere with each other, which improves concurrency.
ArrayBlockingQueue requires a capacity. LinkedBlockingQueue accepts an optional capacity and otherwise defaults to Integer.MAX_VALUE, which can lead to out-of-memory problems if producers outpace consumers.
When ArrayBlockingQueue performs remove, it has to shift the elements that follow the removed one; when LinkedBlockingQueue performs remove, it only has to find the target node and unlink it.
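Since both classes implement BlockingQueue, switching between them is usually a one-line change at construction time. The following sketch runs the same producer and consumer against each implementation. TransferDemo and transfer are illustrative names, and the capacity of 16 is an arbitrary choice for the example; it measures nothing about relative performance.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class TransferDemo {
    /** Sends 1..n through the queue from a producer thread and returns the sum received. */
    static long transfer(BlockingQueue<Integer> queue, int n) {
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    queue.put(i); // blocks whenever the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        long sum = 0;
        try {
            for (int i = 0; i < n; i++) {
                sum += queue.take(); // blocks whenever the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(transfer(new ArrayBlockingQueue<>(16), 1000));  // 500500
        System.out.println(transfer(new LinkedBlockingQueue<>(16), 1000)); // 500500
    }
}
```

Passing an explicit capacity to LinkedBlockingQueue, as done here, avoids the unbounded growth that the default capacity allows.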
This article is published by OpenWrite, a blogging tool platform