Welcome to github.com/hsfxuebao/j… . I hope it helps; if you find it useful, please give the repository a star.
Java 8 provides seven blocking queues:
- ArrayBlockingQueue: a bounded blocking queue backed by an array. It orders elements first-in, first-out (FIFO).
- LinkedBlockingQueue: an optionally bounded blocking queue backed by a linked list. The default and maximum capacity is Integer.MAX_VALUE. It also orders elements first-in, first-out.
- PriorityBlockingQueue: an unbounded blocking queue that supports priority ordering. By default, elements are sorted in natural ascending order. You can define the ordering either by having the element class implement compareTo(), or by passing a Comparator to the PriorityBlockingQueue constructor.
- DelayQueue: an unbounded blocking queue implemented on top of a priority queue.
- SynchronousQueue: a blocking queue that stores no elements. Each put must wait for a matching take; otherwise the element cannot be added.
- LinkedTransferQueue: an unbounded blocking queue backed by a linked list.
- LinkedBlockingDeque: a double-ended blocking queue backed by a linked list.
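As a small illustration of the two ordering options described for PriorityBlockingQueue, the sketch below drains the same three integers once with natural ordering and once with a Comparator. The class name PriorityOrderDemo and the helper drain are made up for this example.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

class PriorityOrderDemo {
    // Adds 2, 3, 1 to a queue built with the given comparator
    // (null means natural ordering) and returns them in retrieval order.
    static int[] drain(Comparator<Integer> order) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(11, order);
        queue.add(2);
        queue.add(3);
        queue.add(1);
        int[] out = new int[3];
        for (int i = 0; i < out.length; i++) {
            out[i] = queue.poll(); // never null here: three elements were added
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(drain(null)));                      // [1, 2, 3]
        System.out.println(Arrays.toString(drain(Comparator.reverseOrder()))); // [3, 2, 1]
    }
}
```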
Blocking queues offer four ways of handling an operation that cannot complete immediately: throw an exception, return a special value, block, or block with a timeout.
- Insert operations
  - add(e): adds an element to the queue. If the queue is full, the call throws IllegalStateException.
  - offer(e): adds an element and returns true on success, or false if the queue is full.
  - put(e): if the queue is full, blocks the producer thread until space becomes available.
  - offer(e, time, unit): if the queue is full, blocks the producer thread for at most the specified time and returns false if it times out.
- Remove operations
  - remove(o): returns true if the element was removed, or false if it was not found (for example because the queue is empty). The no-argument remove() throws NoSuchElementException on an empty queue.
  - poll(): returns null if the queue is empty.
  - take(): if the queue is empty, blocks until new data arrives in the queue.
  - poll(time, unit): if the queue is empty, waits up to the specified time for an element and returns null if it times out.
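The non-blocking variants can be tried safely on a queue of capacity 1, as in the sketch below. The class HandlingModesDemo and its helper methods are invented for illustration; put and take are left out because they would block forever in a single-threaded demo.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class HandlingModesDemo {
    // Tries each non-blocking insert variant against a full queue.
    static String insertOnFull() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("a"); // the single slot is now occupied
        StringBuilder result = new StringBuilder();
        try {
            queue.add("b");
        } catch (IllegalStateException ex) {
            result.append("add:exception ");
        }
        result.append("offer:").append(queue.offer("b")).append(' ');
        try {
            // Waits up to 50 ms for space, then gives up and returns false.
            result.append("timedOffer:").append(queue.offer("b", 50, TimeUnit.MILLISECONDS));
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(ex);
        }
        return result.toString();
    }

    // Tries each non-blocking removal variant against an empty queue.
    static String removeOnEmpty() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        StringBuilder result = new StringBuilder();
        try {
            queue.remove();
        } catch (NoSuchElementException ex) {
            result.append("remove:exception ");
        }
        result.append("remove(o):").append(queue.remove("x")).append(' ');
        result.append("poll:").append(queue.poll()).append(' ');
        try {
            // Waits up to 50 ms for an element, then returns null.
            result.append("timedPoll:").append(queue.poll(50, TimeUnit.MILLISECONDS));
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(ex);
        }
        return result.toString();
    }

    public static void main(String[] args) {
        System.out.println(insertOnFull());  // add:exception offer:false timedOffer:false
        System.out.println(removeOnEmpty()); // remove:exception remove(o):false poll:null timedPoll:null
    }
}
```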
Constructors
ArrayBlockingQueue provides three constructors, which take the following parameters:
- capacity: the length of the array, that is, the capacity of the queue
- fair: whether the queue uses a fair lock. By default, a non-fair blocking queue is constructed
The third constructor, not shown here, additionally accepts a collection whose elements are used to initialize the queue.
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition(); // condition that consumers wait on while the queue is empty
    notFull = lock.newCondition();  // condition that producers wait on while the queue is full
}
We have not yet looked at how the lock is used; keep its role in mind while reading the source below. After construction, items is an array of capacity slots, all of them empty (null).
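For reference, here is a short usage sketch of the constructors, including the collection-accepting one that the walkthrough skips. The class ConstructorDemo and its helpers are hypothetical names used only for this example.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;

class ConstructorDemo {
    // Capacity 4, non-fair lock, pre-filled with 1, 2, 3 in iteration order.
    static ArrayBlockingQueue<Integer> prefilled() {
        return new ArrayBlockingQueue<>(4, false, Arrays.asList(1, 2, 3));
    }

    // A capacity of zero (or less) is rejected by the constructor.
    static boolean rejectsZeroCapacity() {
        try {
            new ArrayBlockingQueue<Integer>(0);
            return false;
        } catch (IllegalArgumentException ex) {
            return true;
        }
    }

    public static void main(String[] args) {
        ArrayBlockingQueue<Integer> queue = prefilled();
        System.out.println(queue.size());              // 3
        System.out.println(queue.remainingCapacity()); // 1
        System.out.println(rejectsZeroCapacity());     // true
    }
}
```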
The add method
Taking add as the entry point, we can see that it simply calls the add method of its parent class, AbstractQueue. If you read a lot of source code you will recognize the pattern: the parent class supplies a template method that handles the common logic.
public boolean add(E e) {
    return super.add(e);
}
As the parent class's add method shows, it delegates to offer. If offer returns false because the queue is full, add throws an IllegalStateException.
public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
The offer method
The add method ends up calling offer to insert the data; offer returns a boolean indicating success or failure. The code does the following:
- Checks that the element being added is not null
- Acquires the reentrant lock
- Checks the queue size: if count equals the array length, the queue is full and offer returns false
- Otherwise, calls enqueue to add the element to the queue and returns true
public boolean offer(E e) {
    checkNotNull(e); // throws NullPointerException if e is null
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false; // queue is full
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}
The enqueue method
This is the core logic: the method stores the element in the items array at the position given by putIndex.
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length) // putIndex has reached the array length
        putIndex = 0;               // wrap around to the start of the array
    count++;
    notEmpty.signal(); // the queue is no longer empty: wake a consumer thread blocked in take, if any
}
Why does putIndex reset to 0 when it equals the length of the array?
ArrayBlockingQueue is a FIFO queue stored in a fixed-size array that is used as a circular buffer. putIndex always points at the slot where the next element will be stored, just past the tail of the queue. When putIndex reaches the array length there is no further slot, so the next insertion starts again from the head of the array (index 0).
Consider how putIndex changes as elements are added. Once putIndex equals the array length it cannot keep increasing, or it would index past the end of the array allocated at construction. Two points are worth keeping in mind:
- When the queue is full, no more elements can be added: add throws an exception, offer returns false, and put blocks
- Elements in the queue are expected to be consumed by a consumer thread through take or a similar method, and an element is removed from the queue at the moment it is retrieved, which frees its slot for reuse
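The wrap-around can be traced with a few lines of standalone code. This is only a sketch of the index arithmetic, not the JDK class: RingIndexDemo, next, and trace are names invented here, and the trace ignores the full-queue check that the real queue performs before every insertion.

```java
import java.util.Arrays;

class RingIndexDemo {
    // Mirrors the "++putIndex == items.length" check from enqueue:
    // advance by one slot and wrap to 0 at the end of the array.
    static int next(int index, int length) {
        return (++index == length) ? 0 : index;
    }

    // Returns the value of putIndex after each of `adds` insertions
    // into an array of `length` slots, starting from putIndex == 0.
    static int[] trace(int length, int adds) {
        int[] out = new int[adds];
        int putIndex = 0;
        for (int i = 0; i < adds; i++) {
            putIndex = next(putIndex, length);
            out[i] = putIndex;
        }
        return out;
    }

    public static void main(String[] args) {
        // With 3 slots, the third insertion wraps putIndex back to 0.
        System.out.println(Arrays.toString(trace(3, 4))); // [1, 2, 0, 1]
    }
}
```

In the real queue, the fourth insertion in this trace can only happen after a consumer has removed an element, since the array is full after three.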
The put method
The put method does the same job as add, except that it blocks when the queue is full instead of throwing an exception.
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    // Also acquires the lock, but responds to interruption: if another thread
    // interrupts this thread while it is waiting, the call throws
    // InterruptedException instead of continuing to wait.
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await(); // queue is full: wait on the notFull condition
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
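The blocking behaviour can be observed with two threads. The sketch below fills a queue of capacity 1, starts a producer that calls put again, and checks that the producer only finishes after a take frees the slot. BlockingPutDemo and putWaitsForSpace are hypothetical names, and the 200 ms pause is an arbitrary choice for the demo.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class BlockingPutDemo {
    // Returns true if the second put completed only after a take freed a slot.
    static boolean putWaitsForSpace() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        CountDownLatch putDone = new CountDownLatch(1);
        try {
            queue.put(1); // fills the only slot
            Thread producer = new Thread(() -> {
                try {
                    queue.put(2); // blocks while the queue is full
                    putDone.countDown();
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
            producer.start();
            // The producer cannot finish while the queue is still full.
            boolean finishedEarly = putDone.await(200, TimeUnit.MILLISECONDS);
            int first = queue.take(); // frees the slot and lets the producer proceed
            boolean finishedAfterTake = putDone.await(2, TimeUnit.SECONDS);
            producer.join();
            return !finishedEarly && first == 1 && finishedAfterTake && queue.take() == 2;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(putWaitsForSpace()); // true
    }
}
```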
The take method
The take method is the blocking way of retrieving an element from the queue. Its principle is simple: if the queue contains data, take removes and returns the head element without blocking; if the queue is empty, the calling thread waits on the notEmpty condition. Note that this wait can be interrupted. When another thread adds data with put, the put operation wakes the waiting take thread so that it can perform the take.
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await(); // queue is empty: block on the notEmpty condition
        return dequeue();
    } finally {
        lock.unlock();
    }
}
When an element is added to the queue, enqueue calls notEmpty.signal() to wake a blocked take thread so that it can retrieve the element.
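The put/take handshake can be condensed into a small standalone class. This is a simplified sketch in the spirit of the code discussed here, not the JDK implementation: TinyBoundedBuffer and sumThroughBuffer are invented names, and null checks, fairness, and iterator support are left out.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class TinyBoundedBuffer<E> {
    private final Object[] items;
    private int putIndex, takeIndex, count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition(); // consumers wait here
    private final Condition notFull = lock.newCondition();  // producers wait here

    TinyBoundedBuffer(int capacity) {
        items = new Object[capacity];
    }

    void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();          // full: wait for a consumer
            items[putIndex] = e;
            if (++putIndex == items.length)
                putIndex = 0;             // wrap around
            count++;
            notEmpty.signal();            // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();         // empty: wait for a producer
            E x = (E) items[takeIndex];
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;            // wrap around
            count--;
            notFull.signal();             // wake one waiting producer
            return x;
        } finally {
            lock.unlock();
        }
    }

    // Pushes 1..5 through a 2-slot buffer while a consumer thread sums them.
    static int sumThroughBuffer() {
        TinyBoundedBuffer<Integer> buffer = new TinyBoundedBuffer<>(2);
        int[] sum = new int[1];
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++)
                    sum[0] += buffer.take();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            for (int i = 1; i <= 5; i++)
                buffer.put(i);
            consumer.join(); // join makes the consumer's writes visible here
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(ex);
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println(sumThroughBuffer()); // 15
    }
}
```

Both await calls sit inside while loops, so a thread rechecks the condition after waking rather than assuming it still holds.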
The dequeue method
This is the method that takes an element out of the queue: it removes the element at the head of the queue and returns it to the caller. takeIndex records the index of the next element to be taken.
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];     // takeIndex starts at 0
    items[takeIndex] = null;        // clear the slot
    if (++takeIndex == items.length)
        takeIndex = 0;              // wrap around to the start of the array
    count--;
    if (itrs != null)
        itrs.elementDequeued();     // update the state of active iterators
    notFull.signal();               // wake a producer thread that blocked because the queue was full
    return x;
}
itrs.elementDequeued()
ArrayBlockingQueue supports iterators for traversing the elements of the blocking queue, so itrs.elementDequeued() is called to keep any active iterators consistent with the new takeIndex after an element is removed. In the same step, because removing the element frees a slot, notFull.signal() wakes a thread blocked in put so that it can continue adding data.
The remove method
The remove method removes a specified element
public boolean remove(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex;          // start scanning at the head of the queue
            do {
                if (o.equals(items[i])) {
                    removeAt(i);        // found: remove the element at index i
                    return true;
                }
                if (++i == items.length)
                    i = 0;              // wrap around to the start of the array
            } while (i != putIndex);    // stop once the tail is reached
        }
        return false;
    } finally {
        lock.unlock();
    }
}
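A brief usage sketch of remove(o) follows, arranged so that the queue contents wrap around the end of the array before an interior element is removed. RemoveDemo and removeMiddle are names made up for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;

class RemoveDemo {
    static String removeMiddle() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        queue.add("a");
        queue.add("b");
        queue.add("c");
        queue.poll();   // removes "a"; the head is now "b"
        queue.add("d"); // stored in the slot freed by "a", so the contents wrap around
        boolean removed = queue.remove("c"); // interior element: found and removed
        boolean missing = queue.remove("z"); // not present
        return removed + " " + missing + " " + queue;
    }

    public static void main(String[] args) {
        System.out.println(removeMiddle()); // true false [b, d]
    }
}
```

FIFO order of the remaining elements is preserved after the interior removal.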
Reference:
ArrayBlockingQueue Source Analysis (Based on Java 8)