BlockingQueue

public interface Queue<E> extends Collection<E> {
   
    boolean add(E e);

    boolean offer(E e);
    
    E remove();

    E poll();

    E element();

    E peek();
}
Copy the code
public interface BlockingQueue<E> extends Queue<E> {
 
    boolean add(E e);

    boolean offer(E e);

    void put(E e) throws InterruptedException;

    boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;

    E take() throws InterruptedException;

    E poll(long timeout, TimeUnit unit) throws InterruptedException;

    int remainingCapacity();

    boolean remove(Object o);

    public boolean contains(Object o);

    int drainTo(Collection<? super E> c);

    int drainTo(Collection<? super E> c, int maxElements);
}
Copy the code

BlockingQueue is an interface that defines operations such as adding and deleting elements. The implementation classes ArrayBlockingQueue, LinkedBlockingQueue, and others are commonly used to block queues.

  • The producer adds elements to the queue and passesadd/put/offerThe implementation adds elements to the queue. When the queue is full, the thread that added elements blocks the queue until it is available.
  • The consumer retrieves the element in the queue and consumes it, passingremove/take/pollWhen the queue is empty, the thread consuming the element blocks the queue until it is empty.

There are four different ways to add or remove elements:

  • Throws Exception: When the queue is empty, call remove(e) to remove the element Throws an Exception. Calling Add (e) to add elements when the queue is full also throws an exception

  • Return special value (false or NULL) : Call offer(e) to add an element or poll() to delete an element if it cannot be done immediately, returns a special value, usually false or null.

  • Block the current thread until it is woken up: When the queue is empty, the consumer thread calls the take() method and blocks the current thread until it is woken up again when the queue is not empty; Or when the queue is full, the producer thread will block by calling the put(e) method until it is woken up again when the queue is full.

  • A thread calling poll(timeouot,unit) to try to fetch an element blocks when the queue is empty. When the queue is full, the thread calls offer(e,timeout,unit) to add elements and blocks. If the poll and offer are not awakened within a timeout, exit.

The summary is as follows:

The results of Add (Insert) Delete (Remove) Check (Examine)
Throws Exception (Throws Exception) add(e) remove(o) element()
Obstruction (Blocked) put(e) take()
Return Special value offer(e) poll() peek()
Timeout (Times out) offer( e, timeout, unit) poll( timeout, unit)

BlockingQueue Related implementation class

How ArrayBlockingQueue works

The constructor

final Object[] items; // array queue int takeIndex; (take/poll/peek/remove) int putIndex; // Add index (put/offer/add) int count; // Number of elements in the queue final ReentrantLock lock; //ReentrantLock private final Condition notEmpty; Private final condition notFull; Condition public ArrayBlockingQueue(int capacity) {this(capacity, false); } //capacity Indicates the initial capacity of the queue. Public ArrayBlockingQueue(int capacity) public ArrayBlockingQueue(int capacity) public ArrayBlockingQueue(int capacity, int capacity) boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); // Initialize an array of capacity this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }Copy the code

Add elements add/put/offer

// When the queue is full, it blocks the current thread and wakes up when there are free elements; Public void PUT (E E) throws InterruptedException {checkNotNull(E); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); }} // If the queue is full, add elements to the queue and return true; Public Boolean add(E E) {return super.add(E); } // Add element to queue, return true on success; Return false public Boolean offer(E E) {checkNotNull(E); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); Private void enQueue (E x) {final Object[] items = this.items; items[putIndex] = x; If (++putIndex == items. Length) putIndex = 0; if (++putIndex == items. count++; notEmpty.signal(); }Copy the code

AbstractQueue > add;

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
Copy the code

Remove the element remove/take/poll

Public E poll() {final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); }} // If the queue is empty, the current thread is blocked and woken up when there are elements in the queue; Public E take() throws InterruptedException {final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); }} // select the element where takeIndex is placed and return it; If the queue is empty, the block waits for timeout. If there is a new element added to the queue during the waiting time, the thread will wake up and consume the element. If no new element is added to the queue within timeout, the thread will wake up and consume the element. Null public E poll(Long timeout, TimeUnit Unit) throws InterruptedException {Long nanos = unit.tonanos (timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } return dequeue(); } finally { lock.unlock(); }} // Remove the element o from the queue, if o exists and is not null, remove it and return true; Public Boolean remove(Object o) {if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); If (count > 0) {final int putIndex = this.putIndex; if (count > 0) {final int putIndex = this.putIndex; int i = takeIndex; Do {// If the target element is found while traversing the queue, delete it and return true if (o.tasks (items[I])) {removeAt(I); return true; } if (++i == items.length) i = 0; // Traverse the queue and I! =putIndex(equal, indicating that the queue has been traversed)} while (I! = putIndex); } return false; } finally { lock.unlock(); Void removeAt(final int removeIndex) {final Object[] items = this.items; If (removeIndex == takeIndex) {items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs ! = null) itrs.elementDequeued(); } else { // final int putIndex = this.putIndex; for (int i = removeIndex;;) { int next = i + 1; if (next == items.length) next = 0; if (next ! Items [I] = items[next]; putIndex = items[next]; i = next; } else {// iterate through the element to the end of the queue, empty the element at the end of the queue, and assign that position to add index putIndex and break out of the loop items[I] = null; this.putIndex = i; break; } } count--; if (itrs ! = null) itrs.removedAt(removeIndex); } notFull.signal(); } private E dequeue() {final Object[] items = this.items; private E dequeue() {final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; If (++takeIndex == items.length) takeIndex = 0; // Number of queue elements minus 1 count--; if (itrs ! = null) itrs.elementDequeued(); notFull.signal(); return x; }Copy the code

Other operations peek/ Element, etc

// Return null if the queue is empty; Public E peek() {final ReentrantLock lock = this.lock; lock.lock(); try { return itemAt(takeIndex); } finally { lock.unlock(); } } final E itemAt(int i) { return (E) items[i]; } // AbstractQueue, ArrayBlockingQueue, if the queue is not empty, returns the element at the takeIndex position of the queue head; Public E element() {E x = peek(); if (x ! = null) return x; else throw new NoSuchElementException(); }Copy the code

How does LinkedBlockingQueue work

The constructor

private final int capacity; // Queue size, default is integer. MAX_VALUE private final AtomicInteger count = new AtomicInteger(); // Transient Node<E> head; Private TRANSIENT Node<E> last; Private final ReentrantLock takeLock = new ReentrantLock(); Private final condition notEmpty = takelock.newcondition (); private final condition notEmpty = takelock.newcondition (); Private final ReentrantLock putLock = new ReentrantLock(); Private final condition notFull = putLock.newCondition(); private final condition notFull = putLock.newCondition(); Public LinkedBlockingQueue() {this(integer.max_value); Public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }Copy the code

Static inner class Node

static class Node<E> { E item; /** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node<E> next; Node(E x) { item = x; }}Copy the code

Add elements add/put/offer

public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = - 1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); } public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; } public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(new Node<E>(e)); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; } private void enqueue(Node<E> Node) {last = last.next = Node; }Copy the code

Remove the element remove/take/poll

public boolean remove(Object o) { if (o == null) return false; fullyLock(); try { for (Node<E> trail = head, p = trail.next; p ! = null; trail = p, p = p.next) { if (o.equals(p.item)) { unlink(p, trail); return true; } } return false; } finally { fullyUnlock(); } } void fullyLock() { putLock.lock(); takeLock.lock(); } void fullyUnlock() { takeLock.unlock(); putLock.unlock(); } public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x = null; int c = -1; long nanos = unit.toNanos(timeout); final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } void unlink(Node<E> p, Node<E> trail) { // assert isFullyLocked(); // p.next is not changed, to allow iterators that are // traversing p to maintain their weak-consistency guarantee. p.item = null; trail.next = p.next; if (last == p) last = trail; if (count.getAndDecrement() == capacity) notFull.signal(); } private E dequeue() {Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }Copy the code

Other operations peek/ Element, etc

Public E peek() {if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node<E> first = head.next; if (first == null) return null; else return first.item; } finally { takeLock.unlock(); }}Copy the code

ArrayBlockingQueue, LinkedBlockingQueue

Similarities:

  • Both are first-in, first-out (FIFO) queues, in which tasks are executed in the same order as they arrive in the queue.

The difference between:

  • The underlying implementation of ArrayBlockingQueue is an array, and the underlying implementation of LinkedBlockingQueue is a list
  • ArrayBlockingQueue is a bounded queue, LinkedBlockingQueue is an unbounded queue by default (integer.max_value), or you can pass in the number of counts to become a bounded queue.

SynchronousQueue implementation principles

SynchronousQueue is not really a queue, but a mechanism for hand-over between threads. SynchronousQueue avoids queueing tasks and transitions tasks directly from producer to consumer. For a thread (producer thread) to place an element in the SynchronousQueue, another thread (consumer thread) must wait to receive the element.

The use of SynchronousQueue will

Public static void main(String[] args) throws InterruptedException {// Initialize SynchronousQueue. Default is an unfair queue SynchronousQueue<Integer> queue = new SynchronousQueue<>(); PutRunnable PutRunnable = new PutRunnable(queue); TakeRunnable TakeRunnable = new TakeRunnable(queue); new Thread(putRunnable).start(); Thread.sleep(1500); new Thread(takeRunnable).start(); } static class PutRunnable implements Runnable { private SynchronousQueue<Integer> queue; PutRunnable(SynchronousQueue<Integer> queue) { this.queue = queue; } @override public void run() {system.out.println ("++ "); Try {system.out. println("++ producer thread add element: 10"); queue.put(10); } catch (InterruptedException e) { e.printStackTrace(); } finally {system.out.println ("++ producer thread end "); } } } static class TakeRunnable implements Runnable { private SynchronousQueue<Integer> queue; TakeRunnable(SynchronousQueue<Integer> queue) { this.queue = queue; } @override public void run() {system.out.println ("-- the consumer thread starts executing "); Try {system.out.println ("-- consumer thread fetch element: "+ queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } finally {system.out.println ("-- consumer thread end "); }}}Copy the code

Execution Result:

++ Producer thread starts execution ++ Producer thread adds element: 10 -- consumer thread starts execution -- consumer thread takes element out: 10 -- consumer thread ends ++ Producer thread endsCopy the code

As you can see from the result, when the producer starts executing and calls the PUT method, there is no thread to consume (take). Instead of continuing, the producer thread waits for the consumer thread to fetch the element and wake up. Finally, both the producer thread and the consumer thread finish executing and exit.

Sc SynchronousQueue will

SynchronousQueue construction parameters:

public SynchronousQueue() { this(false); } // If true is passed in, all producers and consumers are in order, i.e. the first producer is consumed first; Otherwise, if it is false, there is no order between producers and consumers. public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }Copy the code

Unlike the internal cache queues of ArrayBlockingQueue and LinkedBlockingQueue, SynchronousQueue does not cache data internally; producer threads that perform an add (PUT) must wait for consumer threads to take (take), and vice versa.

To summarize: The add (PUT) and consume (take) threads associated with SynchronousQueue must be paired and continue to execute. If there is only one operation (put or take), that thread blocks and waits until the other thread performs the corresponding operation. SynchronousQueue is suitable for exchanging data between two threads.

How PriorityBlockingQueue works

PriorityBlockingQueue is an unbounded heap-based queue of concurrent security priorities. The elements passed in PriorityBlockingQueue cannot be NULL and the Comparable interface must be implemented.

public static void main(String[] args) throws InterruptedException { //Example2 ArrayList<User> list = new ArrayList<>(); List. add(new User(" 三", 20)); List. add(new User(" li4 ", 40)); List. add(new User(" wang5 ", 30)); List. add(new User(" zhao liu ", 10)); PriorityBlockingQueue<User> PriorityBlockingQueue = new PriorityBlockingQueue<>(); / / add elements There is no order when adding priorityBlockingQueue. AddAll (list); while (priorityBlockingQueue.size() > 0) { User user = priorityBlockingQueue.take(); System. The out. Println (" name: "+ user. GetName () +", the age: "+ user. GetAge () +", queue number of elements: "+ priorityBlockingQueue. The size ()); } } static class User implements Comparable { User(String name, int age) { this.name = name; this.age = age; } String name; int age; public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } @Override public int compareTo(Object o) { if (o instanceof User) { User user = (User) o; return age > user.getAge() ? 1:1; } return 0; }}Copy the code

Print result:

Name: Li Si,age:40, number of queue elements :3 Name: Wang Wu,age:30, number of queue elements :2 Name: Zhang SAN,age:20, number of queue elements :1 Name: Zhao Liu,age:10, number of queue elements: 0Copy the code

The elements are passed in no order, but are prioritized by compareTo. The larger the age, the higher the priority. All the final output is sorted by age.

Conclusion:

  • The elements passed into the PriorityBlockingQueue must implement the Comparable interface, which determines priority through its compareTo method, returning a negative number (e.g. -1) if the current element has a higher priority than the compared element, and a positive number (e.g. 1) if the current element has a higher priority than the compared element.
  • PriorityBlockingQueue is locked only on take, but not on PUT, because PriorityBlockingQueue is unbounded and supports concurrent PUT operations. When the queue is empty, the take method blocks the current thread.

reference

【 1 】 docs.oracle.com/javase/8/do… [2] SynchronousQueue will implement principle: zhuanlan.zhihu.com/p/29227508 [3] SynchronousQueue will use instance: segmentfault.com/a/119000001… [5] Unbounded blocking priority queue PriorityBlockingQueue: cmsblogs.com/?p=2407 [5] Unbounded blocking priority queue: ifeve.com/java-priori…