Art is long, life is long

A list,

ArrayBlockingQueue is exactly what it sounds like: an array-based blocking queue. Arrays have a specified length, so when you use ArrayBlockingQueue you must specify the length, that is, it is a bounded queue. It implements the BlockingQueue interface, with all the methods for queues, collections, and blocking queues.

ArrayBlockingQueue is thread-safe, internally guaranteed by ReentrantLock. ArrayBlockingQueue supports fair scheduling of producer and consumer threads. Fairness is not guaranteed by default, of course, because fairness generally reduces throughput, but it can reduce variability and avoid thread starvation.

Second, data structure

Queues are usually implemented as arrays and linked lists. For the array implementation, we can maintain a queue tail pointer, so that the queue can be completed in O(1)O(1) time; But for the queue out operation, after the queue head element is deleted, all elements in the array must be moved forward by one position. This operation is O(n)O(n), which is not very good. As shown below:

To solve this problem, we can use another logical structure to deal with the relationships between positions in the array. Suppose we now have an array A[1… n], which we can think of as A ring structure, A[n] followed by A[1], which should be easy to understand for those who have seen consistent Hash algorithms.

As shown in the figure below: We can use two Pointers to maintain the two positions of the head and the tail of the team respectively, so that the joining and leaving operations can be completed in O(1O(1) time. Of course, this ring structure is only logical structure, the actual physical structure is a normal array.

Having covered the data structure of ArrayBlockingQueue, let’s look at how it blocks at the source level.

Third, source code analysis

3.1 attributes

// The underlying structure of the queue
final Object[] items;
// queue head pointer
int takeIndex;
// end of queue pointer
int putIndex;
// The number of elements in the queue
int count;

final ReentrantLock lock;

// Two states of concurrency
private final Condition notEmpty;
private final Condition notFull;
Copy the code

Items is an array used to store queued data; Count indicates the number of elements in the queue; TakeIndex and putIndex represent the first and last team Pointers respectively.

3.2 Construction method

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
    this(capacity, fair);

    final ReentrantLock lock = this.lock;
    lock.lock(); // Lock only for visibility, not mutual exclusion
    try {
        int i = 0;
        try {
            for(E e : c) { checkNotNull(e); items[i++] = e; }}catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        count = i;
        putIndex = (i == capacity) ? 0 : i;
    } finally{ lock.unlock(); }}Copy the code

The first constructor only needs to specify the queue size and defaults to an unfair lock; The second constructor can manually specify fairness and queue size; The third constructor uses ReentrantLock to lock and place the collection elements passed in sequence into items. The purpose of locking here is not to use its mutual exclusion, but to make elements in items visible to other threads (see volatile visibility of state in AQS).

3.3 methods

3.3.1 team

ArrayBlockingQueue provides a variety of enqueue implementations to meet the needs of different situations. The enqueue operations are as follows:

  • boolean add(E e)

  • void put(E e)

  • boolean offer(E e)

  • boolean offer(E e, long timeout, TimeUnit unit)

(1) Add (E E)

public boolean add(E e) {
    return super.add(e);
}

//super.add(e)
public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
Copy the code

You can see that the add method calls the parent class, which is AbstractQueue’s Add method, which actually calls the Offer method.

(2) Offer (E E)

Let’s look at the offer method from the add method above:

public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true; }}finally{ lock.unlock(); }}Copy the code

The Offer method returns false if the queue is full, otherwise the enQueue method is called to insert elements and returns true.

private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    // Index of the ring
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}
Copy the code

The enQueue method first places the items in the putIndex position, and then sets putIndex to 0 when putIndex+1 equals the length of the queue, which is the index operation mentioned above. Finally, wake up the thread waiting to fetch the element.

(3) Offer (E E, long timeout, TimeUnit unit)

This method adds the concept of timeout on the basis of offer(E E).

public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {

    checkNotNull(e);
    // Convert the timeout to nanoseconds
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    // Get a breakable mutex
    lock.lockInterruptibly();
    try {
        // The purpose of the while loop is to continue retry in case the timeout passed is not reached after the interrupt
        while (count == items.length) {
            if (nanos <= 0)
                return false;
            // Wait for nanos nanoseconds, return the remaining wait time (can be interrupted)
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(e);
        return true;
    } finally{ lock.unlock(); }}Copy the code

Condition’s awaitNanos method is used to wait for a specified time. Since this method can be interrupted, a while loop is used to deal with the problem that there is still time after the interrupt. Enqueue is called when the waiting time is up.

(4) Put (E)

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally{ lock.unlock(); }}Copy the code

The put method waits until it is woken up by another thread when count equals items. Call enQueue to queue after wake up.

3.3.2 rainfall distribution on 10-12 out team

So with that out of the way, let’s talk about the way to queue. ArrayBlockingQueue provides a variety of queue operations to meet the needs of different situations, as follows:

  • E poll()

  • E poll(long timeout, TimeUnit unit)

  • E take()

  • drainTo(Collection<? super E> c, int maxElements)

(1) the poll ()

public E poll(a) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0)?null : dequeue();
    } finally{ lock.unlock(); }}Copy the code

The poll method is non-blocking and returns null if there are no elements in the queue, otherwise deQueue is called to remove the first element from the queue.

private E dequeue(a) {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if(itrs ! =null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}
Copy the code

Dequeue retrieves the element at that position according to the takeIndex, sets the position to null, then uses the ring principle, sets it to 0 when the takeIndex reaches the length of the list, and finally wakes up the thread waiting for the element to be placed in the queue.

(2) poll(long timeout, TimeUnit unit)

This method is the configurable timeout wait method of poll(), which, like the offer above, uses a while loop to wait along with the Condition’s awaitNanos, and dequeue is executed to fetch elements when the wait time is up.

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return dequeue();
    } finally{ lock.unlock(); }}Copy the code

(3) take ()

public E take(a) throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally{ lock.unlock(); }}Copy the code

Unlike the poll() method, if BlockingQueue is empty, it blocks until new data is added.

(4) drainTo ()

public int drainTo(Collection<? super E> c) {
    return drainTo(c, Integer.MAX_VALUE);
}

public int drainTo(Collection<? super E> c, int maxElements) {
    checkNotNull(c);
    if (c == this)
        throw new IllegalArgumentException();
    if (maxElements <= 0)
        return 0;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int n = Math.min(maxElements, count);
        int take = takeIndex;
        int i = 0;
        try {
            while (i < n) {
                @SuppressWarnings("unchecked")
                E x = (E) items[take];
                c.add(x);
                items[take] = null;
                if (++take == items.length)
                    take = 0;
                i++;
            }
            return n;
        } finally {
            // Restore invariants even if c.add() threw
            if (i > 0) {
                count -= i;
                takeIndex = take;
                if(itrs ! =null) {
                    if (count == 0)
                        itrs.queueIsEmpty();
                    else if (i > take)
                        itrs.takeIndexWrapped();
                }
                for (; i > 0&& lock.hasWaiters(notFull); i--) notFull.signal(); }}}finally{ lock.unlock(); }}Copy the code

Compared to other fetching methods, drainTo can fetch all available data objects from the queue at once (you can also specify how many data to fetch). By using this method, data acquisition efficiency can be improved without locking or releasing locks in batches for multiple times.

3.3.3 Getting Elements

public E peek(a) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return itemAt(takeIndex); // null when queue is empty
    } finally{ lock.unlock(); }}final E itemAt(int i) {
    return (E) items[i];
}
Copy the code

The element is locked to avoid dirty data.

3.3.4 Deleting Elements

We can imagine that when we delete an element from the queue, we have to go through the data to find that element and move all the elements after that element forward one bit. In other words, the time of this method is O(n)O(n).

public boolean remove(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex;
             // Walk from takeIndex to putIndex until you find the same element as element o and call removeAt to remove it
            do {
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                if (++i == items.length)
                    i = 0;
            } while(i ! = putIndex); }return false;
    } finally{ lock.unlock(); }}Copy the code

The remove method is relatively simple. It traverses from takeIndex to putIndex until it finds an element that is identical to element O and calls removeAt to remove it. Let’s focus on the removeAt method.

void removeAt(final int removeIndex) {
    final Object[] items = this.items;
    if (removeIndex == takeIndex) {
        // removing front item; just advance
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if(itrs ! =null)
            itrs.elementDequeued();
    } else {
        // an "interior" remove
        // slide over all others up through putIndex.
        final int putIndex = this.putIndex;
        for (int i = removeIndex;;) {
            int next = i + 1;
            if (next == items.length)
                next = 0;
            if(next ! = putIndex) { items[i] = items[next]; i = next; }else {
                items[i] = null;
                this.putIndex = i;
                break;
            }
        }
        count--;
        if(itrs ! =null)
            itrs.removedAt(removeIndex);
    }
    notFull.signal();
}
Copy the code

RemoveAt works a little differently than I expected, internally it is considered in two cases:

  • removeIndex == takeIndex

  • removeIndex ! = takeIndex

So I’m thinking about it without thinking about the boundary problem. When removeIndex == takeIndex, there is no need to move the whole element forward, but only need to point to the next element (analogous to the circle); When removeIndex! When = takeIndex, move the element after removeIndex forward by putIndex.

Four,

ArrayBlockingQueue is a blocking queue that is internally thread-safe by ReentrantLock and wake-up by the Condition’s await and signal. Its data structure is an array, or rather a loop array (analogous to a ring), with all subscripts automatically starting at zero when the maximum length is reached.