• We’ve seen a lot of blocking queues in the thread pool introduction, but in this article we’ll focus on blocking queues.
  • Blocking the queue is justBlockingQueueThis class is a connection
  • Mouth, while inheritingQueueInterface, both of these interfaces are inJDK5Is added to.
  • BlockingQueueBlocking queues are thread-safe and are frequently used in our business, such as typical producer consumption scenarios, where the producer only needs to add to the queue and the consumer is responsible for fetching from the queue.

  • As shown in the figure above, our producer thread is constantlyputElement to the queue, and the consumer from ittakeThis decouples the task from the executing task class. Tasks are placed in a blocking queue so that producers and consumers do not directly access each other. Isolation improves security.

Concurrent queue

  • The above isJavaIn the queueQueueClass diagram, we can see that it’s divided into two categories,Blocking queues and non-blocking queues
  • The implementation interface for blocking queues isBlockingQueueThe non-blocking queue interface isConcurrentLinkedQueue, this article focuses on blocking queues, not non-blocking queues
  • BlockingQueueThere are six main implementation classes, respectivelyArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue,DelayQueue,PriorityBlockingQueue,LinkedTransferQueue. These blocking queues have their own characteristics and application scenarios, which will be described later.
  • A typical example of a non-blocking queue isConcurrentLinkedQueueInstead of blocking the thread, it uses itCASTo keep the thread safe.
  • There’s actually a queue and thetaQueueIt’s very close, and that isDequeThis is actuallydouble-ended-queueShort for “double endian queue”. It features the ability to add and remove elements from both the header and the tail, as opposed to the usual queueQueueCan only go in one end and out the other, i.eFIFO

Blocking queue characteristics

  • Blocking queues are characterized by blocking. They block threads and balance producers and consumers. There are two key ways to block queuesPutTakemethods

Take method

  • take The getHeaders () method is used to retrieve and remove the headers of a queue, usually when there is data in the queue. But once executed,takeMethod blocks until there is no data in the queue. As soon as there is data in the queue, it unblocks immediately and retrieves the data. The process is shown in the figure:

Put method

  • put Method inserts elements as normal as normal inserts if the queue is not full, but if the queue is full, the insert cannot continue and blocks until there is free space in the queue. If a subsequent queue has free space, such as a consumer consuming an element, then the queue is unblocked and the data that needs to be added is added to the queue. The process is shown in the figure:

Bounded or not (how big is the capacity)

  • In addition, blocking queues have a very important property, that is the size of the capacity, there are two types of bounded and unbounded.
  • An unbounded queue means it can hold a very large number of elements, for exampleLinkedBlockingQueue The ceiling isInteger.MAX_VALUEPhi is about two to the thirty-one, which is a very large number, and we can almost think of it as infinite capacity, because we can barely fill it up.
  • But some blocking queues are bounded, for exampleArrayBlockingQueue If it’s full, it doesn’t expand, so once it’s full, you can’t put any more data in it.

Common method of blocking queues

  • First of all, based on common methods, we can be roughly divided into three categories according to their characteristics, as shown in the following table:
classification methods meaning The characteristics of
An exception is thrown add Add an element If the queue is full, adding throwsIllegalStateExceptionabnormal
remove Delete the queue head node When the queue is empty, deletion is thrownNoSuchElementExceptionabnormal
element Gets the queue header element Throws when the queue is emptyNoSuchElementExceptionabnormal
Return no exception offer Add an element When the queue is full, no exception is reportedfalseReturns on successtrue
poll Get the queue head node and delete it Returns when the queue is emptyNull  
peek Simply get the head node Feedback when the queue is emptyNULL
blocking put Add an element Block if the queue is full
take Return and delete the header element If the queue is empty, it blocks
  • The main eight methods, as shown above, are relatively simple, and we’ll look at them in real code

Throw exception types [add, remove, Element]

add

  • Adds an element to the queue. If the queue is bounded, adding a queue when it is full will raise an exception as follows:
        BlockingQueue queue = new ArrayBlockingQueue(2);
        queue.add(1);
        queue.add(2);
        queue.add(3);
Copy the code
  • In the above code we create a blocking queue with a capacity of 2 when we useaddAdd an element to it, and when added to a third it throws an exception like this:

remove

  • removeRemoves the queue’s head node from the queue and returns the element. Executed when the queue is emptyremoveMethod will throw an exception as follows:
    private static void groupRemove(a) {
        BlockingQueue queue = new ArrayBlockingQueue(2);
        queue.add("i-code.online");
        System.out.println(queue.remove());
        System.out.println(queue.remove());
    }
Copy the code
  • In the code above, we can see that we want to add an element to the queuei-code.onlineAnd then passedremoveMethod is removed when executed a second timeremoveIf there are no more elements in the queue, an exception is thrown. As follows:

element

  • elementThe method is to get the header element of the queue, but not to delete it, which is also the same asremoveWe execute when there are no elements in the queueelementMethod will throw an exception as follows:
    private static void groupElement(a) {
        BlockingQueue queue = new ArrayBlockingQueue(2);
        queue.add("i-code.online");
        System.out.println(queue.element());
        System.out.println(queue.element());
    }
    private static void groupElement2(a) {
        BlockingQueue queue = new ArrayBlockingQueue(2);
        System.out.println(queue.element());
    }
Copy the code
  • The above two methods demonstrate the case with and without elements, respectivelyelementThe use of. The first method does not raise an error because the first element is always present. The second method raises an exception because the first element is empty.

No exception types [offer, poll, peek]

offer

  • offerYou do this by adding elements to the queue, reporting success and failure, and returning on failurefalseThe queue will fail to be added when the queue is full.
    private static void groupOffer(a) {
        BlockingQueue queue = new ArrayBlockingQueue(2);
        System.out.println(queue.offer("i-code.online"));
        System.out.println(queue.offer("Cloud habitat code"));
        System.out.println(queue.offer("AnonyStar"));
    }
Copy the code
  • As shown in the code above, we pass through a queue of size 2offerAdd elements, and when you add a third, you get feedbackfalse, the results are as follows:

true
true
false
Copy the code

poll

  • pollThe method corresponds to the aboveremoveMethod, the difference between the two is whether an exception is thrown without an element,pollMethod does not throw an exception when there are no elements but returnsnull, the following code:
    private static void groupPoll(a) {
        BlockingQueue queue = new ArrayBlockingQueue(2);
        System.out.println(queue.offer("Cloud habitat code")); // Add elements
        System.out.println(queue.poll()); // Get the header element and delete it
        System.out.println(queue.poll());

    }
Copy the code
  • In the above code we create a queue of size 2, add an element, and call it twicepollMethod to get and delete the head node found on the second callnullBecause the queue is empty, as follows:
True The cloud Jane code is nullCopy the code

peek

  • peekMethod is the same as the previous oneelementThe method is corresponding, fetching the element head node but not deleting itpeekMethod does not throw an exception under an empty queue, but returns itnullThat is as follows:
    private static void groupPeek(a) {
        BlockingQueue queue = new ArrayBlockingQueue(2);
        System.out.println(queue.offer(1));
        System.out.println(queue.peek());
        System.out.println(queue.peek());
    }
    private static void groupPeek2(a) {
        BlockingQueue queue = new ArrayBlockingQueue(2);
        System.out.println(queue.peek());
    }
Copy the code
  • As shown in the code above, we show a non-empty queue and an empty queue respectivelypeekThe results are as follows:

Blocking types [PUT, take]

put

  • putThe method is to add an element to the queue, and this method blocks, meaning that when the queue is full, thenputElement blocks until there is a vacancy in the queue.

take

  • takeMethod takes the head node from the queue and removes it. This is also a blocking method. When there are no more elements in the queue,takeMethod blocks until a new element enters the queue.

Common blocking queues

ArrayBlockingQueue

  • ArrayBlockingQueueIs a typical one that we useBounded queue, its internal implementation is based on the array to achieve, we need to specify its length when creating, its thread safety byReentrantLockTo achieve.
public ArrayBlockingQueue(int capacity) {... }public ArrayBlockingQueue(int capacity, boolean fair) {... }Copy the code
  • As shown above,ArrayBlockingQueueIn the provided constructor, we need to specify the length of the queue, and we can also set the size of the queue to be fair. Once we set the size of the queue, we cannot change it.FIFO) to sort elements.
  • andReentrantLock Again, ifArrayBlockingQueue Is set to unfair, then there is the possibility of queue jumping; If set to fair, the thread that has waited the longest is processed first and other threads are not allowed to cut the queue. However, such a fair policy also incurs a performance penalty, since the throughput of the unfair case is usually higher than that of the fair case.

LinkedBlockingQueue

  • As we know from its name, it’s a queue implemented by a linked list, and the length of the queue isInteger.MAX_VALUEThis value is so large that it is almost impossible to reach that we can consider this queue to be basically an unbounded queue (also considered a bounded queue). This queue is sorted in first-in, first-out order.

SynchronousQueue

  • synchronousQueueIs a blocking queue that doesn’t store any elements, every oneputOperations must waittakeAction, otherwise elements cannot be added. It also supports fair and unfair locks.
  • synchronousQueueThe capacity of theta is not 1, it’s 0. Because it doesn’t hold any elements itself, it’s passed directly,synchronousQueueElements are passed directly from producer to consumer without the need for storage in the process
  • Thread pools, which we talked about earlierCachedThreadPoolThat’s the queue.Executors.newCachedThreadPool()Because this thread pool its maximum number of threads is zeroInteger.MAX_VALUEAll threads are temporary threads that are reclaimed after 60 seconds of idle time.

PriorityBlockingQueue

  • PriorityBlockingQueue Is an unbounded blocking queue that supports priority sorting and can be customizedcompareTo() Method to specify the collation of elements, or through a constructor argumentComparator To specify collation rules.However, it is important to note that the objects inserted into the queue must be comparably sized, i.eComparable Otherwise, it will throwClassCastException The exception.
  • It’stake Method blocks when the queue is empty, but because it is an unbounded queue and automatically expands, its queue is never full, so itsput Method never blocks, and the add operation always succeeds

DelayQueue

  • DelayQueueIs an implementationPriorityBlockingQueueDelay-acquired unbounded queue. Has the function of “delay”.
  • DelayQueueApplication scenario: 1. Design of cache system: AvailableDelayQueueHolds the expiration date of cached elements, using a thread loop queryDelayQueue, once can fromDelayQueueWhen an element is retrieved from the 2. Schedule scheduled tasks. useDelayQueueSave the tasks that will be executed that day and the execution time once fromDelayQueueGet the task from, for exampleTimerQueueIs the use ofDelayQueueThe implementation.
  • It’s an unbounded queue, and the elements you put in must be implementedDelayed Interface, andDelayed The interface inherits againComparable Interface, so naturally have the ability to compare and sort, code as follows:
public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}
Copy the code
  • It can be seen thatDelayed Interface inheritanceComparableThere is a method that needs to be implemented, which isgetDelay. Here,getDelay Method returns how much delay is left before the task can be executed, with 0 or negative values indicating that the task has expired.
  • Elements are placed at different positions in the queue depending on the length of the delay, with the closer they are to the queue head, the earlier they expire.

This article was published by AnonyStar. It may be reproduced but the original source must be claimed. Welcome to pay attention to wechat public account: cloud habitat Jane code to obtain more quality articles more articles to pay attention to the author’s blog: cloud habitat Jane code i-code.online