Preface
ArrayBlockingQueue is backed by an array, whereas LinkedBlockingQueue is backed by a linked list. The UML class diagram for LinkedBlockingQueue is as follows:
As the UML diagram shows, LinkedBlockingQueue is also an implementation of BlockingQueue, and blocking is its core behavior. ArrayBlockingQueue guards the whole queue with a single exclusive lock. Because take() and put() must both acquire that same lock, the two operations can never run truly concurrently; each waits for the other to release it. Under high concurrency this lock contention hurts throughput. To address this, LinkedBlockingQueue uses **lock separation**. take() removes an element from the queue and put() adds one, and each is guarded by its own lock: one lock for take() and another for put(). Because the queue is a linked list, removal works at the head and insertion works at the tail, so the two operations do not interfere with each other.
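To make the put()/take() pairing concrete, here is a minimal sketch of one producer thread and one consumer thread sharing a LinkedBlockingQueue. The class name `ProducerConsumerDemo`, the helper `transfer`, and the capacity of 16 are illustrative assumptions, not part of the JDK source discussed in this article.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class ProducerConsumerDemo {
    // Sends the integers 0..n-1 through the queue and returns the sum the consumer saw.
    static long transfer(BlockingQueue<Integer> queue, int n) {
        long[] sum = new long[1];
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) queue.put(i); // blocks while the queue is full
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) sum[0] += queue.take(); // blocks while the queue is empty
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        consumer.start();
        try {
            // join() also makes the consumer's writes to sum[0] visible to this thread.
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return sum[0];
    }

    public static void main(String[] args) {
        // 0 + 1 + ... + 999 = 499500
        System.out.println(transfer(new LinkedBlockingQueue<>(16), 1000));
    }
}
```

The small capacity forces the producer to block in put() from time to time, so both blocking paths are exercised.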
Source code analysis
First, take a look at the fields of LinkedBlockingQueue:
```java
/** Linked list node class. */
static class Node<E> {
    E item;

    /**
     * One of:
     * - the real successor Node
     * - this Node, meaning the successor is head.next
     * - null, meaning there is no successor (this is the last node)
     */
    Node<E> next;

    Node(E x) { item = x; }
}

/** The capacity bound, or Integer.MAX_VALUE if none was specified. */
private final int capacity;
/** Current number of elements in the queue, updated atomically. */
private final AtomicInteger count = new AtomicInteger();
/** Head of the linked list. Invariant: head.item == null */
transient Node<E> head;
/** Tail of the linked list. Invariant: last.next == null */
private transient Node<E> last;
/** Reentrant lock held by take, poll, etc. */
private final ReentrantLock takeLock = new ReentrantLock();
/** "Not empty" condition that waiting takes block on. */
private final Condition notEmpty = takeLock.newCondition();
/** Reentrant lock held by put, offer, etc. */
private final ReentrantLock putLock = new ReentrantLock();
/** "Not full" condition that waiting puts block on. */
private final Condition notFull = putLock.newCondition();
```
- Node holds one element and the link to the next node in the list.
- capacity is the maximum number of elements. The user can specify it; if not, it defaults to Integer.MAX_VALUE.
- head and last are the head node and the tail node of the list.
- takeLock is the lock for take, poll, and other dequeue operations.
- putLock is the lock for put, offer, and other enqueue operations.
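The capacity rule can be checked from the public API. The sketch below uses `remainingCapacity()`, which reports how many more elements the queue can accept; the class and method names (`CapacityDemo`, `unboundedRemaining`, `boundedRemainingAfterOne`) are made up for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;

class CapacityDemo {
    // Remaining capacity of an empty queue created without a capacity argument.
    static int unboundedRemaining() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    // Remaining capacity of a queue bounded to `capacity` after adding one element.
    static int boundedRemainingAfterOne(int capacity) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
        queue.offer("a");
        return queue.remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println(unboundedRemaining() == Integer.MAX_VALUE); // true
        System.out.println(boundedRemainingAfterOne(3));               // 2
    }
}
```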
Let’s look at how the put method works:
```java
/**
 * Inserts the specified element at the tail of this queue, waiting if
 * necessary for space to become available.
 *
 * @throws InterruptedException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    Node<E> node = new Node<E>(e);
    // Local reference to putLock.
    final ReentrantLock putLock = this.putLock;
    // Element count, updated atomically.
    final AtomicInteger count = this.count;
    // Acquire the lock, responding to interruption. Only one thread at a time can be inside put.
    putLock.lockInterruptibly();
    try {
        // While the count equals the capacity, the queue is full: wait until it is not full.
        while (count.get() == capacity) {
            notFull.await();
        }
        // Insert the element at the tail of the queue.
        enqueue(node);
        // c is the value of count before the increment. The update must be atomic
        // because a take holding takeLock may be changing count at the same time.
        c = count.getAndIncrement();
        // If the queue is still not full after this insert, wake another waiting put.
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    // c == 0 means the queue was empty before this insert, so takers may be
    // waiting; signal notEmpty now that there is an element.
    if (c == 0)
        signalNotEmpty();
}
```
If the current element count equals the capacity, the queue is full and put waits on notFull. Once there is room, the node is linked in at the tail of the queue. AtomicInteger.getAndIncrement returns the value before the increment: incrementing 5 returns 5, not 6. So c holds the count before the insert, and c is 0 when the queue was empty beforehand. The actual count is now 1, so c == 0 means the queue has just gone from empty to non-empty, and put signals notEmpty so that a waiting take can proceed.
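The "value before the increment" behavior is easy to confirm in isolation. This is a small sketch; `GetAndIncrementDemo` and `incrementFrom` are hypothetical names used only here.

```java
import java.util.concurrent.atomic.AtomicInteger;

class GetAndIncrementDemo {
    // Returns {value returned by getAndIncrement, value stored afterwards}.
    static int[] incrementFrom(int start) {
        AtomicInteger count = new AtomicInteger(start);
        int c = count.getAndIncrement(); // c is the OLD value
        return new int[] { c, count.get() };
    }

    public static void main(String[] args) {
        int[] fromEmpty = incrementFrom(0);
        System.out.println(fromEmpty[0] + " -> " + fromEmpty[1]); // 0 -> 1
        int[] fromFive = incrementFrom(5);
        System.out.println(fromFive[0] + " -> " + fromFive[1]);   // 5 -> 6
    }
}
```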
The enqueue method is simple: it links the node after the current last node and then makes last point to the new node.
```java
private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}
```
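The chained assignment in enqueue can look surprising, so here is a single-threaded sketch of how it evaluates. `TailInsertDemo` and its simplified, non-generic `Node` are stand-ins invented for this example; there is no locking, so it only illustrates the pointer updates.

```java
class TailInsertDemo {
    // Hypothetical stand-in for the queue's Node class.
    static final class Node {
        final String item;
        Node next;
        Node(String item) { this.item = item; }
    }

    final Node head = new Node(null); // dummy head, its item is always null
    Node last = head;

    void enqueue(Node node) {
        // Evaluated as: (old tail).next = node, then last = node.
        last = last.next = node;
    }

    public static void main(String[] args) {
        TailInsertDemo list = new TailInsertDemo();
        list.enqueue(new Node("a"));
        list.enqueue(new Node("b"));
        System.out.println(list.head.next.item);      // a
        System.out.println(list.head.next.next.item); // b
        System.out.println(list.last.item);           // b
    }
}
```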
Here is how the take method is implemented:
```java
/**
 * Retrieves and removes the element at the head of the queue, waiting
 * if necessary until an element becomes available.
 */
public E take() throws InterruptedException {
    E x;
    // Preset to -1, following the same convention as put.
    int c = -1;
    // Element count.
    final AtomicInteger count = this.count;
    // Local reference to takeLock.
    final ReentrantLock takeLock = this.takeLock;
    // Acquire the lock, responding to interruption. Only one thread at a time can be inside take.
    takeLock.lockInterruptibly();
    try {
        // While the queue is empty, wait until it is not empty.
        while (count.get() == 0) {
            notEmpty.await();
        }
        // Remove the element at the head of the queue.
        x = dequeue();
        // c is the value of count before the decrement.
        c = count.getAndDecrement();
        // If elements remain after this take, wake another waiting take.
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // c == capacity means the queue was full before this take. A slot is
    // now free, so signal notFull to wake a waiting put.
    if (c == capacity)
        signalNotFull();
    return x;
}
```
As the source shows, take first acquires takeLock, so while one thread is inside take no other thread can enter it. It then checks whether the element count is 0. If it is, there is nothing to consume and the thread waits on notEmpty. Otherwise it calls dequeue, which advances head to the next node and returns that node's item. If c is greater than 1, elements remain after this take, so another waiting take is signaled. If c equals capacity, the queue was full before this take; a slot is now free, so notFull is signaled to let a waiting put proceed. The dequeue method is implemented as follows:
```java
private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // Self-link the old head to help GC.
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}
```
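The role of the dummy head in dequeue is easier to see in a stripped-down, single-threaded sketch. `DummyHeadDemo` is a hypothetical class written for this example: it has no locks or count, and dequeue must only be called when the list is non-empty.

```java
class DummyHeadDemo {
    // Hypothetical stand-in for the queue's Node class.
    static final class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    Node<String> head = new Node<>(null); // dummy head: head.item == null
    Node<String> last = head;

    void enqueue(String s) {
        last = last.next = new Node<>(s);
    }

    // Same steps as the dequeue shown in the article; caller guarantees non-empty.
    String dequeue() {
        Node<String> h = head;
        Node<String> first = h.next;
        h.next = h;        // old dummy head now points at itself
        head = first;      // the first real node becomes the new dummy head
        String x = first.item;
        first.item = null; // restore the invariant head.item == null
        return x;
    }

    public static void main(String[] args) {
        DummyHeadDemo list = new DummyHeadDemo();
        list.enqueue("a");
        list.enqueue("b");
        Node<String> oldHead = list.head;
        System.out.println(list.dequeue());          // a
        System.out.println(oldHead.next == oldHead); // true
        System.out.println(list.head.item == null);  // true
        System.out.println(list.dequeue());          // b
        System.out.println(list.head == list.last);  // true, the list is empty again
    }
}
```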
Conclusion
- LinkedBlockingQueue uses lock separation, which reduces lock contention between take and put.
- Because it is implemented as a linked list, enqueue works at the tail and dequeue works at the head, so the two locks do not conflict.
- An atomic class (AtomicInteger, which is CAS-based) tracks the number of elements, since put and take update it while holding different locks.
- If the queue was empty before an enqueue, put signals waiting take calls that there is now an element to take. Conversely, if the queue was full before a dequeue, take signals waiting put calls that there is now room to put.
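The full to not-full transition in the last point can be observed from the public API without extra threads. The sketch below uses the timed `offer(e, timeout, unit)`, which gives up and returns false if no space appears in time. It shows only the visible state change, not the internal signaling; `FullQueueDemo`, `offerAroundTake`, and the 10 ms timeout are illustrative choices.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class FullQueueDemo {
    // Returns {result of offer while full, result of offer after one take}
    // for a queue with capacity 1.
    static boolean[] offerAroundTake() {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1);
        queue.offer(1); // the queue is now full
        try {
            boolean whileFull = queue.offer(2, 10, TimeUnit.MILLISECONDS);
            queue.take(); // frees the only slot
            boolean afterTake = queue.offer(2, 10, TimeUnit.MILLISECONDS);
            return new boolean[] { whileFull, afterTake };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during demo", e);
        }
    }

    public static void main(String[] args) {
        boolean[] results = offerAroundTake();
        System.out.println(results[0]); // false
        System.out.println(results[1]); // true
    }
}
```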