This is the first day of my participation in the August Challenge. For details, see:August is more challenging
ArrayBlockingQueue is, as the name suggests, an array-based implementation of a blocking queue that can be used to buffer tasks and implement producer-consumer patterns, such as work queues in thread pools. So how do you block a queue with an array?
Let’s start with what ArrayBlockingQueue does
- First of all, it is a queue, and the queue needs to have the ability to enter and exit the queue
- Since it is BlockingQueue, it will block the queue when it is full and release the queue when there is room left. When the queue is empty, the queue requests need to be blocked. When there are elements in the queue, the queue requests are released.
- Since ArrayBlockingQueue is a data structure used in multithreaded situations, its operations need to be thread safe
Unpack ArrayBlockingQueue implementation step
Let’s break the problem down, divide the problem into
- Use arrays to implement queues
- Add blocking capability and thread safety to queues
Use arrays to implement queues
How to use an array to implement data in and out of the queue operation, many students first think of an index field to store the next place to write to the array. So what do you do with the out-of-queue? Many of you will think of the out-of-queue as returning the first element of the array and moving all the next elements forward by one bit.
The problem with this solution is that when you move a large number of elements out of the queue, the time complexity is O(n). Is there a more efficient solution? Another way to loop through an array is to use two int fields to record the position of the element to be queued and the next element to be queued. When queued to the end of the array, we start at 0, and when queued to the end of the array, we start at 0.
In addition, when the queue is empty and when the queue is full, takeIndex and putIndex both point to the same location, so to distinguish between them, we can use a count field to store the number of queue elements, so that when count=0, the queue is 0, and when count= array capacity, the queue is full
The following code shows a concrete implementation of queues using arrays.
class ArrayBlockingQueue<E> {
final Object[] items;
int takeIndex;
int putIndex;
int count;
public ArrayBlockingQueue(int capacity) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
}
private void enqueue(E e) {
Object[] items = this.items;
items[putIndex] = e;
if (++putIndex == items.length) putIndex = 0;
count++;
}
private E dequeue(a) {
Object[] items = this.items;
E e = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
returne; }}Copy the code
Implement conditional blocking and thread safety
The requirement to “block queueing requests when the queue is full and release queueing requests when there is room in the queue” is essentially a special case of conditional waiting. The condition for writing is that the queue is not satisfied. If the condition is not met, wait until the condition is met.
Synchronized + object. wait and Lock+Condition. Await are two ways to implement conditional wait in Java
- Synchronized does not support interrupt
- Synchronized cannot support multiple conditions
With the Lock and Condition scheme, it is also possible to ensure thread safety, because in the above circular array implementation, the variables shared between threads are items array, takeIndex, putIndex, and count. Thread safety involves several aspects of atomic visibility reordering. The Lock class protects read and write operations on shared variables.
Define the blocking Lock object and the Condition, which are dissatisfied and not empty.
class ArrayBlockingQueue<E> {
final Object[] items;
int takeIndex;
int putIndex;
int count;
ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
public ArrayBlockingQueue(int capacity) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
// Create the lock object
lock = new ReentrantLock();
// Create a non-empty Condition
notEmpty = lock.newCondition();
// Create a dissatisfied ConditionnotFull = lock.newCondition(); }}Copy the code
For example, if the queue is not satisfied, that is, count < items.length. If the queue is not satisfied, that is, count == items.length. When the conditions are met, we can join the queue. After joining the queue, we need to wake up the threads waiting to leave the queue.
The process of the PUT method is
- First lock
- Call notfull.await (). Await () releases the lock. When another thread signals it, it grabs the lock again.
- If the conditions are met, join the queue
- Call notempty.signal () after enqueueing to wake up a thread waiting for the notFull condition
- Finally releases the lock
public void put(E e) throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
notEmpty.signal();
} finally{ lock.unlock(); }}Copy the code
There are some small details in the method
- Why declare a lock local variable in the put method first?
ReentrantLock lock = this.lock; This is because if you don’t use local variables, all subsequent calls that use instance variables, at the bytecode level, need to be aload 0 to get this, then getField to get the field value, and then anything else. By saving the lock in a local variable, all subsequent lock fetching can become an Aload XXX instruction, saving instructions and speeding up method execution.
- Why does the while loop need to be in a lock?
If you do not place it in a lock, it is possible for multiple threads to see that the condition is met at the same time and then add the lock to the queue. Although the enqueue is still in the critical zone, there will be cases when the queue is full and the enqueue operation is still being performed. This problem is similar to the problem that one check is less in double check Locking of the singleton.
The take method is the corresponding to the PUT method, and the PUT process is basically the same
public E take(a) throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
E element = dequeue();
notFull.signal()
return element;
} finally{ lock.unlock(); }}Copy the code