preface
LinkedBlockingQueue = LinkedBlockingQueue Any inaccuracies, welcome to point out, thank you.
LinkedBlockingQueue overview of
The inheritance diagram for LinkedBlockingQueue
Let’s start by looking at the inheritance system of LinkedBlockingQueue. Use IntelliJ IDEA to view the inheritance graph of a class
- The solid blue arrows are class inheritance relationships
- Green arrow Solid arrow indicates the interface inheritance relationship
- The dotted green arrow indicates the interface implementation relationship.
LinkedBlockingQueue implements Serializable interface, so it has serialization features. LinkedBlockingQueue implements the BlockingQueue interface, and BlockingQueue inherits the Queue interface, so it has operations for queue-related methods.
LinkedBlockingQueue class diagram
Class diagrams from the beauty of Concurrent programming in Java
Key features of LinkedBlockingQueue:
- LinkedBlockingQueue The underlying data structure is a one-way linked list.
- LinkedBlockingQueue has two nodes, a head Node and a tail Node, which can only fetch elements from head and add elements from tail.
- The LinkedBlockingQueue capacity is an atomic variable count, which starts with a value of 0.
- LinkedBlockingQueue has two ReentrantLock locks, one to enqueue elements and one to unqueue elements, ensuring thread-safety in the event of concurrency.
- LinkedBlockingQueue has two conditional variables, notEmpty and notFull. They all have a conditional queue inside, which holds threads that are blocked in and out of the queue. This is actually a producer-consumer model.
The important member variable of LinkedBlockingQueue
The default value is integer. MAX_VALUE private final int capacity; Private Final AtomicInteger count = new AtomicInteger(); // transient Node<E> head; // private transient Node<E> last; Private final ReentrantLock takeLock = new ReentrantLock(); Private final Condition notEmpty = takelock.newcondition (); private final Condition notEmpty = takelock.newcondition (); Private final ReentrantLock putLock = new ReentrantLock(); Private final Condition notFull = putLock.newCondition(); private Final Condition notFull = putLock.newCondition(); private Final Condition notFull = putLock.newCondition();Copy the code
Constructor of LinkedBlockingQueue
LinkedBlockingQueue has three constructors:
- Constructor with no arguments and capacity integer.max
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
Copy the code
- Sets the constructor for the specified capacity
public LinkedBlockingQueue(int capacity) {
if(capacity <= 0) throw new IllegalArgumentException(); // Set queue size this.capacity = capacity; Last = head = new Node<E>(null); }Copy the code
- If the constructor is called, the default capacity is integer.max_value
public LinkedBlockingQueue(Collection<? Extends E> c) {// Calls the constructor with the specified capacity this(integer.max_value); Final ReentrantLock putLock = this.putLock; putLock.lock(); try { int n = 0; // Loop to add elements from the collection to the queuefor (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full"); Enqueue (new Node<E>(E)); ++n; } // Update count. Set (n); } finally {putlock. unlock(); }}Copy the code
LinkedBlockingQueue Underlying Node class
The Node source
Static class Node<E> {// The element value of the current Node is E item; // The index of the next Node Node<E> next; // Node constructor Node(E x) {item = x; }}Copy the code
The nodes of LinkedBlockingQueue conform to the data structure requirements of one-way lists:
- A member variable is the element value of the current node
- A member variable is the index of the next node
- Constructor’s unique parameter node element value.
The Node Node figure
Item represents the element value of the current node, and next represents a pointer to the next node
LinkedBlockingQueue Common action
Offer operation
To join a queue, you insert an element to the end of the queue. If the element is empty, a null-pointer exception is thrown. If the queue is full, the current element is discarded, returning false, which is non-blocking. Returns true if the queue is idle and the insert succeeded.
Offer the source code
The offer method is as follows:
Public Boolean offer(E E) {// set null to nullif(e == null) throw new NullPointerException(); final AtomicInteger count = this.count; // If the current queue is full, return directlyfalse
if (count.get() == capacity)
return false; int c = -1; Node<E> Node = new Node<E>(E); Final ReentrantLock putLock = this.putLock; putLock.lock(); Try {// Determines whether the queue is fullif(count.get() < capacity) {// Enqueue (node); C = count.getAndincrement (); // If the element is queued and idle, it wakes up the blocked thread in the notFull conditional queueif(c + 1 < capacity) notFull.signal(); }} finally {putlock.unlock (); } // If the capacity is 0, thenif(c == 0) // Activate the conditional queue for notEmpty and wake up the blocked thread signalNotEmpty();return c >= 0;
}
Copy the code
The enqueue method is as follows:
Private void enqueue(Node<E> Node) {last = last. Next = Node; }Copy the code
To visualize this, let’s use A diagram to look at putting elements A and B in the queue. Talk about LinkedBlockingQueue
The signalNotEmpty method is listed below
private void signalNotEmpty() {// Take exclusive lock final ReentrantLock takeLock = this.takelock; takeLock.lock(); Notempty.signal (); // Wake up the blocked thread in the notEmpty conditional queue. } finally {takelock.unlock (); }}Copy the code
Offer Execution Flow Chart
Basic process:
- Determines whether the element is empty and throws a null-pointer exception if so.
- Check whether the queue is full, if so, add failed, return false.
- If the queue is not full, construct the Node Node and lock it.
- Check whether the queue is full. If the queue is not full, the Node joins the queue at the end.
- After joining the queue, determine if the queue is free and, if so, wake up the blocking thread of notFull.
- After releasing the lock, determine whether the capacity is empty and, if so, wake up the notEmpty blocking thread.
The put operation
The put method also inserts an element to the end of the queue. If the element is null, a null pointer exception is thrown. If the queue is full, the current thread is blocked until the queue is successfully inserted. If the queue is idle, the insertion succeeds. If the interrupt flag is set by another thread while blocking, the blocked thread throws InterruptedException and returns.
Put the source code
Public void put(E E) throws InterruptedException {//// If the null null pointer is thrown incorrectlyif(e == null) throw new NullPointerException(); int c = -1; Node<E> Node = new Node<E>(E); Final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; / / get an exclusive lock, it goes with the difference between the lock, can be interrupted by putLock. LockInterruptibly (); Try {// The queue is fullwhile(count.get() == capacity) { notFull.await(); } // queue enqueue(node); C = count.getAndincrement (); // If the element is queued and idle, it wakes up the blocked thread in the notFull conditional queueif(c + 1 < capacity) notFull.signal(); } finally {putlock. unlock(); } // If the capacity is 0, thenif(c == 0) // Activate the conditional queue for notEmpty and wake up the blocked thread signalNotEmpty(); }Copy the code
The flow chart of the put
Basic process:
- Determines whether the element is null and throws a null-pointer exception if it is.
- Construct Node, lock (interruptible lock)
- Determines whether the queue is full. If so, block the current thread and wait.
- If the queue is not full, nodes join the queue at the end.
- After joining the queue, determine if the queue is free and, if so, wake up the blocking thread of notFull.
- After releasing the lock, determine whether the capacity is empty and, if so, wake up the notEmpty blocking thread.
Poll operation
Gets and removes an element from the head of the queue and returns NULL if the queue is empty. This method is non-blocking.
Poll source code
Poll method source code
public E poll() { final AtomicInteger count = this.count; // If the queue is empty, return nullif (count.get() == 0)
returnnull; E x = null; int c = -1; // takeLock exclusive lock final ReentrantLock takeLock = this.takeLock; takeLock.lock(); Try {// If the queue is not empty, the queue is dequeued and the count is decrementedif(count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); //// if the capacity is greater than 1, the conditional queue for notEmpty is activated and the blocked thread is woken upif(c > 1) notEmpty.signal(); }} finally {// unlock takelock.unlock (); }if(c == capacity) // Wake up the blocked thread in the notFull conditional queue signalNotFull();return x;
}
Copy the code
Dequeue method source code
// Out queue private Edequeue() {// get head Node<E> h = head; Node<E> first = h.ext; // next of the node that the head node originally points to points to itself, waiting for the next GC to collect h.ext = h; //helpGC // head = first; // get the item value E x = first.item; // Set the item value of the new head to null first.item = null;return x;
}
Copy the code
To make it vivid, let’s use a picture to describe the team process. Talk about LinkedBlockingQueue
SignalNotFull method source
private void signalNotFull() {// Get put exclusive lock final ReentrantLock putLock = this.putlock; putLock.lock(); Try {//// wakes up the blocked thread in the notFull conditional queue notfull.signal (); } finally {putlock. unlock(); }}Copy the code
The flow chart of poll
Basic process:
- Determines whether the element is empty and returns NULL if so.
- lock
- Check if the queue has elements, if not, release the lock
- If there are elements in the queue, the queue is dequeued, the data is retrieved, and the capacity counter is reduced by one.
- Determine if the capacity is greater than 1 at this time, if so, wake up the blocking thread of notEmpty.
- After releasing the lock, determine if the capacity is full and, if so, wake up the blocking thread of notFull.
Peek operation
Gets the queue header element without removing it from the queue, or returns NULL if the queue is empty. This method is non-blocking.
Peek source code
public E peek() {// Queue capacity is 0, return nullif (count.get() == 0)
returnnull; // takeLock exclusive lock final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node<E> first = head.next; // Check whether first is null, if it is directly returnedif (first == null)
return null;
else
returnfirst.item; } finally {takelock.unlock (); }}Copy the code
The flow chart of peek
Basic process:
- Checks whether the queue capacity size is 0, and returns NULL if so.
- lock
- Get the queue head node first
- Check whether the first node is null, if so, return NULL.
- If FIST is not null, return the element of the node first.
- Releases the lock.
Take action
Gets the current queue header element and removes it from the queue. If the queue is empty, the current thread blocks until the queue is not empty and then returns the element. If another thread sets an interrupt flag while blocking, the blocked thread throws InterruptedException and returns.
Take the source code
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; // takeLock exclusive lock final ReentrantLock takeLock = this.takeLock; / / get an exclusive lock, it goes with the difference between the lock, can be interrupted by takeLock. LockInterruptibly (); Try {// If the current queue is empty, block and suspendwhile(count.get() == 0) { notEmpty.await(); } //) queue and decrement count x = dequeue(); c = count.getAndDecrement();if(c > 1) // Activate the conditional queue for notEmpty and wake up the blocked thread notempty.signal (); } finally {takelock.unlock (); }if(c == capacity) // Activate the notFull conditional queue and wake up the blocked thread signalNotFull();return x;
}
Copy the code
Take the flow chart
Basic process:
- lock
- Check whether the queue capacity size is 0, if so, block the current thread until the queue is not empty.
- If the queue capacity is greater than 0, the node exits the queue, gets element X, and the counter decreases by one.
- Determine whether the queue capacity is greater than 1, if so, wake up the notEmpty blocking thread.
- Releases the lock.
- Determines whether the queue capacity is full and, if so, wakes up the blocking thread of notFull.
- Returns the exit element x
The remove operation
Removes the specified element from the queue, true if it does, false if it does not.
Remove method source code
Public Boolean remove(Object o) {// Return if emptyfalse
if (o == null) return false; // double lock fullyLock(); Try {// walks through the queue, deletes the element and returns ittrue
for(Node<E> trail = head, p = trail.next; p ! = null; trail = p, p = p.next) {if(O.quals (p.item)) {// Unlink unlink(p, trail);return true; }}return false; } finally {// unlock fullyUnlock(); }}Copy the code
Double lock, fullyLock method source code
void fullyLock() {//putLock putlock. lock(); //takeLock exclusive lock takelock. lock(); }Copy the code
Unlink method source code
void unlink(Node<E> p, Node<E> trail) {
p.item = null;
trail.next = p.next;
if(last == p) last = trail; // If the current queue is full, the thread is not forgotten to wake up after deletionif (count.getAndDecrement() == capacity)
notFull.signal();
}
Copy the code
FullyUnlock method source code
void fullyUnlock() {// Instead of double locking, unlock takeLock. Unlock (); putLock.unlock(); }Copy the code
Remove the flow chart
The basic flow
- Check whether the element to be deleted is empty, and return false if so.
- If the element to be deleted is not empty, add a double lock
- Iterate through the queue to find the element to delete, or return false if none is found.
- If found, the object is removed, returning true.
- Release the lock
The size operation
Gets the number of current queue elements.
public int size() {
return count.get();
}
Copy the code
The size method of ConcurrentLinkedQueue is more accurate than that of ConcurrentLinkedQueue because count is locked.
conclusion
- The underlying LinkedBlockingQueue is implemented through a one-way linked list.
- It has head and tail nodes. The enqueue operation adds elements from the tail node, and the dequeue operation operates on the head node.
- Its capacity is the atomic variable count, which ensures the accuracy of szie’s acquisition.
- It has two exclusive locks that guarantee atomicity of queue operations.
- Its two locks are equipped with a conditional queue, used to store blocked threads, combined with queuing and queuing operations to achieve a production and consumption model.
The beauty of concurrent programming in Java is best illustrated in this diagram:
See and thank you
- The Beauty of Concurrent Programming in Java
- Block queue LinkedBlockingQueue
- Java concurrency LinkedBlockingQueue
- Talk about LinkedBlockingQueue
Personal public account
- If you are a good boy who loves learning, you can follow my public account and study and discuss with me.
- If you feel that this article is not correct, you can comment, you can also follow my public account, private chat me, we learn and progress together.