🖕 welcome to pay attention to my public number “Tong Elder brother read source code”, view more source code series articles, with Tong elder brother tour the ocean of source code.
(Mobile phone landscape view source more convenient)
The problem
(1) How to implement LinkedBlockingQueue?
(2) Is the LinkedBlockingQueue bounded or unbounded?
(3) How does LinkedBlockingQueue improve over ArrayBlockingQueue?
Introduction to the
LinkedBlockingQueue is a Java queue that sends packets to the next block queue implemented as a single linked list. It is thread-safe, and as to whether it is bounded, see the analysis below.
Source code analysis
The main properties
/ / capacity
private final int capacity;
// Number of elements
private final AtomicInteger count = new AtomicInteger();
/ / head
transient Node<E> head;
/ / list the tail
private transient Node<E> last;
/ / take the lock
private final ReentrantLock takeLock = new ReentrantLock();
/ / notEmpty conditions
// When there is no element in the queue, the take lock blocks on the notEmpty condition, waiting for another thread to wake up
private final Condition notEmpty = takeLock.newCondition();
/ / lock
private final ReentrantLock putLock = new ReentrantLock();
/ / notFull conditions
// When the queue is full, the PUT lock blocks on notFull, waiting for another thread to wake up
private final Condition notFull = putLock.newCondition();
Copy the code
(1) Capacity is the capacity of the LinkedBlockingQueue
(2) Head, last, pointer to the end of the list
(3) takeLock, notEmpty, take lock and its corresponding conditions
(4) putLock, notFull, put lock and its corresponding conditions
(5) Two different locks are used to control the entry and exit of the team, and the locks are separated to improve efficiency
The inner class
static class Node<E> { E item; Node<E> next; Node(E x) { item = x; }}Copy the code
Typical single-linked list structure.
Main construction methods
public LinkedBlockingQueue(a) {
// If no capacity is passed, the maximum int value is used to initialize its capacity
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
// Initialize the head and last Pointers to null nodes
last = head = new Node<E>(null);
}
Copy the code
The team
There are also four ways to join a team, but we will only analyze the most important one here, the put(E) method:
public void put(E e) throws InterruptedException {
// Null elements are not allowed
if (e == null) throw new NullPointerException();
int c = -1;
// Create a node
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// use put lock to lock
putLock.lockInterruptibly();
try {
// If the queue is full, the notFull condition is blocked
// Wait to be woken up by another thread
while (count.get() == capacity) {
notFull.await();
}
// When the queue is full, join the queue
enqueue(node);
// The queue length is increased by 1
c = count.getAndIncrement();
// If the queue length is smaller than the capacity
// Wake up another thread blocking on the notFull condition
// Why do you want to wake up?
// There may be many threads blocking on the notFull condition
NotFull is invoked only if the queue is full before fetching elements
// Why does notFull wake up when the queue is full?
// Wake up requires putLock to reduce the number of locks
// The thread on notFull will wake up before the element is finished
// To put it bluntly, this is also the cost of lock separation
if (c + 1 < capacity)
notFull.signal();
} finally {
/ / releases the lock
putLock.unlock();
}
// If the queue length was 0, the notEmpty condition is now invoked immediately after the addition of an element
if (c == 0)
signalNotEmpty();
}
private void enqueue(Node<E> node) {
// Add it directly after last
last = last.next = node;
}
private void signalNotEmpty(a) {
final ReentrantLock takeLock = this.takeLock;
/ / and take the lock
takeLock.lock();
try {
// Wake up the notEmpty condition
notEmpty.signal();
} finally {
/ / unlocktakeLock.unlock(); }}Copy the code
(1) Use putLock to lock;
(2) If the queue is full, the notFull condition is blocked;
(3) Otherwise join the team;
(4) If the number of elements in the queue is less than the capacity, wake up other threads blocking on the notFull condition;
(5) Release lock;
(6) If the queue length is 0 before the element is placed, the notEmpty condition is awakened;
Out of the team
There are also four ways to get out of a team, but we will only analyze the most important one here, the take() method:
public E take(a) throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// Use takeLock to lock
takeLock.lockInterruptibly();
try {
// If the queue has no elements, it blocks on the notEmpty condition
while (count.get() == 0) {
notEmpty.await();
}
// Otherwise, get out of line
x = dequeue();
// Get the queue length before the queue
c = count.getAndDecrement();
// Wake up notEmpty if the queue length is greater than 1
if (c > 1)
notEmpty.signal();
} finally {
/ / releases the lock
takeLock.unlock();
}
// If the queue length is equal to the capacity
// Wake up notFull
if (c == capacity)
signalNotFull();
return x;
}
private E dequeue(a) {
// The head node itself does not store any elements
// delete the head and use the next node of head as the new value
// Return the original value
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
private void signalNotFull(a) {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
/ / wake notFull
notFull.signal();
} finally{ putLock.unlock(); }}Copy the code
(1) Use takeLock to lock;
(2) If the queue is empty, the notEmpty condition is blocked;
(3) Otherwise leave the team;
(4) If the number of elements before queuing is greater than 1, wake up other threads blocking on notEmpty condition;
(5) Release lock;
(6) Wake up the notFull condition if the queue length is equal to the capacity before fetching the element;
conclusion
(1) LinkedBlockingQueue is implemented in the form of a single linked list;
(2) LinkedBlockingQueue adopts the lock separation technology of two locks to realize the non-blocking of joining and leaving the queue;
(3) LinkedBlockingQueue is a bounded queue and defaults to a maximum int if no capacity is passed in;
eggs
(1) LinkedBlockingQueue vs. ArrayBlockingQueue?
A) The latter uses a lock to enter and leave the team, resulting in mutual blocking and low efficiency;
B) Two locks are used before entering the team and leaving the team, which does not interfere with each other and has high efficiency;
C) Both are bounded queues. If the queue length is equal and the queue exit speed cannot keep up with the queue entry speed, a large number of threads will be blocked;
D) If the former does not pass in the initial capacity during initialization, the maximum int value will be used. If the queuing speed cannot keep up with the queuing speed, the queue will be extremely long and occupy a lot of memory.
Welcome to pay attention to my public number “Tong Elder brother read source code”, view more source code series articles, with Tong elder brother tour the ocean of source code.