Interviewer: OK, that covers ArrayBlockingQueue. Let's move on to LinkedBlockingQueue.
Hydra: Not even a moment to catch my breath. LinkedBlockingQueue is a blocking queue backed by a linked list. Each enqueued element is wrapped in a Node that carries a pointer to the next element:
static class Node<E> {
    E item;
    Node<E> next;
    Node(E x) { item = x; }
}
The key attributes in LinkedBlockingQueue are as follows:
private final int capacity;                              // queue capacity
private final AtomicInteger count = new AtomicInteger(); // number of elements in the queue
transient Node<E> head;                                  // head node
private transient Node<E> last;                          // tail node
// dequeue lock
private final ReentrantLock takeLock = new ReentrantLock();
// condition that dequeuing threads wait on (queue not empty)
private final Condition notEmpty = takeLock.newCondition();
// enqueue lock
private final ReentrantLock putLock = new ReentrantLock();
// condition that enqueuing threads wait on (queue not full)
private final Condition notFull = putLock.newCondition();
There are two constructors: one that specifies the queue capacity and one that does not. If the capacity is not specified, it defaults to Integer.MAX_VALUE; of course, if you actually stored that many elements you would probably run out of memory first:
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}
There is one more constructor that takes a collection as its argument at initialization. Its implementation is straightforward: it simply loops over the collection and calls the enqueue method that we will look at later, so it is only sketched briefly here.
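Roughly, that constructor might look like the sketch below. This is a reconstruction based on the description above rather than the verbatim JDK source, so treat the exact checks as an assumption:

// Sketch only: relies on the fields shown earlier (capacity, count, putLock, last, head)
public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);              // same default capacity as the no-arg constructor
    final ReentrantLock putLock = this.putLock;
    putLock.lock();                       // never contended during construction, but ensures visibility
    try {
        int n = 0;
        for (E e : c) {
            if (e == null) throw new NullPointerException();
            if (n == capacity) throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e));      // the same enqueue method explained later
            ++n;
        }
        count.set(n);
    } finally {
        putLock.unlock();
    }
}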
In LinkedBlockingQueue, the head node of the queue holds no element: its item is null and its next points to the node that really contains the first element. The queue also has two Condition objects used for blocking and waking threads. Unlike the earlier ArrayBlockingQueue, dequeuing and enqueuing use different locks, takeLock and putLock. The queue structure looks like this:
Interviewer: Why use two locks? Didn't ArrayBlockingQueue stay thread-safe with just one?
Hydra: Using two locks means that inserting and removing elements are no longer mutually exclusive and can happen at the same time, which improves throughput.
Interviewer: Well, as usual, start with how insertion works.
Hydra: Let's look at the non-blocking offer method first. The add method is inherited from AbstractQueue; it simply calls offer and throws an exception when offer returns false, so offer is the one worth reading:
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;  // number of elements in the queue
    if (count.get() == capacity)             // the queue is full
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        // check again whether the queue is full
        if (count.get() < capacity) {
            enqueue(node);
            // note: c is the queue length *before* the element was added
            c = count.getAndIncrement();
            if (c + 1 < capacity)            // still not full
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}
In the offer method, we first check whether the queue is full. If it is not, the element is wrapped in a Node and the put lock is acquired. After the lock is obtained, the queue is checked again: if it has filled up in the meantime, the lock is simply released and false is returned; enqueue is only called while holding the lock with the queue not full.
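From the caller's side, the non-blocking behavior looks like this (the capacity and the string values below are arbitrary):

import java.util.concurrent.LinkedBlockingQueue;

public class OfferDemo {
    public static void main(String[] args) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        System.out.println(queue.offer("a")); // true
        System.out.println(queue.offer("b")); // true
        System.out.println(queue.offer("c")); // false: the queue is full and offer never blocks
    }
}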
The enqueue method itself is very simple: the current tail's next is pointed at the new node, and last is then moved to the new node:
private void enqueue(Node<E> node) {
last = last.next = node;
}
A picture helps here:
After enqueueing, offer checks whether the queue is now full, and if it is not, calls notFull.signal() to wake up a thread that is waiting to insert an element.
Interviewer: Wait, in ArrayBlockingQueue it is notEmpty.signal() that gets called after an element is inserted. Why is it different here?
Hydra: This is exactly where using two locks, one for insertion and one for retrieval, pays off. In ArrayBlockingQueue the same lock controls both enqueueing and dequeueing, so if an inserting thread only woke up other inserting threads after an insert, the threads waiting to take elements might never be woken and could end up waiting far too long.
In LinkedBlockingQueue, putLock and takeLock are separate, so inserting threads and taking threads are not mutually exclusive. An inserting thread can therefore keep waking up other inserting threads without worrying about starving the taking threads, which improves throughput to some extent. Of course, since offer itself is non-blocking and never suspends the calling thread, the threads being woken up here are ones that blocked inside the put method.
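A minimal sketch of the two locks at work, with one producer and one consumer running on the same queue at the same time (class and thread names are just illustrative):

import java.util.concurrent.LinkedBlockingQueue;

public class TwoLockDemo {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1000);

        Thread producer = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) {
                try {
                    queue.put(i);      // guarded by putLock
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });

        Thread consumer = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) {
                try {
                    queue.take();      // guarded by takeLock, so it does not contend with put
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        System.out.println("remaining: " + queue.size()); // 0
    }
}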
Interviewer: Moving on, why does it only wake up the threads waiting on notEmpty when c equals 0?
Hydra: The retrieval methods mirror the insertion methods: as long as one taking thread is running, it keeps waking up other taking threads through notEmpty.signal(). Only when c equals 0 was the queue empty before this insert, which means the taking threads may all be blocked and need to be woken from the inserting side. A picture illustrates this:
As mentioned before, the head node can be regarded as a sentinel that stores no data, so for now you can simply think of dequeuing as taking out the second node directly. That description is not entirely rigorous; the dequeue process is explained properly later.
Interviewer: So how does the blocking put method differ from offer?
Hydra: put is mostly the same as offer, except that it acquires the lock interruptibly, and when the queue is full the calling thread is added to the notFull waiting queue (a fuller sketch follows the snippet below):
while (count.get() == capacity) {
notFull.await();
}
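Putting it together, put looks roughly like the sketch below. The structure mirrors offer and the blocking loop is the part shown above; this is a reconstruction for illustration and may not match the JDK source line for line:

// Sketch of put(): interruptible, blocks on notFull when the queue is full
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();          // interruptible, unlike offer()
    try {
        while (count.get() == capacity)   // full: park on the notFull condition
            notFull.await();
        enqueue(node);
        c = count.getAndIncrement();      // c is the size *before* this insert
        if (c + 1 < capacity)             // still room left: wake another inserting thread
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)                           // the queue was empty before: wake a taking thread
        signalNotEmpty();
}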
This is the same waiting process on notFull shown in the figure above, so it is not repeated. There is also a timed offer overload that blocks for a limited time; if the timeout expires before the element can be inserted, the element is discarded and false is returned. Here is the part of the code that differs:
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
    ...
    long nanos = unit.toNanos(timeout); // convert the timeout to nanoseconds
    ...
    while (count.get() == capacity) {
        if (nanos <= 0)
            return false;
        nanos = notFull.awaitNanos(nanos);
    }
    enqueue(new Node<E>(e));
    ...
}
awaitNanos adds a timeout mechanism on top of await. The loop keeps checking whether the preset timeout has been used up: if the thread is woken before the timeout elapses, awaitNanos returns the remaining time, that is, the previous value minus the time already spent waiting. Once the returned value is less than or equal to zero, the wait is treated as timed out, whether the thread was woken by another thread or simply ran out the clock.
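From the caller's side, the timed overload behaves like this (the capacity, value, and timeout below are arbitrary):

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TimedOfferDemo {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.put("first");
        // The queue is full, so this call waits up to 500 ms and then gives up
        boolean inserted = queue.offer("second", 500, TimeUnit.MILLISECONDS);
        System.out.println(inserted); // false
    }
}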
Interviewer: That was a lot of effort just to explain insertion. Let me take a sip of water; go on and talk about how elements are retrieved.
Hydra: ... Let's look at the non-blocking poll method first:
public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)            // the queue is empty
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {       // the queue is not empty
            x = dequeue();
            // c is the queue length *before* the element was removed
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
dequeue is executed only when the queue is not empty. If elements remain after the removal, another thread waiting to take an element is woken up. And when the element count before the removal equals the capacity, meaning the queue was full, a blocked inserting thread is woken up after the removal. A quick caller-side example is shown below.
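As with offer, poll never blocks (the capacity and values below are arbitrary):

import java.util.concurrent.LinkedBlockingQueue;

public class PollDemo {
    public static void main(String[] args) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        queue.offer("a");
        System.out.println(queue.poll()); // "a"
        System.out.println(queue.poll()); // null: the queue is empty and poll never blocks
    }
}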
The actual removal is done by the dequeue method:
private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}
As mentioned earlier, the head node stores no data; its next node is the real first element. During a dequeue, that next node is saved as first, and the old head's next pointer is pointed at itself. The "help GC" comment in the code marks this step: cutting the old node loose lets it be collected sooner. head is then moved to first, first's item is taken as the return value, and the item field is cleared to null. In a picture, it looks like this:
Interviewer: (glancing at his watch) The overall logic of the take method is similar. Can you summarize it briefly?
Hydra: take is a blocking, interruptible retrieval method. When the queue is empty, the current thread is suspended and added to the notEmpty waiting queue until another thread wakes it up. A rough sketch follows.
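Mirroring the put sketch earlier, take looks roughly like this (again a reconstruction for illustration, not guaranteed to be verbatim source):

// Sketch of take(): interruptible, blocks on notEmpty when the queue is empty
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0)      // empty: park on the notEmpty condition
            notEmpty.await();
        x = dequeue();
        c = count.getAndDecrement();  // c is the size *before* this removal
        if (c > 1)                    // more elements remain: wake another taking thread
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)                // the queue was full before: wake a blocked inserting thread
        signalNotFull();
    return x;
}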
Interviewer: One last thing: sum up its similarities and differences with ArrayBlockingQueue in a few points, and then I'm heading home.
Hydra: Well, to sum up, I have the following points:
- The queue length differs: ArrayBlockingQueue must have its length specified and it cannot be changed, while LinkedBlockingQueue may or may not have its length specified.
- The storage differs: ArrayBlockingQueue uses an array, while LinkedBlockingQueue uses a linked list of Node nodes.
- The locking differs: ArrayBlockingQueue uses one lock to control both the insertion and removal of elements, while LinkedBlockingQueue separates the enqueue lock from the dequeue lock to improve concurrent performance.
- The impact on GC differs: ArrayBlockingQueue stores elements in an array, so no extra objects are created on insertion or removal, while LinkedBlockingQueue creates a new Node for every element, which puts some pressure on GC.
Interviewer: Tomorrow at 9:00 a.m., same place. We’ll talk about something else
Hydra:…