In our last blog post, we introduced ArrayBlockingQueue, a bounded blocking queue backed by an array. If there is an array-based implementation, there should be a linked-list-based one too. There is, and it is today’s main topic: LinkedBlockingQueue. ArrayBlockingQueue is bounded. Is LinkedBlockingQueue bounded or unbounded? It can be either, depending on how it is constructed. Why? You’ll see.
As in the last post, we’ll first look at basic usage of LinkedBlockingQueue and then walk through its core source code.
LinkedBlockingQueue basic application
public static void main(String[] args) throws InterruptedException {
    LinkedBlockingQueue<Integer> linkedBlockingQueue = new LinkedBlockingQueue<>();
    linkedBlockingQueue.add(15);
    linkedBlockingQueue.add(60);
    linkedBlockingQueue.offer(50);
    linkedBlockingQueue.put(100);
    System.out.println(linkedBlockingQueue);
    System.out.println(linkedBlockingQueue.size());
    System.out.println(linkedBlockingQueue.take());
    System.out.println(linkedBlockingQueue);
    System.out.println(linkedBlockingQueue.poll());
    System.out.println(linkedBlockingQueue);
    System.out.println(linkedBlockingQueue.peek());
    System.out.println(linkedBlockingQueue);
    System.out.println(linkedBlockingQueue.remove(50));
    System.out.println(linkedBlockingQueue);
}
Running results:
[15, 60, 50, 100]
4
15
[60, 50, 100]
60
[50, 100]
50
[50, 100]
true
[100]
The code is simple. Let’s walk through it:
- Create a LinkedBlockingQueue.
- Add elements using the add, offer, and put methods; add is called twice.
- Print the queue: [15, 60, 50, 100].
- Print the size of the queue: 4.
- Remove the head element with take and print it: 15.
- Print the queue: [60, 50, 100].
- Remove the head element with poll and print it: 60.
- Print the queue: [50, 100].
- Read the head element with peek, without removing it, and print it: 50.
- Print the queue: [50, 100].
- Remove the element with value 50 using remove, which returns true.
- Print the queue: [100].
The code is simple, but it leaves several questions open:
- What guarantees thread safety under the hood?
- Where and in what form is the data stored?
- offer/add/put all add elements to the queue. What is the difference?
- poll/take/peek all retrieve the head element of the queue. What is the difference?
The best way to answer these questions is to read the source code, so here is the core source code of LinkedBlockingQueue.
LinkedBlockingQueue source code analysis
Constructors
LinkedBlockingQueue provides three constructors, described below:
LinkedBlockingQueue()
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
The no-argument constructor simply delegates to another constructor, but pay attention to the argument it passes: Integer.MAX_VALUE.
LinkedBlockingQueue(int capacity)
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}
- Check whether the capacity is valid. If it is not greater than 0, an IllegalArgumentException is thrown.
- Assign the passed-in capacity to the capacity field.
- Create a new Node with a null item and assign it to both head and last. This is a dummy node; the first real element will be head.next.
What is this capacity? As you might guess, it is the maximum size of the LinkedBlockingQueue. If we create the queue with the no-argument constructor, its maximum size is Integer.MAX_VALUE, which in practice we call “unbounded”. We can also specify the maximum size ourselves, in which case the queue is “bounded”. So flatly stating that LinkedBlockingQueue is a bounded queue, or flatly stating that it is an unbounded queue, is imprecise in my opinion: it depends on how the queue was constructed.
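To make the bounded/unbounded distinction concrete, here is a minimal sketch using only the public API. The class name CapacityDemo and its helper methods are made up for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; only LinkedBlockingQueue itself comes from the JDK.
class CapacityDemo {
    // Remaining capacity of a queue created with the no-argument constructor.
    static int defaultRemainingCapacity() {
        return new LinkedBlockingQueue<Integer>().remainingCapacity();
    }

    // Offers three elements to a queue bounded at two and returns the result of the third offer.
    static boolean thirdOfferOnCapacityTwo() {
        LinkedBlockingQueue<Integer> bounded = new LinkedBlockingQueue<>(2);
        bounded.offer(1);
        bounded.offer(2);
        return bounded.offer(3); // the queue is full, so this offer is rejected
    }

    public static void main(String[] args) {
        System.out.println(defaultRemainingCapacity() == Integer.MAX_VALUE); // true
        System.out.println(thirdOfferOnCapacityTwo());                       // false
    }
}
```

The same class behaves as a practically unbounded queue or as a bounded one purely depending on which constructor was used.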
Let’s see what this Node is:
static class Node<E> {
    E item;
    Node<E> next;
    Node(E x) { item = x; }
}
This should look familiar: it is clearly a node of a singly linked list, where next points to the next Node.
LinkedBlockingQueue(Collection<? extends E> c)
public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE); // call the second constructor with Integer.MAX_VALUE: effectively an unbounded queue
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // acquire the exclusive lock
    try {
        int n = 0; // number of elements added so far
        for (E e : c) { // iterate over the collection passed in
            if (e == null) // null elements are not allowed
                throw new NullPointerException();
            if (n == capacity) // maximum capacity reached
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e)); // append to the queue
            ++n;
        }
        count.set(n); // set count
    } finally {
        putLock.unlock(); // release the exclusive lock
    }
}
- Call the second constructor with Integer.MAX_VALUE, so the LinkedBlockingQueue is effectively unbounded here.
- Acquire the exclusive lock putLock.
- Define a variable n that records the number of elements added so far.
- Iterate over the collection c. If an element is null, a NullPointerException is thrown. If n == capacity, the maximum capacity has been reached and an IllegalStateException("Queue full") is thrown. Otherwise enqueue is called to append the element, and n is incremented.
- Set count to n, so that count holds the size of the LinkedBlockingQueue.
- Release the exclusive lock putLock in the finally block.
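The behaviour of this constructor can be observed from the outside. The following is a small sketch; CollectionConstructorDemo and its helpers are illustrative names, not part of the JDK.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class for the collection constructor.
class CollectionConstructorDemo {
    // Builds a queue from a list; elements keep the list's iteration order.
    static String fromList() {
        List<Integer> source = Arrays.asList(15, 60, 50);
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(source);
        return queue + " size=" + queue.size();
    }

    // A null element in the source collection is rejected with NullPointerException.
    static boolean nullElementRejected() {
        try {
            new LinkedBlockingQueue<>(Arrays.asList(1, null, 3));
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(fromList());            // [15, 60, 50] size=3
        System.out.println(nullElementRejected()); // true
    }
}
```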
offer
public boolean offer(E e) {
    if (e == null) throw new NullPointerException(); // null elements are not allowed
    final AtomicInteger count = this.count; // read the count field
    if (count.get() == capacity) // maximum capacity reached: return false
        return false;
    int c = -1; // size
    Node<E> node = new Node<E>(e); // create the node
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // acquire the exclusive lock
    try {
        if (count.get() < capacity) { // maximum capacity not reached
            enqueue(node); // append to the queue
            c = count.getAndIncrement(); // assign count to c, then increment count
            if (c + 1 < capacity) // there is still room: wake up a thread blocked in notFull.await()
                notFull.signal();
        }
    } finally {
        putLock.unlock(); // release the exclusive lock
    }
    if (c == 0) // the queue was empty before this insertion
        signalNotEmpty();
    return c >= 0;
}
- If the element passed in is null, a NullPointerException is thrown.
- Assign the count field of this instance to the local variable count.
- If count == capacity, the maximum capacity has been reached, so return false.
- Define the local variable c to hold the size, initialized to -1.
- Create a new Node.
- Acquire the exclusive lock putLock.
- If count >= capacity, nothing is added; c stays at -1, so the final c >= 0 is false and the method returns false. If count < capacity, there is free space and we continue. One thing to think about: why check again when step 3 already checked? Because multiple threads may be executing add/offer/put. While the queue is not yet full, several threads can pass step 3 at the same time (no lock is held at that point) and proceed together, so the condition must be re-checked once the exclusive lock is held.
- Enqueue the node.
- Get count, assign it to c, and then increment count. Note that c receives the value from before the increment; this order directly affects the checks that follow.
- If c + 1 < capacity, there is still free space, so call notFull.signal() to wake up a thread blocked in notFull.await().
- In finally, release the exclusive lock putLock.
- If c == 0, the queue was empty before this insertion, so consumers may be waiting; after putLock has been released, call signalNotEmpty. Let’s look at the signalNotEmpty method:
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}
The code is simple: acquire the exclusive lock, wake up a thread blocked in notEmpty.await(), and release the lock. Note that the lock here is takeLock, not putLock, because notEmpty is a condition of takeLock.
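The effect of this hand-off between producers and consumers can be seen with a consumer thread that calls take on an empty queue while another thread calls offer. This is only a sketch: OfferWakesTakeDemo is a made-up name, and whether the consumer really blocks before the offer depends on thread scheduling, although the result is the same either way.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class: a consumer waits in take() until a producer offers an element.
class OfferWakesTakeDemo {
    static Integer consumeAfterOffer() {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        AtomicReference<Integer> received = new AtomicReference<>();

        Thread consumer = new Thread(() -> {
            try {
                received.set(queue.take()); // waits while the queue is empty
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        boolean accepted = queue.offer(42); // the queue goes from empty to one element
        try {
            consumer.join(); // returns once the consumer has taken the element
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return accepted ? received.get() : null;
    }

    public static void main(String[] args) {
        System.out.println(consumeAfterOffer()); // 42
    }
}
```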
add
public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
The add method simply calls offer, but the two behave differently when the queue is full: offer returns false, while add throws an IllegalStateException with the message “Queue full”.
put
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException(); // null elements are not allowed
    int c = -1; // size
    Node<E> node = new Node<E>(e); // create the node
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count; // read the count field
    putLock.lockInterruptibly(); // acquire the exclusive lock, responding to interrupts
    try {
        // while the queue is at maximum capacity, wait on notFull until woken up
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node); // append to the queue
        c = count.getAndIncrement(); // assign count to c, then increment count
        if (c + 1 < capacity) // there is still room: wake up a thread blocked in notFull.await()
            notFull.signal();
    } finally {
        putLock.unlock(); // release the exclusive lock
    }
    if (c == 0) // the queue was empty before this insertion: wake up a thread blocked in notEmpty.await()
        signalNotEmpty();
}
- If the element passed in is null, a NullPointerException is thrown.
- Define a local variable c to hold the size, initialized to -1.
- Create a new Node.
- Assign the count field of this instance to the local variable count.
- Acquire the exclusive lock putLock with lockInterruptibly, so the thread can respond to interrupts while waiting for the lock.
- While the queue is at maximum capacity, call notFull.await(), which blocks the current thread until another thread calls notFull.signal() to wake it up.
- Enqueue the node.
- Assign count to c, then increment count.
- If c + 1 < capacity, there is still free space, so call notFull.signal() to wake up a thread blocked in notFull.await().
- Release the exclusive lock putLock.
- If c == 0, the queue was empty before this insertion, so wake up a thread blocked in notEmpty.await().
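Both offer and put rely on AtomicInteger.getAndIncrement returning the value from before the increment. A tiny sketch (GetAndIncrementDemo is an illustrative name) shows why c == 0 means the queue was empty before the insertion, and why c + 1 is the size after it:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class for the "assign first, then increment" order.
class GetAndIncrementDemo {
    // Returns {value returned by getAndIncrement, value stored afterwards}.
    static int[] incrementFrom(int start) {
        AtomicInteger count = new AtomicInteger(start);
        int c = count.getAndIncrement(); // c is the value BEFORE the increment
        return new int[] { c, count.get() };
    }

    public static void main(String[] args) {
        int[] result = incrementFrom(0);
        // Starting from an empty queue: c is 0, and the new size is c + 1 = 1.
        System.out.println(result[0] + " " + result[1]); // 0 1
    }
}
```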
enqueue
private void enqueue(Node<E> node) {
    last = last.next = node;
}
The new node is assigned to the next field of the current last node, and then last is updated to point to the new node.
A small summary
offer/add/put all add elements, but they behave differently when the queue is full:
- offer: returns false.
- add: calls offer internally; because the queue is full, it throws an exception.
- put: blocks the current thread until it is woken up.
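The three behaviours on a full queue can be compared side by side. This is a sketch with a made-up class name (FullQueueDemo); the queue is bounded at one element so that it is full after the first add.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class comparing offer, add and put on a full queue.
class FullQueueDemo {
    static String run() {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1);
        queue.add(1); // the queue is now full

        boolean offered = queue.offer(2); // full queue: offer returns false

        boolean addThrew = false;
        try {
            queue.add(3); // full queue: add throws IllegalStateException
        } catch (IllegalStateException e) {
            addThrew = true;
        }

        Thread producer = new Thread(() -> {
            try {
                queue.put(4); // full queue: put waits until a slot is free
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            Integer head = queue.take(); // frees the only slot, letting put proceed
            producer.join();
            return "offer=" + offered + " addThrew=" + addThrew
                    + " taken=" + head + " queue=" + queue;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // offer=false addThrew=true taken=1 queue=[4]
    }
}
```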
size
public int size() {
    return count.get();
}
There’s nothing to say. Count counts the size of the LinkedBlockingQueue and returns it.
take
public E take() throws InterruptedException {
    E x;
    int c = -1; // size
    final AtomicInteger count = this.count; // read the count field
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly(); // acquire the exclusive lock, responding to interrupts
    try {
        while (count.get() == 0) { // the queue is empty: wait on notEmpty
            notEmpty.await();
        }
        x = dequeue(); // remove the head element
        c = count.getAndDecrement(); // assign first, then decrement
        if (c > 1) // elements remain: wake up a thread blocked in notEmpty.await()
            notEmpty.signal();
    } finally {
        takeLock.unlock(); // release the exclusive lock
    }
    if (c == capacity) // the queue was full before this removal
        signalNotFull();
    return x;
}
- Define the local variable c to hold the size, initialized to -1.
- Assign the count field of this instance to the local variable count.
- Acquire takeLock with lockInterruptibly, so the thread can respond to interrupts.
- While count == 0, the queue is empty, so the current thread blocks until another thread calls notEmpty.signal() to wake it up.
- Dequeue the head element.
- Assign count to c first, then decrement count.
- If c > 1, the queue held at least two elements before the dequeue, so at least one remains; call notEmpty.signal() to wake up a thread blocked in notEmpty.await().
- Release the exclusive lock takeLock.
- If c == capacity, the queue was full before the dequeue, so it now has exactly one free slot; call signalNotFull.
Let’s look at the signalNotFull method again:
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}
- Acquire the exclusive lock; note that it is putLock.
- Call notFull.signal() to wake up a thread blocked in notFull.await().
- Release the exclusive lock putLock.
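Because take acquires its lock with lockInterruptibly and waits with await, a thread stuck in take on an empty queue can be released by interrupting it. A minimal sketch follows; InterruptTakeDemo and the latch used to make sure the consumer has started are illustrative choices.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class: interrupting a thread that calls take() on an empty queue.
class InterruptTakeDemo {
    static boolean takeIsInterrupted() {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        CountDownLatch started = new CountDownLatch(1);

        Thread consumer = new Thread(() -> {
            started.countDown(); // tell the main thread that we are running
            try {
                queue.take(); // the queue stays empty, so this never returns normally
            } catch (InterruptedException e) {
                sawInterrupt.set(true); // take() reacted to the interrupt
            }
        });
        consumer.start();

        try {
            started.await();
            consumer.interrupt(); // delivered before or during take(); both cases throw
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sawInterrupt.get();
    }

    public static void main(String[] args) {
        System.out.println(takeIsInterrupted()); // true
    }
}
```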
poll
public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
The main difference from take is what happens when the queue is empty: take blocks the current thread until it is woken up, whereas poll returns null immediately.
peek
public E peek() {
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        Node<E> first = head.next;
        if (first == null)
            return null;
        else
            return first.item;
    } finally {
        takeLock.unlock();
    }
}
The peek method simply returns the item of the first real node (head.next) without removing that node.
dequeue
private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}
dequeue unlinks the old dummy head, makes the first real node the new dummy head, clears that node’s item, and returns the removed element.
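The linked-list mechanics of enqueue and dequeue can be tried in isolation. The class below is a single-threaded toy written for illustration (ToyLinkedQueue is a made-up name): it has no locks, no capacity and no count, and, unlike the real dequeue, it returns null on an empty list instead of relying on the caller to check first.

```java
// Hypothetical single-threaded toy mirroring the dummy-head linked list; NOT thread-safe.
class ToyLinkedQueue<E> {
    private static final class Node<E> {
        E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    // head is always a dummy node whose item is null; the first element is head.next.
    private Node<E> head = new Node<>(null);
    private Node<E> last = head;

    // Link the new node after the current last node, then move last forward.
    void enqueue(E e) {
        last = last.next = new Node<>(e);
    }

    // Remove and return the first element, or null if the list is empty (toy-only check).
    E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        if (first == null)
            return null;
        h.next = h;        // self-link the old dummy node so it can be collected
        head = first;      // the first real node becomes the new dummy node
        E x = first.item;
        first.item = null; // a dummy node carries no item
        return x;
    }

    public static void main(String[] args) {
        ToyLinkedQueue<String> queue = new ToyLinkedQueue<>();
        queue.enqueue("a");
        queue.enqueue("b");
        System.out.println(queue.dequeue()); // a
        System.out.println(queue.dequeue()); // b
        System.out.println(queue.dequeue()); // null
    }
}
```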
A small summary
take/poll/peek all retrieve the head element of the queue, but they behave differently:
- take: when the queue is empty, the current thread blocks until it is woken up. The element is dequeued, so the retrieved node is removed.
- poll: returns null when the queue is empty. The element is dequeued, so the retrieved node is removed.
- peek: returns null when the queue is empty. The node is not removed.
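The non-blocking part of this summary is easy to check with a short sketch (HeadAccessDemo is an illustrative name). take is left out here because it would block on the empty queue.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class comparing peek and poll on empty and non-empty queues.
class HeadAccessDemo {
    static String run() {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        Integer peekEmpty = queue.peek();  // empty queue: null
        Integer pollEmpty = queue.poll();  // empty queue: null

        queue.offer(7);
        Integer peeked = queue.peek();     // 7, the element stays in the queue
        int sizeAfterPeek = queue.size();  // still 1
        Integer polled = queue.poll();     // 7, the element is removed
        int sizeAfterPoll = queue.size();  // now 0

        return peekEmpty + " " + pollEmpty + " " + peeked + " "
                + sizeAfterPeek + " " + polled + " " + sizeAfterPoll;
    }

    public static void main(String[] args) {
        System.out.println(run()); // null null 7 1 7 0
    }
}
```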
That concludes the core source code analysis of LinkedBlockingQueue, thank you.