This article has been included in GitHub: org_Hejianhui/JavaStudy.
Preface
LinkedBlockingQueue is an optionally bounded blocking queue backed by linked nodes: a linked-list-based queue that orders elements first-in, first-out and is unbounded in practice (though technically bounded). Unlike ArrayBlockingQueue, if no size is specified it defaults to a capacity of Integer.MAX_VALUE, making it effectively an unbounded queue. Therefore, to avoid heavy machine load or memory exhaustion caused by an oversized queue, it is recommended to pass a queue size explicitly when creating one.
Creating the queue
BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();
In the code above, blockingQueue’s capacity is set to Integer.MAX_VALUE.
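To avoid relying on that default, a capacity can be passed to the constructor. A minimal sketch (the capacity of 1024 is an arbitrary example):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BoundedQueueExample {
    public static void main(String[] args) {
        // Explicit capacity: the queue can hold at most 1024 elements
        BlockingQueue<String> bounded = new LinkedBlockingQueue<>(1024);
        System.out.println(bounded.remainingCapacity()); // 1024

        // Without a capacity argument, the bound is Integer.MAX_VALUE
        BlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
        System.out.println(unbounded.remainingCapacity()); // 2147483647
    }
}
```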
Application scenarios
It is mainly used as a task queue: a single thread posts tasks, blocks and waits when the queue is full, and resumes posting once tasks have been consumed and the backlog shrinks.
Let’s look at an example:
package com.niuh.queue.linked;

import org.apache.commons.lang.RandomStringUtils;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class TestLinkedBlockingQueue {
    private static LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>();
    // Thread control switch
    private final CountDownLatch latch = new CountDownLatch(1);
    // Thread pool
    private final ExecutorService pool;
    // AtomicLong counting the number produced
    private final AtomicLong output = new AtomicLong(0);
    // AtomicLong counting the number sold
    private final AtomicLong sales = new AtomicLong(0);
    // Whether to drain the queue before stopping the thread
    private final boolean clear;

    public TestLinkedBlockingQueue(boolean clear) {
        this.pool = Executors.newCachedThreadPool();
        this.clear = clear;
    }

    public void service() throws InterruptedException {
        Consumer a = new Consumer(queue, sales, latch, clear);
        pool.submit(a);
        Producer w = new Producer(queue, output, latch);
        pool.submit(w);
        latch.countDown();
    }

    public static void main(String[] args) {
        TestLinkedBlockingQueue t = new TestLinkedBlockingQueue(false);
        try {
            t.service();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

/**
 * Consumer (sells products)
 */
class Consumer implements Runnable {
    private final LinkedBlockingQueue<String> queue;
    private final AtomicLong sales;
    private final CountDownLatch latch;
    private final boolean clear;

    public Consumer(LinkedBlockingQueue<String> queue, AtomicLong sales, CountDownLatch latch, boolean clear) {
        this.queue = queue;
        this.sales = sales;
        this.latch = latch;
        this.clear = clear;
    }

    public void run() {
        try {
            latch.await(); // Wait for the starting signal
            for (;;) {
                sale();
                Thread.sleep(500);
            }
        } catch (InterruptedException e) {
            if (clear) { // After an interrupt request, sell off the queue before terminating, if required
                cleanWarehouse();
            } else {
                System.out.println("Seller Thread will be interrupted...");
            }
        }
    }

    public void sale() {
        System.out.println("=== take ===");
        try {
            String item = queue.poll(50, TimeUnit.MILLISECONDS);
            System.out.println(item);
            if (item != null) {
                sales.incrementAndGet(); // incrementAndGet returns a long that could be logged
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * Sell off whatever is left in the queue
     */
    private void cleanWarehouse() {
        try {
            while (queue.size() > 0) {
                sale();
            }
        } catch (Exception ex) {
            System.out.println("Seller Thread will be interrupted...");
        }
    }
}

/**
 * Producer
 */
class Producer implements Runnable {
    private LinkedBlockingQueue<String> queue;
    private CountDownLatch latch;
    private AtomicLong output;

    public Producer() {}

    public Producer(LinkedBlockingQueue<String> queue, AtomicLong output, CountDownLatch latch) {
        this.queue = queue;
        this.latch = latch;
        this.output = output;
    }

    public void run() {
        try {
            latch.await(); // The thread waits
            for (;;) {
                work();
                Thread.sleep(100);
            }
        } catch (InterruptedException e) {
            System.out.println("Producer thread will be interrupted...");
        }
    }

    /**
     * Produce one item
     */
    public void work() {
        try {
            String product = RandomStringUtils.randomAscii(3);
            boolean success = queue.offer(product, 100, TimeUnit.MILLISECONDS);
            if (success) {
                output.incrementAndGet(); // incrementAndGet returns a long that could be logged
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
How it works
Internally, LinkedBlockingQueue is implemented as a singly linked list from which elements can only be taken at the head and added at the tail. Adding elements and taking elements use separate locks, meaning reads and writes are separated in LinkedBlockingQueue and can proceed in parallel. LinkedBlockingQueue uses ReentrantLock to keep the queue thread-safe under concurrency.
Any operation that adds an element to an infinite queue will never block, so it can grow to a very large capacity.
The most important thing when designing a producer-consumer model with an unbounded BlockingQueue is that consumers must be able to consume messages as quickly as producers add them to the queue. Otherwise memory may fill up and produce an OutOfMemoryError.
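One way to hedge against that risk is to give the queue a small capacity so that put() itself applies backpressure. A sketch (the capacity of 2 and the item counts are arbitrary):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BackpressureExample {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);

        Thread producer = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    queue.put(i); // blocks while the queue already holds 2 elements
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        producer.start();

        // Slow consumer: the producer can never run more than 2 items ahead
        for (int i = 0; i < 5; i++) {
            Integer item = queue.poll(1, TimeUnit.SECONDS);
            System.out.println("consumed " + item);
        }
        producer.join();
    }
}
```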
Source code analysis
Definition
The class hierarchy of LinkedBlockingQueue, and the methods it defines, are shown in the diagrams below (omitted here).
Member attributes
/** Node class for storing data */
static class Node<E> {
    E item;
    Node<E> next;
    Node(E x) { item = x; }
}

/** The capacity of the blocking queue. Defaults to Integer.MAX_VALUE */
private final int capacity;

/** Number of elements currently in the blocking queue */
private final AtomicInteger count = new AtomicInteger();

/** Head node of the blocking queue */
transient Node<E> head;

/** Last node of the blocking queue */
private transient Node<E> last;

/** Lock used by element-removal operations such as take, poll, etc. */
private final ReentrantLock takeLock = new ReentrantLock();

/** notEmpty condition object, used to suspend removing threads when there is no data in the queue */
private final Condition notEmpty = takeLock.newCondition();

/** Lock used by element-adding operations such as put, offer, etc. */
private final ReentrantLock putLock = new ReentrantLock();

/** notFull condition object, used to suspend adding threads whenever the queue is full */
private final Condition notFull = putLock.newCondition();
As we know from the above attributes, each data added to the LinkedBlockingQueue queue will be wrapped as a Node Node, with head and last pointing to the head and tail of the queue, respectively. Unlike ArrayBlockingQueue, LinkedBlockingQueue uses takeLock and putLock internally to control concurrency, respectively. That is, add and remove operations are not mutually exclusive and can be performed simultaneously, which can greatly improve throughput.
If you do not specify the size of the queue, the default Integer.MAX_VALUE is used. If elements are added faster than they are removed, you may eventually run out of memory.
In addition, LinkedBlockingQueue provides a Condition for each lock lock to suspend and wake up other threads.
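The two-lock idea can be illustrated with a stripped-down sketch (this is NOT the JDK code, just a miniature non-blocking variant of the same structure): producers contend only on putLock, consumers only on takeLock, and the shared count is an AtomicInteger so neither side needs the other's lock.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Minimal two-lock linked queue: offer locks only putLock, poll only takeLock,
// so one producer and one consumer never block each other.
public class TwoLockQueue<E> {
    static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } }

    private final ReentrantLock putLock = new ReentrantLock();
    private final ReentrantLock takeLock = new ReentrantLock();
    private final AtomicInteger count = new AtomicInteger();
    private Node<E> head = new Node<>(null); // dummy head, like the JDK version
    private Node<E> last = head;

    public void offer(E e) {
        putLock.lock();
        try {
            last = last.next = new Node<>(e); // link at tail
            count.incrementAndGet();
        } finally {
            putLock.unlock();
        }
    }

    public E poll() {
        takeLock.lock();
        try {
            if (count.get() == 0) return null;
            Node<E> first = head.next; // first real node
            head = first;              // first becomes the new dummy head
            E x = first.item;
            first.item = null;
            count.decrementAndGet();
            return x;
        } finally {
            takeLock.unlock();
        }
    }

    public static void main(String[] args) {
        TwoLockQueue<String> q = new TwoLockQueue<>();
        q.offer("a");
        q.offer("b");
        System.out.println(q.poll()); // a
        System.out.println(q.poll()); // b
    }
}
```

The sketch omits capacity checks and the notFull/notEmpty conditions that the real class layers on top of this skeleton.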
The constructor
Both the default constructor and the collection constructor create a queue of size Integer.MAX_VALUE; only the second constructor lets the caller specify the queue size. The second constructor also initializes last and head so that both point to a node whose element is null.
The collection constructor uses putLock to lock, not because of multithreaded contention at construction time, but so that the enqueued elements are immediately visible to other threads.
public LinkedBlockingQueue() {
    // The default capacity is Integer.MAX_VALUE
    this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
    } finally {
        putLock.unlock();
    }
}
Enqueue methods
LinkedBlockingQueue provides a variety of implementations for joining the queue to meet the requirements of different situations, including the following:
- void put(E e);
- boolean offer(E e);
- boolean offer(E e, long timeout, TimeUnit unit);
Among them:
- There are two overloaded versions of the offer method. The one-argument version returns false if the queue is full, or true once the element is enqueued; add calls this version. The version with a time parameter waits when the queue is full, up to the specified timeout: it throws InterruptedException if interrupted while waiting, returns false if the wait times out, and otherwise enqueues the element and returns true;
- The put method follows the same logic as the timed offer, but with no limit on the wait. It waits until a slot is available and then inserts into the queue.
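The different failure modes are easy to observe on a full queue. A sketch using a capacity-1 queue (the element strings are arbitrary):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class EnqueueVariants {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.put("first");                        // succeeds; queue is now full

        // One-argument offer: fails immediately on a full queue
        System.out.println(queue.offer("second")); // false

        // Timed offer: waits up to the timeout, then gives up
        boolean added = queue.offer("second", 100, TimeUnit.MILLISECONDS);
        System.out.println(added);                 // false

        // add() delegates to offer() but throws instead of returning false
        try {
            queue.add("second");
        } catch (IllegalStateException e) {
            System.out.println("queue is full");
        }
    }
}
```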
put(E e)
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// Acquire the putLock interruptibly
putLock.lockInterruptibly();
try {
// Check whether the queue is full. If so, block and wait
while (count.get() == capacity) {
notFull.await();
}
// Put node in the queue
enqueue(node);
c = count.getAndIncrement();
// Again check whether the queue has free space, if there is a wake up next thread to add operation
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// If the queue was empty before this insertion (it now holds one element), wake up a consuming thread
if (c == 0)
signalNotEmpty();
}
In summary, the put method handles the following:
- Queue full, blocking waiting.
- If the queue is not full, create a node and enqueue it. If the queue still has free space afterwards, wake up the next adding thread; if the queue was empty before the insertion, wake up a consuming thread after it.
Let’s look at a few other methods used in the put method, starting with the enqueue(Node Node) method:
private void enqueue(Node<E> node) {
last = last.next = node;
}
The diagram below (omitted) shows how elements A and B are linked into the queue. Next, let’s look at signalNotEmpty, together with the signalNotFull method.
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}
Why is it written this way? Because a thread must hold the lock associated with a Condition before it may call signal on it.
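Condition enforces this rule itself: signalling without holding the associated lock throws IllegalMonitorStateException. A minimal sketch:

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class SignalRequiresLock {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();

        // Signalling without the lock is illegal
        try {
            cond.signal();
        } catch (IllegalMonitorStateException e) {
            System.out.println("must hold the lock first");
        }

        // The pattern used by signalNotEmpty/signalNotFull: lock, signal, unlock
        lock.lock();
        try {
            cond.signal(); // legal: the lock is held
        } finally {
            lock.unlock();
        }
    }
}
```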
offer(E e)
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// If the queue has free space, put it into node.
// If so, wake up the next add thread for add operation.
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    }
} finally {
    putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
As you can see, offer only changes the put method slightly: instead of blocking when there is no space left in the queue, offer simply returns false.
offer(E e, long timeout, TimeUnit unit)
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// Wait for the timeout nanos, return false when the timeout expires
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
This method simply turns offer into a timed blocking call using Condition’s awaitNanos. Why use a while loop? Because awaitNanos may return before the condition holds — when the thread is signalled but the queue is still full, or on a spurious wakeup — so the condition must be re-checked and the wait continued for only the remaining time.
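The awaitNanos-in-a-loop idiom can be written standalone. A sketch assuming a simple size/capacity pair standing in for the queue’s fullness check (the class and method names here are illustrative, not from the JDK):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class TimedWaitIdiom {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    int size = 0;                    // package-private so tests can simulate fullness
    private final int capacity = 1;

    // Returns true if space appeared within the timeout, false otherwise
    public boolean awaitSpace(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            // awaitNanos may return early (signal or spurious wakeup) while the
            // condition still does not hold, so re-check in a loop and keep
            // waiting only for the remaining time it returns.
            while (size == capacity) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TimedWaitIdiom q = new TimedWaitIdiom();
        q.size = 1; // simulate a full queue
        System.out.println(q.awaitSpace(50, TimeUnit.MILLISECONDS)); // false: timed out
        q.size = 0; // simulate available space
        System.out.println(q.awaitSpace(50, TimeUnit.MILLISECONDS)); // true: no wait needed
    }
}
```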
Dequeue methods
With enqueueing covered, let’s turn to the dequeue methods. LinkedBlockingQueue provides multiple implementations of dequeue operations to meet the requirements of different situations, as follows:
- E take();
- E poll();
- E poll(long timeout, TimeUnit unit);
take()
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// Queue is empty, block waiting
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
// There are also elements in the queue that wake up the next consuming thread to consume
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// The queue is full before the element is removed. Wake up the production thread to add the element
if (c == capacity)
signalNotFull();
return x;
}
The take method looks like the reverse of the put method. It does the following altogether:
- Queue empty, blocking wait
- If the queue is not empty, get and remove an element from the head of the queue; if elements remain after consumption, wake up the next consuming thread. If the queue was full before removing the element, wake up a producing thread to add elements.
Let’s look at the dequeue method
private E dequeue() {
// Get the head node
Node<E> h = head;
// Get the next node to which the head node points
Node<E> first = h.next;
// The next of the node that the head node originally points to points to itself, waiting for the next GC collection
h.next = h; // help GC
// Head points to a new node
head = first;
// Get the item value of the new head
E x = first.item;
// Set the item value of the new head to null
first.item = null;
return x;
}
A diagram of this linked-list manipulation is omitted here. The version above may seem a little convoluted; it could equally have been written this way:
private E dequeue() {
// Get the head node
Node<E> h = head;
// Get the next node that the head node points to, which is node A
Node<E> first = h.next;
// Get the next node, node B
Node<E> next = first.next;
// Next of the head points to the next node, node B in the graph
h.next = next;
// Get the value of node A
E x = first.item;
first.item = null; // help GC
first.next = first; // help GC
return x;
}
poll()
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
        if (count.get() > 0) {
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
if (c == capacity)
signalNotFull();
return x;
}
The poll method differs from take only in that it returns immediately instead of blocking when the queue is empty, so it is not detailed here. Similarly, poll(long timeout, TimeUnit unit), like offer(E e, long timeout, TimeUnit unit), uses Condition’s awaitNanos method to wait with a timeout; it is not listed here either.
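The non-blocking behaviour is easy to observe. A sketch:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollVariants {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // poll() on an empty queue returns null immediately
        System.out.println(queue.poll());                           // null

        // Timed poll waits up to the timeout before giving up
        System.out.println(queue.poll(100, TimeUnit.MILLISECONDS)); // null

        queue.offer("item");
        System.out.println(queue.poll());                           // item
        // take() would block here until another thread offers an element
    }
}
```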
Getting an element (peek)
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
    } finally {
        takeLock.unlock();
    }
}
After acquiring the lock, the node after the head node is obtained; null is returned if it is null, otherwise its item is returned.
Removing an element
public boolean remove(Object o) {
if (o == null) return false;
// Add two locks
fullyLock();
try {
// Start at head and iterate through the elements until the last element
        for (Node<E> trail = head, p = trail.next;
             p != null;
             trail = p, p = p.next) {
            // If an equal element is found, call unlink to remove it
            if (o.equals(p.item)) {
                unlink(p, trail);
                return true;
            }
        }
        return false;
    } finally {
        // Unlock both locks
        fullyUnlock();
    }
}

void fullyLock() {
    putLock.lock();
    takeLock.lock();
}

void fullyUnlock() {
    takeLock.unlock();
    putLock.unlock();
}
Because the remove method uses both locks, all other operations need to wait for it to complete, and this method needs to traverse from the head node to the tail node, so the time complexity is O(n). Let’s look at the unlink method.
void unlink(Node<E> p, Node<E> trail) {
// the p element is set to null
p.item = null;
    // Point trail's next at p's next, removing p from the list
    trail.next = p.next;
// If last points to p, delete p and make last point to trail
if (last == p)
last = trail;
// If the element was full before the deletion, there will be space after the deletion, wake up the production thread to insert the element
if (count.getAndDecrement() == capacity)
notFull.signal();
}
The complete code
package java.util.concurrent;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private static final long serialVersionUID = -6903933977591709194L;
    /** Node class for storing data */
    static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

    /** The capacity of the blocking queue. Defaults to Integer.MAX_VALUE */
private final int capacity;
/** Number of elements in the current blocking queue */
private final AtomicInteger count = new AtomicInteger();
/** * blocks the head node of the queue */
transient Node<E> head;
/** * block the last node of the queue */
private transient Node<E> last;
/** The lock used to get and remove elements, such as take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** notEmpty condition object, used to suspend the thread that performs the deletion when there is no data in the queue */
private final Condition notEmpty = takeLock.newCondition();
/** Locks used to add elements, such as put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** notFull condition object, used to suspend execution of the added thread whenever queue data is full */
private final Condition notFull = putLock.newCondition();
/** * Signals a waiting take. Called only from put/offer (which do not * otherwise ordinarily lock takeLock.) */
    private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /** Signals a waiting put. Called only from take/poll. */
    private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
* Links node at end of queue.
*
* @param node the node
*/
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
/**
* Removes a node from head of queue.
*
* @return the node
*/
    private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
// Get the head node
Node<E> h = head;
// Get the next node to which the head node points
Node<E> first = h.next;
// The next of the node that the head node originally points to points to itself, waiting for the next GC collection
h.next = h; // help GC
// Head points to a new node
head = first;
// Get the item value of the new head
E x = first.item;
// Set the item value of the new head to null
first.item = null;
return x;
}
/** * Locks to prevent both puts and takes. */
    void fullyLock() {
putLock.lock();
takeLock.lock();
}
/** * Unlocks to allow both puts and takes. */
    void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
    // /**
    //  * Tells whether both locks are held by current thread.
    //  */
    // boolean isFullyLocked() {
    //     return (putLock.isHeldByCurrentThread() &&
    //             takeLock.isHeldByCurrentThread());
    // }
/**
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}.
*/
    public LinkedBlockingQueue() {
        // The default capacity is Integer.MAX_VALUE
        this(Integer.MAX_VALUE);
}
/**
* Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
*
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if {@code capacity} is not greater
* than zero
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
/**
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}, initially containing the elements of the
* given collection,
* added in traversal order of the collection's iterator.
*
* @param c the collection of elements to initially contain
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
        } finally {
            putLock.unlock();
        }
    }

    // this doc comment is overridden to remove the reference to collections
// greater in size than Integer.MAX_VALUE
/**
* Returns the number of elements in this queue.
*
* @return the number of elements in this queue
*/
    public int size() {
return count.get();
}
// this doc comment is a modified copy of the inherited doc comment,
// without the reference to unlimited queues.
/**
* Returns the number of additional elements that this queue can ideally
* (in the absence of memory or resource constraints) accept without
* blocking. This is always equal to the initial capacity of this queue
* less the current {@code size} of this queue.
*
* <p>Note that you <em>cannot</em> always tell if an attempt to insert
* an element will succeed by inspecting {@code remainingCapacity}
* because it may be the case that another thread is about to
* insert or remove an element.
*/
    public int remainingCapacity() {
return capacity - count.get();
}
/**
* Inserts the specified element at the tail of this queue, waiting if
* necessary for space to become available.
*
* @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
        // Acquire the putLock interruptibly
        putLock.lockInterruptibly();
try {
/* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */
// Check whether the queue is full. If so, block and wait
while (count.get() == capacity) {
notFull.await();
}
// Put node in the queue
enqueue(node);
c = count.getAndIncrement();
// Again check whether the queue has free space, if there is a wake up next thread to add operation
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
        // If the queue was empty before this insertion (it now holds one element), wake up a consuming thread
if (c == 0)
signalNotEmpty();
}
/**
* Inserts the specified element at the tail of this queue, waiting if
* necessary up to the specified wait time for space to become available.
*
* @return {@code true} if successful, or {@code false} if
* the specified waiting time elapses before space is available
* @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// Wait for the timeout nanos, return false when the timeout expires
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning {@code true} upon success and {@code false} if this queue
* is full.
* When using a capacity-restricted queue, this method is generally
* preferable to method {@link BlockingQueue#add add}, which can fail to
* insert an element only by throwing an exception.
*
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// If the queue has free space, put it into node.
// If so, wake up the next add thread for add operation.
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
    public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// Queue is empty, block waiting
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
// There are also elements in the queue that wake up the next consuming thread to consume
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// The queue is full before the element is removed. Wake up the production thread to add the element
if (c == capacity)
signalNotFull();
return x;
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
    public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
    public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
        } finally {
            takeLock.unlock();
        }
    }

    /** Unlinks interior Node p with predecessor trail. */
void unlink(Node<E> p, Node<E> trail) {
// assert isFullyLocked();
// p.next is not changed, to allow iterators that are
// traversing p to maintain their weak-consistency guarantee.
// the p element is set to null
p.item = null;
// Next points to p's next, which removes p from the list
trail.next = p.next;
// If last points to p, delete p and make last point to trail
if (last == p)
last = trail;
// If the element was full before the deletion, there will be space after the deletion, wake up the production thread to insert the element
if (count.getAndDecrement() == capacity)
notFull.signal();
}
/**
* Removes a single instance of the specified element from this queue,
* if it is present. More formally, removes an element {@code e} such
* that {@code o.equals(e)}, if this queue contains one or more such
* elements.
* Returns {@code true} if this queue contained the specified element
* (or equivalently, if this queue changed as a result of the call).
*
* @param o element to be removed from this queue, if present
* @return {@code true} if this queue changed as a result of the call
*/
public boolean remove(Object o) {
if (o == null) return false;
// Add two locks
fullyLock();
try {
// Start at head and iterate through the elements until the last element
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                // If an equal element is found, call unlink to remove it
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            // Unlock both locks
            fullyUnlock();
        }
    }

    /**
* Returns {@code true} if this queue contains the specified element.
* More formally, returns {@code true} if and only if this queue contains
* at least one element {@code e} such that {@code o.equals(e)}.
*
* @param o object to be checked for containment in this queue
* @return {@code true} if this queue contains the specified element
*/
public boolean contains(Object o) {
if (o == null) return false;
fullyLock();
try {
            for (Node<E> p = head.next; p != null; p = p.next)
if (o.equals(p.item))
return true;
return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
* Returns an array containing all of the elements in this queue, in
* proper sequence.
*
* <p>The returned array will be "safe" in that no references to it are
* maintained by this queue. (In other words, this method must allocate
* a new array). The caller is thus free to modify the returned array.
*
* <p>This method acts as bridge between array-based and collection-based
* APIs.
*
* @return an array containing all of the elements in this queue
*/
public Object[] toArray() {
fullyLock();
try {
int size = count.get();
Object[] a = new Object[size];
int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
a[k++] = p.item;
return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence; the runtime type of the returned array is that of
     * the specified array. If the queue fits in the specified array, it
     * is returned therein. Otherwise, a new array is allocated with the
     * runtime type of the specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
*
* <p>Like the {@link #toArray()} method, this method acts as bridge between
* array-based and collection-based APIs. Further, this method allows
* precise control over the runtime type of the output array, and may,
* under certain circumstances, be used to save allocation costs.
*
* <p>Suppose {@code x} is a queue known to contain only strings.
* The following code can be used to dump the queue into a newly
* allocated array of {@code String}:
*
* <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
*
* Note that {@code toArray(new Object[0])} is identical in function to
* {@code toArray()}.
*
* @param a the array into which the elements of the queue are to
* be stored, if it is big enough; otherwise, a new array of the
* same runtime type is allocated for this purpose
* @return an array containing all of the elements in this queue
* @throws ArrayStoreException if the runtime type of the specified array
*         is not a supertype of the runtime type of every element in
*         this queue
* @throws NullPointerException if the specified array is null
*/
@SuppressWarnings("unchecked")
public <T> T[] toArray(T[] a) {
fullyLock();
try {
int size = count.get();
if (a.length < size)
a = (T[])java.lang.reflect.Array.newInstance
(a.getClass().getComponentType(), size);
int k = 0;
for (Node<E> p = head.next; p != null; p = p.next)
a[k++] = (T)p.item;
if (a.length > k)
a[k] = null;
return a;
} finally {
fullyUnlock();
}
}

public String toString() {
fullyLock();
try {
Node<E> p = head.next;
if (p == null)
return "[]";
StringBuilder sb = new StringBuilder();
sb.append('[');
for (;;) {
E e = p.item;
sb.append(e == this ? "(this Collection)" : e);
p = p.next;
if (p == null)
return sb.append(']').toString();
sb.append(',').append(' ');
}
} finally {
fullyUnlock();
}
}

/**
* Atomically removes all of the elements from this queue.
* The queue will be empty after this call returns.
*/
public void clear() {
fullyLock();
try {
for (Node<E> p, h = head; (p = h.next) != null; h = p) {
h.next = h;
p.item = null;
}
head = last;
// assert head.item == null && head.next == null;
if (count.getAndSet(0) == capacity)
notFull.signal();
} finally {
fullyUnlock();
}
}

/**
* @throws UnsupportedOperationException {@inheritDoc}
* @throws ClassCastException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public int drainTo(Collection<? super E> c) {
return drainTo(c, Integer.MAX_VALUE);
}
/**
* @throws UnsupportedOperationException {@inheritDoc}
* @throws ClassCastException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public int drainTo(Collection<? super E> c, int maxElements) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
boolean signalNotFull = false;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
int n = Math.min(maxElements, count.get());
// count.get provides visibility to first n Nodes
Node<E> h = head;
int i = 0;
try {
while (i < n) {
Node<E> p = h.next;
c.add(p.item);
p.item = null;
h.next = h;
h = p;
++i;
}
return n;
} finally {
// Restore invariants even if c.add() threw
if (i > 0) {
// assert h.item == null;
head = h;
signalNotFull = (count.getAndAdd(-i) == capacity);
}
}
} finally {
takeLock.unlock();
if (signalNotFull)
signalNotFull();
}
}

/**
* Returns an iterator over the elements in this queue in proper sequence.
* The elements will be returned in order from first (head) to last (tail).
*
* <p>The returned iterator is
* <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
*
* @return an iterator over the elements in this queue in proper sequence
*/
public Iterator<E> iterator() {
return new Itr();
}
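The javadoc above stresses that the returned iterator is weakly consistent: it never throws ConcurrentModificationException, and elements added or removed after the iterator is created may still be observed. A minimal single-threaded sketch of that behavior (the class name and element values are illustrative, not from the source):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

public class WeaklyConsistentDemo {
    static List<String> collect() {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>();
        q.add("a");
        q.add("b");
        q.add("c");

        List<String> seen = new ArrayList<>();
        Iterator<String> it = q.iterator(); // Itr captures head.next under fullyLock()
        seen.add(it.next());                // returns "a"

        // Modify the queue after the iterator was created:
        q.poll();      // removes "a" from the queue
        q.add("d");    // linked to the tail, still reachable from the iterator

        // No ConcurrentModificationException; traversal walks the live list.
        while (it.hasNext())
            seen.add(it.next());
        return seen;   // [a, b, c, d]
    }

    public static void main(String[] args) {
        System.out.println(collect());
    }
}
```

Because Itr re-acquires fullyLock() on each next() call and follows the live next pointers, the element appended after the iterator was created is still visited, and the element that was polled is still returned because it had already been captured as currentElement.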
private class Itr implements Iterator<E> {
/*
 * Basic weakly-consistent iterator.  At all times hold the next
 * item to hand out so that if hasNext() reports true, we will
 * still have it to return even if lost race with a take etc.
 */
private Node<E> current;
private Node<E> lastRet;
private E currentElement;
Itr() {
fullyLock();
try {
current = head.next;
if (current != null)
currentElement = current.item;
} finally {
fullyUnlock();
}
}

public boolean hasNext() {
return current != null;
}
/**
* Returns the next live successor of p, or null if no such.
*
* Unlike other traversal methods, iterators need to handle both:
* - dequeued nodes (p.next == p)
* - (possibly multiple) interior removed nodes (p.item == null)
*/
private Node<E> nextNode(Node<E> p) {
for (;;) {
Node<E> s = p.next;
if (s == p)
return head.next;
if (s == null || s.item != null)
return s;
p = s;
}
}

public E next() {
fullyLock();
try {
if (current == null)
throw new NoSuchElementException();
E x = currentElement;
lastRet = current;
current = nextNode(current);
currentElement = (current == null) ? null : current.item;
return x;
} finally {
fullyUnlock();
}
}

public void remove() {
if (lastRet == null)
throw new IllegalStateException();
fullyLock();
try {
Node<E> node = lastRet;
lastRet = null;
for (Node<E> trail = head, p = trail.next; p != null;
trail = p, p = p.next) {
if (p == node) {
unlink(p, trail);
break;
}
}
} finally {
fullyUnlock();
}
}
}

/** A customized variant of Spliterators.IteratorSpliterator */
static final class LBQSpliterator<E> implements Spliterator<E> {
static final int MAX_BATCH = 1 << 25; // max batch array size;
final LinkedBlockingQueue<E> queue;
Node<E> current; // current node; null until initialized
int batch; // batch size for splits
boolean exhausted; // true when no more nodes
long est; // size estimate
LBQSpliterator(LinkedBlockingQueue<E> queue) {
this.queue = queue;
this.est = queue.size();
}
public long estimateSize() { return est; }

public Spliterator<E> trySplit() {
Node<E> h;
final LinkedBlockingQueue<E> q = this.queue;
int b = batch;
int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
if (!exhausted &&
    ((h = current) != null || (h = q.head.next) != null) &&
    h.next != null) {
Object[] a = new Object[n];
int i = 0;
Node<E> p = current;
q.fullyLock();
try {
if (p != null || (p = q.head.next) != null) {
do {
if ((a[i] = p.item) != null)
++i;
} while ((p = p.next) != null && i < n);
}
} finally {
q.fullyUnlock();
}
if ((current = p) == null) {
est = 0L;
exhausted = true;
}
else if ((est -= i) < 0L)
est = 0L;
if (i > 0) {
batch = i;
return Spliterators.spliterator
(a, 0, i, Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.CONCURRENT);
}
}
return null;
}
public void forEachRemaining(Consumer<? super E> action) {
if (action == null) throw new NullPointerException();
final LinkedBlockingQueue<E> q = this.queue;
if (!exhausted) {
exhausted = true;
Node<E> p = current;
do {
E e = null;
q.fullyLock();
try {
if (p == null)
p = q.head.next;
while (p != null) {
e = p.item;
p = p.next;
if (e != null)
break;
}
} finally {
q.fullyUnlock();
}
if (e != null)
action.accept(e);
} while (p != null);
}
}

public boolean tryAdvance(Consumer<? super E> action) {
if (action == null) throw new NullPointerException();
final LinkedBlockingQueue<E> q = this.queue;
if (!exhausted) {
E e = null;
q.fullyLock();
try {
if (current == null)
current = q.head.next;
while (current != null) {
e = current.item;
current = current.next;
if (e != null)
break;
}
} finally {
q.fullyUnlock();
}
if (current == null)
exhausted = true;
if (e != null) {
action.accept(e);
return true;
}
}
return false;
}
public int characteristics(a) {
return Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.CONCURRENT;
}
}

/**
* Returns a {@link Spliterator} over the elements in this queue.
*
* <p>The returned spliterator is
* <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
*
* <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
* {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
*
* @implNote
* The {@code Spliterator} implements {@code trySplit} to permit limited
* parallelism.
*
* @return a {@code Spliterator} over the elements in this queue
* @since 1.8
*/
public Spliterator<E> spliterator() {
return new LBQSpliterator<E>(this);
}
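Because LBQSpliterator reports CONCURRENT and implements trySplit, the queue can feed a parallel stream directly, without taking a snapshot copy of its elements first. A small sketch (the class name is illustrative):

```java
import java.util.concurrent.LinkedBlockingQueue;

public class SpliteratorDemo {
    static int parallelSum() {
        LinkedBlockingQueue<Integer> q = new LinkedBlockingQueue<>();
        for (int i = 1; i <= 100; i++)
            q.add(i);

        // parallelStream() is driven by the spliterator() above; trySplit
        // hands out batches so worker threads can traverse in parallel.
        return q.parallelStream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        System.out.println(parallelSum()); // 5050
    }
}
```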
/**
* Saves this queue to a stream (that is, serializes it).
*
* @param s the stream
* @throws java.io.IOException if an I/O error occurs
* @serialData The capacity is emitted (int), followed by all of
* its elements (each an {@code Object}) in the proper order,
* followed by a null
*/
private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException {
fullyLock();
try {
// Write out any hidden stuff, plus capacity
s.defaultWriteObject();
// Write out all elements in the proper order.
for(Node<E> p = head.next; p ! =null; p = p.next)
s.writeObject(p.item);
// Use trailing null as sentinel
s.writeObject(null);
} finally {
fullyUnlock();
}
}

/**
* Reconstitutes this queue from a stream (that is, deserializes it).
* @param s the stream
* @throws ClassNotFoundException if the class of a serialized object
* could not be found
* @throws java.io.IOException if an I/O error occurs
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
// Read in capacity, and any hidden stuff
s.defaultReadObject();
count.set(0);
last = head = new Node<E>(null);
// Read in all elements and place in queue
for (;;) {
@SuppressWarnings("unchecked")
E item = (E)s.readObject();
if (item == null)
break;
add(item);
}
}
}
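The drainTo method shown above is handy for batch consumption: a consumer can move up to maxElements items into a local collection under a single takeLock acquisition instead of polling them one by one. A minimal sketch (class and element names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

public class DrainToDemo {
    static List<String> drainBatch() {
        // Bounded, as the article recommends, to avoid memory overflow.
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>(100);
        for (int i = 0; i < 10; i++)
            q.add("task-" + i);

        // Move at most 4 elements in one takeLock acquisition,
        // instead of calling poll() four times.
        List<String> batch = new ArrayList<>();
        int moved = q.drainTo(batch, 4);
        // moved == 4, and 6 elements remain in the queue
        return batch;
    }

    public static void main(String[] args) {
        System.out.println(drainBatch()); // [task-0, task-1, task-2, task-3]
    }
}
```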
conclusion
LinkedBlockingQueue is a blocking queue that uses two internal ReentrantLocks to make enqueuing and dequeuing thread safe, and blocks and wakes threads through the await and signal methods of each lock's Condition object. It differs from ArrayBlockingQueue in the following ways:
- Queue sizes differ. ArrayBlockingQueue is bounded and its capacity must be specified at initialization, while LinkedBlockingQueue can be bounded or unbounded (Integer.MAX_VALUE by default). In the unbounded case, if elements are added faster than they are removed, problems such as memory overflow may occur.
- The data storage containers differ. ArrayBlockingQueue stores its elements in an array, while LinkedBlockingQueue stores them in a linked list of Node objects.
- Because ArrayBlockingQueue stores elements in an array, inserting or removing an element creates or destroys no additional object instances, whereas LinkedBlockingQueue allocates an extra Node object for every insertion. When large volumes of data need to be processed efficiently and concurrently over long periods, this can have a significant impact on GC.
- The locking strategies differ. ArrayBlockingQueue uses a single ReentrantLock for both insertion and removal, while LinkedBlockingQueue uses two separate ReentrantLocks: putLock for adding and takeLock for removing. This greatly improves the queue's throughput, because under high concurrency producers and consumers can operate on the queue in parallel.
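The two-lock design in the last point can be seen in a minimal bounded producer/consumer sketch, where put blocks on notFull when the queue is at capacity and take blocks on notEmpty when it is empty (class name and sizes are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

public class TwoLockDemo {
    static List<Integer> run() throws InterruptedException {
        // Bounded, as recommended above, to avoid memory overflow.
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);
        List<Integer> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++)
                    queue.put(i); // blocks on notFull once 2 elements are queued
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++)
                    consumed.add(queue.take()); // blocks on notEmpty when empty
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        return consumed; // FIFO order: [0, 1, 2, 3, 4]
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

Because put only contends for putLock and take only contends for takeLock, the two threads operate on opposite ends of the linked list in parallel; with ArrayBlockingQueue they would serialize on one lock.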
PS: The above code is submitted to Github: github.com/Niuh-Study/…