Preface
The producer/consumer model is a very common distributed resource scheduling model. In this model, there are at least two objects: the producer and the consumer. Producers are only responsible for creating resources, and consumers are only responsible for using resources. It would be easy to implement a simple producer/consumer model on your own, just through a queue, but this approach has many hidden pitfalls:
- You need to ensure thread-visibility of resources and manually implement thread synchronization
- Various critical cases and rejection strategies need to be considered
- A balance needs to be struck between throughput and thread safety
Fortunately, Java already provides both the interface and its implementations, so let's take a brief look at the BlockingQueue interface and its common implementation class, LinkedBlockingQueue
Blocking queue
Concept
BlockingQueue is, as the name suggests, a blocking queue. Its definition extends the Queue interface, so it can also be used as an ordinary Queue.
Since it is called a blocking queue, it means that the queue operations are performed in blocking mode, which is reflected in the following two aspects:
- Insertion blocks: when the queue is full, the thread performing the insert is blocked
- Removal blocks: when the queue is empty, the thread performing the removal is blocked
In this way, the relationship between producers and consumers can be easily coordinated
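To make that coordination concrete, here is a minimal sketch of one producer and one consumer sharing a small bounded queue. The class name ProducerConsumerDemo and the helper run are made up for illustration; only put, take, and the LinkedBlockingQueue constructor come from the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class: one producer and one consumer share a bounded queue.
final class ProducerConsumerDemo {

    static List<Integer> run(int itemCount, int capacity) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(capacity);
        List<Integer> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < itemCount; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < itemCount; i++) {
                    consumed.add(queue.take()); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // after join, the consumer's writes are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        // One producer and a FIFO queue, so the items arrive in insertion order.
        System.out.println(run(5, 2));
    }
}
```

Neither thread contains any explicit synchronization: the capacity of 2 throttles the producer, and an empty queue parks the consumer.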
Interface methods
BlockingQueue declares the following methods:
public interface BlockingQueue<E> extends Queue<E> {
    boolean add(E e);
    boolean offer(E e);
    void put(E e) throws InterruptedException;
    boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
    E take() throws InterruptedException;
    E poll(long timeout, TimeUnit unit) throws InterruptedException;
    int remainingCapacity();
    boolean remove(Object o);
    public boolean contains(Object o);
    int drainTo(Collection<? super E> c);
    int drainTo(Collection<? super E> c, int maxElements);
}
These interface methods can be divided into three categories by function:
- Add elements: add, offer, and put
- Remove elements: include remove, poll, take, and drainTo
- Get/check elements: includes contains, remainingCapacity
In general, we also call adding an element a put operation (even when the offer method is used rather than put) and removing an element a take operation
By how they behave when the operation cannot proceed immediately, the first two categories can be further divided as follows:
- Throw an exception: add and remove
- Return a special value: offer(e), poll()
- Block: put(e), take()
- Time out: offer(e, time, unit), poll(time, unit)
These are not covered one by one here, since the names are largely self-explanatory
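As a rough illustration of these handling modes on the insert side, the sketch below fills a queue of capacity 1 and then tries each non-blocking or time-limited insert method. The class name HandlingModesDemo and the 50 ms timeout are arbitrary choices for the example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: what each insert method does when the queue is full.
final class HandlingModesDemo {

    static String insertOnFullQueue() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.add("first"); // succeeds: there is room for exactly one element

        StringBuilder result = new StringBuilder();

        // Special value: offer(e) returns false immediately.
        result.append("offer=").append(queue.offer("second"));

        // Exception: add(e) throws IllegalStateException.
        try {
            queue.add("second");
            result.append(", add=ok");
        } catch (IllegalStateException ex) {
            result.append(", add=IllegalStateException");
        }

        // Timeout: offer(e, timeout, unit) waits, then gives up and returns false.
        try {
            result.append(", timedOffer=")
                  .append(queue.offer("second", 50, TimeUnit.MILLISECONDS));
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            result.append(", timedOffer=interrupted");
        }

        // put(e) is left out on purpose: with no consumer it would block forever.
        return result.toString();
    }

    public static void main(String[] args) {
        System.out.println(insertOnFullQueue());
    }
}
```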
Implementation of blocking queues
JDK 8 provides several BlockingQueue implementation classes. The most commonly used are:
- ArrayBlockingQueue: a bounded blocking queue backed by an array
- LinkedBlockingQueue: an optionally bounded blocking queue backed by linked nodes (the capacity defaults to Integer.MAX_VALUE)
- PriorityBlockingQueue: an unbounded priority queue
- DelayQueue: an unbounded priority queue that supports delayed retrieval of elements
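The difference between bounded and unbounded implementations shows up in remainingCapacity(). The following is a small sketch with a made-up class name, QueueKindsDemo. DelayQueue is omitted because it needs a Delayed element type, which would distract from the point.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class comparing three common implementations.
final class QueueKindsDemo {

    // ArrayBlockingQueue always requires an explicit capacity.
    static int arrayRemainingAfterOneInsert() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        queue.offer(1);
        return queue.remainingCapacity();
    }

    // Without a capacity argument, LinkedBlockingQueue uses Integer.MAX_VALUE.
    static int linkedDefaultRemaining() {
        return new LinkedBlockingQueue<Integer>().remainingCapacity();
    }

    // PriorityBlockingQueue hands out elements in priority order, not insertion order.
    static int priorityHead() {
        BlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        queue.offer(3);
        queue.offer(1);
        queue.offer(2);
        return queue.poll();
    }

    public static void main(String[] args) {
        System.out.println(arrayRemainingAfterOneInsert());
        System.out.println(linkedDefaultRemaining() == Integer.MAX_VALUE);
        System.out.println(priorityHead());
    }
}
```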
Interested readers can explore the remaining implementations on their own. Below, we use LinkedBlockingQueue as an example to explain how Java implements blocking queues
Interface methods
In addition to the interface methods declared by BlockingQueue, LinkedBlockingQueue also provides peek (inherited from Queue), which returns the first element of the queue without removing it
That covers the common blocking queue methods. Here is a table to summarize them [1]:
Operation | Throws exception | Returns special value | Blocks | Times out |
---|---|---|---|---|
Insert | add(e) | offer(e) | put(e) | offer(e, timeout, unit) |
Remove | remove() | poll() | take() | poll(timeout, unit) |
Examine | element() | peek() | / | / |
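The element() and peek() methods both return the head of the queue without removing it. They differ only when the queue is empty: element() throws NoSuchElementException, whereas peek() returns null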
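The last row of the table can be checked with a few lines of code. This is only a sketch; the class name PeekElementDemo and its helpers are invented for the example.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class: peek() versus element().
final class PeekElementDemo {

    // On an empty queue peek() returns null and element() throws.
    static String onEmptyQueue() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        String peeked = queue.peek();
        try {
            queue.element();
            return "peek=" + peeked + ", element=ok";
        } catch (NoSuchElementException ex) {
            return "peek=" + peeked + ", element=NoSuchElementException";
        }
    }

    // Neither method removes anything, so the size is unchanged afterwards.
    static int sizeAfterExamining() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.offer("a");
        queue.peek();
        queue.element();
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(onEmptyQueue());
        System.out.println(sizeAfterExamining());
    }
}
```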
Attributes
BlockingQueue only defines the interface specification; the actual work is done by the concrete implementation class. Let's skip AbstractQueue for now and go straight to LinkedBlockingQueue, which defines several important fields:
/** Number of elements */
private final AtomicInteger count = new AtomicInteger();
/** Head of the queue */
transient Node<E> head;
/** Tail of the queue */
private transient Node<E> last;
/** Lock held by methods such as take and poll, called the take lock */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for the take method */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by methods such as put and offer, called the put lock or in-lock */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for the put method */
private final Condition notFull = putLock.newCondition();
Setting aside what it shares with LinkedList, we focus on the last four fields. They fall into two groups: those used when inserting elements and those used when removing elements. Each group consists of a ReentrantLock and a Condition. ReentrantLock is a reentrant lock based on AQS [2] (if the notion of reentrancy is unfamiliar, you can treat it as an ordinary lock), and Condition is a concrete form of the wait/notify pattern (think of it as a more powerful version of wait and notify).
The count field needs no explanation, and head and last obviously maintain the linked list that stores the elements. What distinguishes a blocking queue from an ordinary queue is the four fields of type ReentrantLock and Condition, whose roles are analyzed in the following sections
First, though, a quick look at Condition. Condition is an interface whose implementation lives in AQS. For this article you only need three methods: await(), signal(), and signalAll(). They are analogous to Object's wait(), notify(), and notifyAll(). The difference is that wait/notify work with intrinsic locks (object locks and class locks) and the threads waiting on them, whereas await/signal work with AQS-based locks and operate on the AQS wait queues
So notEmpty maintains the queue of threads waiting to take elements, and notFull maintains the queue of threads waiting to put elements. notEmpty means “the queue is not empty”, so an element can be taken; notFull means “the queue is not full”, so an element can be inserted
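To see await() and signal() in isolation, here is a minimal bounded buffer. It is a deliberately simplified sketch, not LinkedBlockingQueue's design: it assumes a single ReentrantLock shared by both conditions, whereas LinkedBlockingQueue uses separate put and take locks. The class name ConditionBuffer and the helper roundTrip are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical single-lock bounded buffer, used only to illustrate Condition.
final class ConditionBuffer<E> {
    private final Deque<E> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition(); // "there is something to take"
    private final Condition notFull = lock.newCondition();  // "there is room to put"

    ConditionBuffer(int capacity) {
        this.capacity = capacity;
    }

    void put(E e) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity) {
                notFull.await(); // releases the lock while waiting
            }
            items.addLast(e);
            notEmpty.signal();   // wake one thread waiting in take()
        } finally {
            lock.unlock();
        }
    }

    E take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            E e = items.removeFirst();
            notFull.signal();    // wake one thread waiting in put()
            return e;
        } finally {
            lock.unlock();
        }
    }

    // Pushes 1, 2, 3 through a buffer of capacity 1 and returns their sum.
    static int roundTrip() {
        ConditionBuffer<Integer> buffer = new ConditionBuffer<>(1);
        int[] sum = {0};
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    sum[0] += buffer.take();
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            for (int i = 1; i <= 3; i++) {
                buffer.put(i);
            }
            consumer.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println(roundTrip());
    }
}
```

The while loops around await() matter: a woken thread must re-check the condition, because another thread may have changed the state before it reacquired the lock.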
Insert elements
offer(e)
The source of offer(e) is as follows:
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    // If the queue is already at capacity, return false immediately
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    // Acquire the put lock
    putLock.lock();
    try {
        if (count.get() < capacity) {
            // Enqueue the node and increment the element count
            enqueue(node);
            // Note that c holds the value before the increment
            c = count.getAndIncrement();
            // If there is still room after this insert, wake up a waiting put operation
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        // Release the put lock
        putLock.unlock();
    }
    if (c == 0)
        // If the queue was empty before this insert, wake up a waiting take operation
        signalNotEmpty();
    return c >= 0;
}
The offer method returns false to the caller when the element cannot be added, much like non-blocking communication. Its thread safety is guaranteed by the put lock
One interesting detail is the check at the end: if c == 0, a take operation is woken up. Why? Look back at the statement c = count.getAndIncrement(): getAndIncrement() returns the value before the increment. So c == 0 means the queue was empty before the element was inserted. In other words, if the queue was empty, a take operation is woken up right after the first element is inserted. [3]
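The behaviour of getAndIncrement() that this reasoning relies on is easy to confirm in isolation. The class name GetAndIncrementDemo below is made up for the example.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: getAndIncrement() returns the value BEFORE the increment.
final class GetAndIncrementDemo {

    static int[] previousAndCurrent() {
        AtomicInteger count = new AtomicInteger(); // starts at 0, like an empty queue
        int c = count.getAndIncrement();           // c is the old value
        return new int[] {c, count.get()};         // old value, then new value
    }

    public static void main(String[] args) {
        int[] values = previousAndCurrent();
        System.out.println("c=" + values[0] + ", count=" + values[1]);
    }
}
```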
So far, the whole method process can be summarized as follows:
- Acquire the put lock
- Enqueue the element and increment count
- If the capacity has not been reached, wake up one put operation
- If the queue was empty before the element was inserted, wake up one take operation at the end
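The steps above can be sketched as a stripped-down two-lock queue. This is an illustrative reduction rather than the JDK source: the class name TwoLockQueue is hypothetical, only the non-blocking offer and poll are included, and the enqueue and dequeue logic is inlined. Since nothing in this sketch ever waits, the signal calls have no waiter to wake and are kept only to show where they sit in the flow.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified two-lock queue with only offer(e) and poll().
final class TwoLockQueue<E> {
    private static final class Node<E> {
        E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    private final int capacity;
    private final AtomicInteger count = new AtomicInteger();
    private Node<E> head = new Node<>(null); // dummy node; the first element is head.next
    private Node<E> last = head;
    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();

    TwoLockQueue(int capacity) { this.capacity = capacity; }

    boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        if (count.get() == capacity) return false;
        int c = -1;
        Node<E> node = new Node<>(e);
        putLock.lock();                              // step 1: acquire the put lock
        try {
            if (count.get() < capacity) {
                last = last.next = node;             // step 2: enqueue ...
                c = count.getAndIncrement();         // ... and increment count
                if (c + 1 < capacity) notFull.signal(); // step 3: still room
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) signalNotEmpty();                // step 4: queue was empty before
        return c >= 0;
    }

    E poll() {
        if (count.get() == 0) return null;
        E x = null;
        int c = -1;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                Node<E> first = head.next;           // dequeue: first becomes the new dummy
                head = first;
                x = first.item;
                first.item = null;
                c = count.getAndDecrement();
                if (c > 1) notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) signalNotFull();
        return x;
    }

    // A condition may only be signalled while holding its own lock.
    private void signalNotEmpty() {
        takeLock.lock();
        try { notEmpty.signal(); } finally { takeLock.unlock(); }
    }

    private void signalNotFull() {
        putLock.lock();
        try { notFull.signal(); } finally { putLock.unlock(); }
    }

    public static void main(String[] args) {
        TwoLockQueue<Integer> queue = new TwoLockQueue<>(2);
        System.out.println(queue.offer(1) + " " + queue.offer(2) + " " + queue.offer(3));
        System.out.println(queue.poll() + " " + queue.poll() + " " + queue.poll());
    }
}
```

Because inserts touch only last and removals touch only head, a producer and a consumer can work at the same time; the atomic count is the only state they share.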
offer(e, timeout, unit)
To strike while the iron is hot, let’s move on to the offer method with a timeout mechanism:
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // Acquire the put lock interruptibly
    putLock.lockInterruptibly();
    try {
        // Keep looping until the queue is no longer full or the timeout elapses
        while (count.get() == capacity) {
            // The timeout has elapsed: return false
            if (nanos <= 0)
                return false;
            // Add the current thread to the notFull wait queue;
            // awaitNanos returns the remaining wait time
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}
The whole method is basically the same as offer(e) method, with two differences:
- The lock is acquired interruptibly, i.e. with putLock.lockInterruptibly()
- While the queue remains full, the loop calls notFull.awaitNanos(nanos) to add the current thread to the notFull wait queue (waiting until a put can proceed), and returns false once the timeout elapses
The rest is exactly the same as offer(e) and will not be repeated here
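As a usage sketch, the timed offer succeeds when a consumer frees a slot before the timeout runs out. The class name TimedOfferDemo, the 100 ms delay, and the 2 second timeout are arbitrary values chosen for the example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: a timed offer that is rescued by a consumer.
final class TimedOfferDemo {

    static boolean offerWhileConsumerDrains() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.add("first"); // the queue is now full

        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(100); // let the producer start waiting first
                queue.take();      // frees a slot and signals notFull
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            // Waits on notFull for up to 2 seconds; the consumer frees a slot well before that.
            boolean accepted = queue.offer("second", 2, TimeUnit.SECONDS);
            consumer.join();
            return accepted;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(offerWhileConsumerDrains());
    }
}
```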
add(e)
The add method, in contrast to the offer method, throws an exception instead of returning a special value when the operation is not allowed, as follows:
public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
It simply wraps offer(e), so there is not much to say. Note that this implementation actually lives in AbstractQueue
put(e)
The put(e) method blocks a thread when an operation is not allowed. Let’s see how this works:
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // Acquire the put lock interruptibly
    putLock.lockInterruptibly();
    try {
        // Same as offer(e, timeout, unit), but waits indefinitely
        while (count.get() == capacity) {
            // A thread in the notFull queue is woken up by signal when an element is removed
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}
The put(e) operation is similar to offer(e, timeout, unit). The only difference is that when the queue is full, the await call has no timeout: the thread can only wait for a take operation [4] to call signal and wake it up
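Because put(e) acquires the lock interruptibly and waits with await(), a producer stuck on a full queue can still be cancelled by interrupting it. The sketch below assumes a queue of capacity 1 with no consumer at all; the class name InterruptiblePutDemo is made up.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class: interrupting a producer that is blocked in put(e).
final class InterruptiblePutDemo {

    static boolean blockedPutIsInterrupted() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.add("first"); // full, and nobody will ever take from it

        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        Thread producer = new Thread(() -> {
            try {
                queue.put("second"); // would wait forever on notFull
            } catch (InterruptedException ex) {
                sawInterrupt.set(true);
            }
        });
        producer.start();

        try {
            Thread.sleep(100);    // give the producer time to block
            producer.interrupt(); // put(e) reacts by throwing InterruptedException
            producer.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return sawInterrupt.get();
    }

    public static void main(String[] args) {
        System.out.println(blockedPutIsInterrupted());
    }
}
```

The sleep is only there to make the blocked state likely. If the interrupt arrives before put(e) is called, the call still throws InterruptedException as soon as it tries to acquire the lock.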
Remove elements
poll()
The poll() method is used to remove and return the first node of the queue.
public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    // Acquire the take lock
    takeLock.lock();
    try {
        if (count.get() > 0) {
            // Dequeue the head element
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                // If elements remain after this removal, wake up a waiting take operation
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    // If the queue was full before this removal, wake up a waiting put operation
    if (c == capacity)
        signalNotFull();
    return x;
}
If you followed offer(e) carefully, there is little left to say about poll(): its flow is a mirror image of offer(e), with the take lock and notEmpty in place of the put lock and notFull
Other
The poll(timeout, unit), take(), and remove() methods mirror offer(e, timeout, unit), put(e), and add(e) respectively, so they are not covered again here
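For completeness, here is a short sketch of how the removal methods behave on an empty queue. The class name RemovalModesDemo and the 50 ms timeout are assumptions made for the example.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: what each removal method does when the queue is empty.
final class RemovalModesDemo {

    static String removeFromEmptyQueue() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        StringBuilder result = new StringBuilder();

        // Special value: poll() returns null immediately.
        result.append("poll=").append(queue.poll());

        // Exception: remove() throws NoSuchElementException.
        try {
            queue.remove();
            result.append(", remove=ok");
        } catch (NoSuchElementException ex) {
            result.append(", remove=NoSuchElementException");
        }

        // Timeout: poll(timeout, unit) waits, then gives up and returns null.
        try {
            result.append(", timedPoll=").append(queue.poll(50, TimeUnit.MILLISECONDS));
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            result.append(", timedPoll=interrupted");
        }

        // take() is left out on purpose: with no producer it would block forever.
        return result.toString();
    }

    public static void main(String[] args) {
        System.out.println(removeFromEmptyQueue());
    }
}
```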
Access to elements
peek()
The peek() method is used to get the header element, which is implemented as follows:
public E peek() {
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    // Acquire the take lock
    takeLock.lock();
    try {
        Node<E> first = head.next;
        if (first == null)
            return null;
        else
            return first.item;
    } finally {
        takeLock.unlock();
    }
}
The flow is straightforward, but note that this method acquires the take lock, which means no element can be removed while peek() is executing
element()
An implementation of the element() method is in AbstractQueue:
public E element() {
    E x = peek();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}
Like add(e), it is just a thin wrapper, this time around peek()
Conclusion
This article set out to discuss BlockingQueue but ended up mostly discussing LinkedBlockingQueue. However, as a classic blocking queue implementation, the way LinkedBlockingQueue implements its methods is key to understanding blocking queues in general. To understand blocking queues, you first need to understand locks: LinkedBlockingQueue achieves thread safety through a producer (put) lock and a consumer (take) lock, each paired with its own Condition object. Once that is clear, the producer/consumer model as a whole falls into place
[1] See The Art of Concurrent Programming in Java
[2] See AQS
[3] The description “wake up a take operation” is somewhat inaccurate. Strictly, it should be “wake up a thread waiting for the take lock”, but the former is easier for readers to follow, so it is used here
[4] This refers to the group of take-like methods, including take/poll/remove. The same applies to put operations