1. Introduction to ArrayBlockingQueue
In multithreaded programming, concurrent containers are often used to hold data shared between threads, both to decouple business logic and to keep the architecture clean. They guarantee thread safety and also simplify the work each thread has to do. In the producer-consumer problem, for example, a BlockingQueue serves as the data container. BlockingQueue itself is covered in a separate article. The best way to understand blocking queues is to understand how they are implemented, so this article focuses on the implementation of ArrayBlockingQueue and LinkedBlockingQueue.
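As a minimal sketch of the producer-consumer usage mentioned above: the class name ProducerConsumerDemo and the method produceAndSum are made up for this illustration, and only put and take from the standard BlockingQueue API are used.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class ProducerConsumerDemo {

    /** Produces the integers 1..n on one thread and sums them on another (hypothetical helper). */
    static int produceAndSum(int n, int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int[] sum = new int[1];

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    sum[0] += queue.take(); // blocks while the queue is empty
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // join() makes the consumer's writes to sum visible here
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println(produceAndSum(100, 4)); // 1 + 2 + ... + 100 = 5050
    }
}
```

Neither thread contains any explicit locking or waiting code; the queue handles both, which is the simplification the paragraph above refers to.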
2. ArrayBlockingQueue implementation principle
The core capability of a blocking queue is that inserting and removing elements can block. When the queue is empty, a thread trying to consume data is blocked until the queue becomes non-empty, at which point it is notified. When the queue is full, a thread trying to insert data (the producer thread) is blocked until the queue is no longer full, at which point it is notified. The most common notification mechanism in multithreaded code is the Lock and Condition mechanism, which is described in detail in a separate article. Does ArrayBlockingQueue also use Condition for its notifications? Let's take a look.
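The blocking behavior can be observed from outside the queue. The following is a small sketch, not a test of the internals: BlockingTakeDemo and blockThenDeliver are names invented here, and the polling loop simply waits until the consumer thread reports the WAITING state.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingTakeDemo {

    /** Returns the value a consumer receives after first blocking on an empty queue. */
    static String blockThenDeliver() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        String[] received = new String[1];

        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take(); // queue is empty, so this call blocks
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            // Poll until the consumer thread is parked inside take().
            while (consumer.getState() != Thread.State.WAITING) {
                Thread.sleep(1);
            }
            queue.put("hello"); // makes the queue non-empty and wakes the consumer
            consumer.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(blockThenDeliver()); // hello
    }
}
```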
2.1 Main properties of ArrayBlockingQueue
The main properties of ArrayBlockingQueue are as follows:
/** The queued items */
final Object[] items;

/** items index for next take, poll, peek or remove */
int takeIndex;

/** items index for next put, offer, or add */
int putIndex;

/** Number of elements in the queue */
int count;

/*
 * Concurrency control uses the classic two-condition algorithm
 * found in any textbook.
 */

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;
ArrayBlockingQueue uses a ReentrantLock to guarantee thread safety and two Condition objects to make insertion and removal blocking. A consumer thread that is blocked while fetching data is placed on the notEmpty wait queue, and a producer thread that is blocked while inserting data is placed on the notFull wait queue. Both notEmpty and notFull are created in the constructor:
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}
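A short usage sketch of this constructor follows. ConstructorDemo and its two helper methods are hypothetical names; the calls themselves (the two-argument constructor, offer, remainingCapacity) are part of the public ArrayBlockingQueue API.

```java
import java.util.concurrent.ArrayBlockingQueue;

class ConstructorDemo {

    /** The constructor rejects a capacity that is zero or negative. */
    static boolean rejectsNonPositiveCapacity() {
        try {
            new ArrayBlockingQueue<String>(0, true);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    /** The capacity is fixed at construction time; each element uses one slot. */
    static int freeSlotsAfterOneOffer() {
        // fair = true makes the internal ReentrantLock grant access in FIFO order
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3, true);
        queue.offer("a");
        return queue.remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println(rejectsNonPositiveCapacity()); // true
        System.out.println(freeSlotsAfterOneOffer());     // 2
    }
}
```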
Next, let's focus on how the blocking put and take methods are implemented.
2.2 The put method in detail
The source code of put(E e) is as follows:
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // If the queue is full, move the current thread to the notFull wait queue
        while (count == items.length)
            notFull.await();
        // The queue has room, so the element can be inserted
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
The logic is simple: when the queue is full (count == items.length), the current thread is moved to the notFull wait queue; once the condition for inserting is met, enqueue(e) is called to insert the element. The source code of enqueue is:
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    // Insert the element
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    // Notify a consumer thread that data is now available in the queue
    notEmpty.signal();
}
The enqueue method is equally simple: it stores the element in the array (items[putIndex] = x), wraps putIndex back to 0 when it reaches the end of the array, and then notifies a blocked consumer thread that data is available (notEmpty.signal()).
2.3 The take method in detail
The take method is as follows:
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // If the queue is empty, move the consumer thread to the notEmpty wait queue
        while (count == 0)
            notEmpty.await();
        // The queue has data, so fetch an element
        return dequeue();
    } finally {
        lock.unlock();
    }
}
The take method also does two things: 1. if the queue is empty, the consumer thread is moved to the notEmpty wait queue; 2. once the queue is not empty, it fetches an element by calling dequeue. The source code of dequeue is:
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    // Notify a blocked producer thread
    notFull.signal();
    return x;
}
The dequeue method also does two things: 1. it fetches the element from the array ((E) items[takeIndex]) and clears that slot; 2. it signals a thread on the notFull wait queue, moving it from the wait queue to the lock's synchronization queue so that it gets a chance to acquire the lock and complete its insertion.
From the above analysis, put and take implement blocking insertion and removal mainly through the Condition notification mechanism. Once ArrayBlockingQueue is understood, LinkedBlockingQueue is easy to follow.
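To see put, enqueue, take and dequeue working together, here is a stripped-down sketch of the same one-lock, two-condition design. TinyArrayQueue and wrapAroundDemo are hypothetical names; the class omits null checks, iterators and every other BlockingQueue method, so it illustrates the technique and is not a replacement for the JDK class.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class TinyArrayQueue<E> {
    private final Object[] items;
    private int takeIndex;
    private int putIndex;
    private int count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    TinyArrayQueue(int capacity) {
        items = new Object[capacity];
    }

    void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();          // wait for a free slot
            items[putIndex] = e;
            if (++putIndex == items.length)
                putIndex = 0;             // wrap around to the start of the array
            count++;
            notEmpty.signal();            // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();         // wait for an element
            E x = (E) items[takeIndex];
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;            // wrap around to the start of the array
            count--;
            notFull.signal();             // wake one waiting producer
            return x;
        } finally {
            lock.unlock();
        }
    }

    /** Single-threaded walk through the circular indexing with capacity 2. */
    static String wrapAroundDemo() {
        TinyArrayQueue<Integer> queue = new TinyArrayQueue<>(2);
        StringBuilder out = new StringBuilder();
        try {
            queue.put(1);
            queue.put(2);                 // queue is full, putIndex has wrapped to 0
            out.append(queue.take());     // frees slot 0
            queue.put(3);                 // 3 is stored in slot 0
            out.append(queue.take()).append(queue.take());
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(wrapAroundDemo()); // 123
    }
}
```

Because both conditions belong to the same lock, a producer and a consumer can never run their critical sections at the same time. That is the limitation LinkedBlockingQueue addresses.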
3. Implementation principle of LinkedBlockingQueue
LinkedBlockingQueue is an optionally bounded blocking queue implemented with a linked list. The capacity can be specified when the queue is constructed; if it is not, the default capacity is Integer.MAX_VALUE, as the no-argument constructor shows:
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
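The effect of the default capacity can be checked through the public API. In this small sketch the class name CapacityDemo and its helpers are made up for illustration; remainingCapacity and offer are standard methods.

```java
import java.util.concurrent.LinkedBlockingQueue;

class CapacityDemo {

    /** With the no-argument constructor the queue is effectively unbounded. */
    static int defaultCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    /** With an explicit capacity, the non-blocking offer fails once the queue is full. */
    static boolean offerToFullQueue() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.offer("a");
        return queue.offer("b");
    }

    public static void main(String[] args) {
        System.out.println(defaultCapacity() == Integer.MAX_VALUE); // true
        System.out.println(offerToFullQueue());                     // false
    }
}
```

With the default capacity a producer practically never blocks in put, so a slow consumer shows up as memory growth instead of back-pressure. Passing an explicit capacity avoids that.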
3.1 Main attributes of LinkedBlockingQueue
The main attributes of LinkedBlockingQueue are:
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();

/**
 * Head of linked list.
 * Invariant: head.item == null
 */
transient Node<E> head;

/**
 * Tail of linked list.
 * Invariant: last.next == null
 */
private transient Node<E> last;

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
The main difference from ArrayBlockingQueue is that LinkedBlockingQueue uses two separate locks, takeLock and putLock, to guard removal and insertion respectively. Each lock creates its own Condition (notEmpty from takeLock, notFull from putLock) to implement blocking removal and insertion. In addition, the queue is implemented as a linked list, whose Node is defined as:
static class Node<E> {
    E item;

    /**
     * One of:
     * - the real successor Node
     * - this Node, meaning the successor is head.next
     * - null, meaning there is no successor (this is the last node)
     */
    Node<E> next;

    Node(E x) { item = x; }
}
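The two invariants from the field comments (head.item == null and last.next == null) imply that the list always starts with a dummy node. The following single-threaded sketch shows how such a list can be appended to and consumed; DummyHeadList is a hypothetical class with no locking, written only to illustrate the node handling.

```java
class DummyHeadList<E> {

    static final class Node<E> {
        E item;
        Node<E> next;

        Node(E x) { item = x; }
    }

    // The list starts with a single dummy node that is both head and last.
    Node<E> head = new Node<>(null);
    Node<E> last = head;

    /** Links a new node after the current tail and makes it the new tail. */
    void enqueue(E x) {
        last = last.next = new Node<>(x);
    }

    /** Removes the first real element; the caller must ensure the list is not empty. */
    E dequeue() {
        Node<E> first = head.next;
        head = first;       // the first real node becomes the new dummy head
        E x = first.item;
        first.item = null;  // restore the invariant head.item == null
        return x;
    }

    static String demo() {
        DummyHeadList<String> list = new DummyHeadList<>();
        list.enqueue("a");
        list.enqueue("b");
        String first = list.dequeue();
        return first + " " + (list.head.item == null) + " " + (list.last.next == null);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // a true true
    }
}
```

Insertion only touches last and removal only touches head, which is what makes it possible to guard the two ends with different locks.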
Next, let's look at the implementation of the put and take methods.
3.2 The put method in detail
The source code of put is:
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        /*
         * Note that count is used in wait guard even though it is
         * not protected by lock. This works because count can
         * only decrease at this point (all other puts are shut
         * out by lock), and we (or some other waiting put) are
         * signalled if it ever changes from capacity. Similarly
         * for all other uses of count in other wait guards.
         */
        // If the queue is full, move the current thread to the notFull wait queue
        while (count.get() == capacity) {
            notFull.await();
        }
        // Append the node to the linked list
        enqueue(node);
        c = count.getAndIncrement();
        // If there is still room, wake up another waiting producer thread
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    // If the queue was empty before this insert, notify a waiting consumer thread
    if (c == 0)
        signalNotEmpty();
}
The logic of put is easy to follow with the comments. It is basically the same as the put method of ArrayBlockingQueue, except that it takes only putLock and signals consumers through signalNotEmpty() after releasing it. The source code of take is:
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // If the queue is empty, move the current thread to the notEmpty wait queue
        while (count.get() == 0) {
            notEmpty.await();
        }
        // Remove the element at the head of the queue
        x = dequeue();
        c = count.getAndDecrement();
        // If elements remain, wake up another waiting consumer thread
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // If the queue was full before this removal, notify a waiting producer thread
    if (c == capacity)
        signalNotFull();
    return x;
}
The main logic of the take method is in the comments and is easy to understand.
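The two methods above can be condensed into a compact two-lock queue. The sketch below follows the same structure but is deliberately simplified: TwoLockQueue, its private signal helper (standing in for signalNotEmpty and signalNotFull) and transfersInOrder are names chosen for this illustration, and features such as null checks, iterators and timed operations are left out.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class TwoLockQueue<E> {

    private static final class Node<E> {
        E item;
        Node<E> next;

        Node(E x) { item = x; }
    }

    private final int capacity;
    private final AtomicInteger count = new AtomicInteger();
    private Node<E> head = new Node<>(null); // dummy node, head.item is always null
    private Node<E> last = head;
    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();

    TwoLockQueue(int capacity) {
        this.capacity = capacity;
    }

    void put(E e) throws InterruptedException {
        int c = -1;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity)
                notFull.await();
            last = last.next = new Node<>(e);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();            // still room: wake another producer
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signal(takeLock, notEmpty);      // queue was empty: wake a consumer
    }

    E take() throws InterruptedException {
        E x;
        int c = -1;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0)
                notEmpty.await();
            Node<E> first = head.next;
            head = first;                    // first becomes the new dummy head
            x = first.item;
            first.item = null;
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();           // elements remain: wake another consumer
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signal(putLock, notFull);        // queue was full: wake a producer
        return x;
    }

    /** A condition may only be signalled while holding the lock that created it. */
    private static void signal(ReentrantLock lock, Condition condition) {
        lock.lock();
        try {
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

    /** Sends 0..n-1 through a capacity-2 queue and checks that the order is preserved. */
    static boolean transfersInOrder(int n) {
        TwoLockQueue<Integer> queue = new TwoLockQueue<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++)
                    queue.put(i);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        producer.setDaemon(true);
        producer.start();
        try {
            for (int i = 0; i < n; i++)
                if (queue.take() != i)
                    return false;
            producer.join();
            return true;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(transfersInOrder(1000)); // true
    }
}
```

The shared AtomicInteger count is the only state both sides read. A producer signals notEmpty only when it turns an empty queue into a non-empty one, and a consumer signals notFull only when it turns a full queue into a non-full one, so in the common case each side touches just its own lock.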
4. ArrayBlockingQueue vs. LinkedBlockingQueue
Similarities: both ArrayBlockingQueue and LinkedBlockingQueue use the Condition notification mechanism to implement blocking insertion and removal, and both are thread-safe.
Differences:
1. ArrayBlockingQueue is backed by an array, while LinkedBlockingQueue is backed by a linked list.
2. ArrayBlockingQueue uses a single lock for both insertion and removal, while LinkedBlockingQueue uses two separate locks, putLock for insertion and takeLock for removal. This reduces the chance that a thread enters the WAITING state because it cannot acquire the lock, which improves concurrent throughput.