General documentation: Article directory Github: github.com/black-ant

Such as the problem, the blocking queue really did not have what to say, the tool class, it is ok to understand the function feeling

A brief description of the blocking queue

BlockingQueue is a queue that supports two additional operations. The two additional operations are:

  • When the queue is empty, the thread that gets the element waits for the queue to become non-empty.
  • When the queue is full, the thread storing the elements waits for the queue to become available.

Blocking queues are commonly used in producer and consumer scenarios:

  • The producer is the thread that adds elements to the queue, and the consumer is the thread that takes elements from the queue
  • A blocking queue is a container where the producer holds elements and the consumer only takes elements from the container.

The general concept of

  • Implements Queue: This is basically a Queue implementation class, which implements a method of Queue
  • Capacity and sorting can be initialized using constructors
  • Constructors can be passed into the entire collection

/ / the queue• ArrayBlockingQueue: A bounded blocking queue consisting of an array structure. • LinkedBlockingQueue: An unbounded blocking queue consisting of a linked list structure. • PriorityBlockingQueue: An unbounded blocking queue that supports priority ordering. • DelayQueue: An unbounded blocking queue implemented using a priority queue. • SynchronousQueue: a blocking queue that does not store elements. • LinkedTransferQueue: An unbounded blocking queue consisting of a linked list structure. • LinkedBlockingDeque: A two-way blocking queue consisting of a linked list structure.// Classify by type• Lockless, non-blocking concurrent queues: ConcurrentLinkedQueue and ConcurrentLinkedDeque Array-based BlockingQueue, LinkedBlockingQueue and LinkedBlockingDeque DelayQueue • Other blocking queues: Add (e) Offer (e) put(e) Offer (e, time, Remove () poll() take() poll(time, Check element() peek() is unavailable > ArrayBlockingQueue and LinkedBlockingQueue// Performance comparison
1ArrayBlockingQueue performs better than LinkedBlockingQueue, but LinkedBlockingQueue is unbounded.2The poll methods of ArrayBlockingQueue, LinkedBlockingQueue, and LinkedBlockingQueue are always faster than the offer methods. The higher the concurrency, the bigger the gap3ArrayBlockingQueue, LinkedBlockingQueue and ArrayBlockingQueue perform much better than PriorityBlockingQueue. Obviously, priority queues are too expensive to compare priorities4The performance of PriorityBlockingQueue is close to that of poll1:1    

Copy the code
The number of threads 20 50 100 200 500 1000
LinkedBlockingQueue 15, 0 31, 32 and 16th Aiaa 63 203,47 563110
ArrayBlockingQueue 15, 0 16, 15 31, 47 16th 125,47 364 of
PriorityBlockingQueue 78,78 172188 360422 813969 3094,2641 6547,5453

2. ArrayBlockingQueue

A bounded blocking queue implemented by an array. The added > ArrayBlockingQueue is bounded and fixed, and its size is determined by the constructor during construction. ArrayBlockingQueue supports an optional fairness policy for sorting the waiting producer and consumer threads - but by default it does not guarantee fair access to threads and can be selected at construction time (fair =)true). - Fairness generally reduces throughput, but reduces variability and avoids "imbalance".// Constructor:
MC- ArrayBlockingQueue(int capacity)
MC- ArrayBlockingQueue(int capacity, boolean fair)

// Abstract classes and interfacesI-blockingqueue <E> -blockingQueue <E> -blockingQueue <E> - Internally use ReentrantLock + Condition to complete concurrent operations in multi-threaded environments/ / variable• Items variable, a fixed-length array that maintains the elements of ArrayBlockingQueue. • takeIndex variable,intArrayBlockingQueue. • putIndex variable,int, ArrayBlockingQueue. • Count variable, number of elements. • The lock variable, ReentrantLock, and ArrayBlockingQueue must be acquired for both exit and entry. Both steps share the same lock. • the notEmpty variable, which is notEmpty, is a column condition. • notFull variable, less than, that is the entry condition./ / teamThe M-add (E E) method: inserts the specified element to the end of this queue (if it is immediately available and does not exceed the capacity of the queue) and returns on successtrueM- Offer (E E) method: Inserts the specified element at the end of this queue (if it is immediately available and does not exceed the capacity of the queue) and returns on successtrue, full returnfalse
M- offer(E e, longTimeout, TimeUnit unit) method: insert the specified element into the end of the queue, if the queue is full, wait for the space available m-enqueue: - Added elements normally, redirected to the head of the queue when reaching the end - total +1- Notifies the blocking thread/ / dequeueThe m-poll () method retrives and removes the head of this queue, or returns if the queue is emptynull. M- poll(longTimeout, TimeUnit unit) method: get and remove the head of this queue and wait for the available elements (if necessary) until the specified wait time. The m-take () method: fetch and remove the head of this queue, waiting until the element becomes available (if necessary). The m-remove (Object O) method: removes a single instance of the specified element (if any) from this queue.// Core summary:M-offer: Locks through ReentrantLock -final ReentrantLock lock = this.lock;
	- lock.lock();
	- finally ->  lock.unlock();

// Key points:
1Once created, the capacity cannot be changed2Trying to put an element into a full queue will cause the operation to block3Attempts to retrieve an element from an empty queue are similarly blocked4Optional fairness policies are supportedCopy the code

3. DelayQueue

An unbounded blocking queue that supports delayed fetching of elements. The elements inside are all "extendable" elements, and the elements at the top of the column are the first to expire - if there are no elements in the queue that are due, you cannot retrieve the elements from the column header, even if there are elements. - That is, elements can be fetched from the queue only during the delay period./ / function:• Cache: Clears the cache data that times out. • The task times out/ / key:
1.ReentrantLock2.Condition object for blocking and advising3.PriorityQueue ordered by Delay time: PriorityQueue4.The thread element leader is used to optimize the blocking notification/ / structure:E- AbstractQueue I- BlockingQueue M- offer() : Add element to PriorityQueue - Insert element to PriorityQueue - determine if the current element is the first element, and if so set leader=nullM- take() - get the queue head -- q.prink IF- get the queue head empty, block, wait for off wake ELSE- get the queue head timeout, expired, then pair - IF there is another thread operation, block, no other thread, Exclusive - Timeout blocking -- available. AwaitNanos (delay); - Wake up the blocking thread// How to use:
// Step 1: new one
DelayQueue queue = new DelayQueue();

// Step 2: Add something
queue.offer(createUserDelayQueueTO());


Copy the code

4. SynchronousQueue will

  1. The SynchronousQueue has no capacity. – Unlike other BlockingQueues, SynchronousQueue is a BlockingQueue that does not store elements. Every PUT operation must wait for a take operation or it cannot continue adding elements, and vice versa.
  2. Peek, contains, Clear, isEmpty… And so on method actually is invalid. – For example, clear does not perform any operations, contains always returns false, and peek always returns null.
  3. SynchronousQueue can be either fair or unfair. By default, the non-fair policy is adopted. It can also be set to fair (true) by the constructor.
  4. With TransferQueue, there will always be a dummy Node in the queue (more on this later).

SynchronousQueue is ideal for interchange, where the producer thread and the consumer thread synchronize to pass certain information, events, or tasks.

C- SynchronousQueue E- AbstractQueue I- BlockingQueue C- TransferQueue ? - The core class that implements fairness policies. Its node is QNodeCopy the code

5. LinkedBlockingDeque

  • A two-way blocking queue consisting of a linked list that supports the insertion and removal of elements from both ends compared to the previous blocking queue.
    • Those ending in first indicate that they operate from the head, and those ending in last indicate that they operate from the tail.
  • Supports FIFO and FILO operation modes

LinkedBlockingQueue is a blocking queue

  • Internally, two reentrantLocks implement thread-safe access to the queue and await and signal from the respective Condition objects.
    • An arbitrary (actually bounded) FIFO blocking queue based on a unidirectional linked list.
    • The head and tail nodes always start with a sentinel node, which does not hold actual data
      • When there is data in the queue, the head node still points to the sentinel, and the tail node points to the last node with valid data.
      • The advantage of this is that, when combined with the count counter, access to the head and tail of the queue can be done independently without the need to determine the relationship between the head and tail nodes.


/ / profileFIFO is a first-in, first-out queue. ReentrantLock is used to ensure thread safety// Operation resultAdd: queue full > put -> always block add -> throw exception offer -> returnfalseRemove -> NoSuchElementException poll -> ReturnsfalseTake - > block// Source code analysis
LinkedBlockingQueue
    C- static class Node<E> : core, a static inner class | - represents a nodeE itemOriginal | - : nodeNode<E> next: Next nodeF- int capacity: Capacity limitF- AtomicInteger count: Indicates the number of current elementsF- Node<E> head: the head nodeF- Node<E> lastTail: nodeF- ReentrantLock takeLock : take.pollSuch as acquiring a lockF- Condition notEmpty: Queue of waiting tasksF- ReentrantLock putLock : put.offerSuch as in the lockF- Condition notFull: Waiting queue for insertionMC- LinkedBlockingQueue() : maximum quantityMC- LinkedBlockingQueue(int capacity) : Specifies the quantityMC- LinkedBlockingQueue(Collection<? extends E> c) : Specifies the collectionM- signalNotEmpty: indicates waitingtake.put/offerOtherwise it is not normally lockedtakeLock| - gettackLock : this.takeLock| - locktakeLock ->  takeLock.lock(a); | - wake up take a thread waiting queue - > notEmpty. The signal (); | - releases the lock - > takeLock. Unlock (); M - signalNotFull: said wait for put, take/poll called | - get putLock:this.putLock; | - lock putLock - > putLock. The lock (); | - awakens the insert thread waiting queue - > notFull. The signal (); | - releases the lock M - the enqueue: insert at the end of the queue | - last = last. Next = node; M-E dequeue(a): remove queue head | - keep the head pointer | - for current list for the first element of the pointer to the first element | | - head - to obtain the value of the first element and removing the first | - returns the value of the first element of M - fullyLock: Locking putLock and takeLock | - putLock.lock(a); |- takeLock.lock(); M - fullyUnlock: unlock takeLock first, and then unlock putLock | - putLock. Unlock (); M-offer: Sets the given element to the queue and returns if the setting is successfultrue| - judging is not empty, access to counter | - whether the queue is full - > return Boolean | - | - access to new node is inserted, And lock | - queue counts under - > insert - > | - if not full, continue to awaken the inserting thread | - unlock | - if the right is empty, the thread lock block M - offer (E, E,longA timeout, TimeUnit unit) : within a given time is set to the queue in the M - put (E, E) : set the element to the queue, if there is no extra space in the queue, this method will always be blocked, until the queue to have extra space | - core1: putLock.lockInterruptibly(); - > Settings before locking | - core2: notFull.await(); -> wait when the queue is full m-take () : gets the value from the queue, if there is no value in the queue m-peek () : non-blocking gets the first element in the queue, and does not leave the queue m-poll () : non-blocking gets the value from the queue, and does not get the value backnull. M- poll(longTimeout, TimeUnit Unit): obtains the value from the queue in the given time. M-remove (Object O): removes the specified value from the queue. Lock both locks. M-clear (): clears the queue. M-drainto (Collection C): Removes all values from the queue and concurrently sets them to the given Collection.Copy the code

6. LinkedTransferQueue

  • LinkedTransferQueue is an unbounded blocking queue made up of linked lists
  • It is a superset of ConcurrentLinkedQueue, SynchronousQueue (in fair mode), unbounded LinkedBlockingQueues, and so on.

In addition to the other BlockingQueues, he implements an interface called TransferQueue, which comples BlockingQueue by adding tryTranfer() and transfer() methods:

  • Tranfer () : If there is a pending consumer thread, hand it over immediately.
    • Otherwise, the current element E is inserted to the end of the queue and waits until the element is blocked until a consumer thread picks it up
  • TryTranfer () : If there is currently a consumer thread waiting to be acquired (using the take() or poll() functions), use this method to immediately transfer/transfer the object element e;
    • If not, false is returned and no queue is entered. This is a non-blocking operation

Seven. PriorityBlockingQueue

- PriorityBlockingQueue is an unbounded queue that supports priority. - By default, the elements are sorted in natural order. You can also use a custom Comparator to specify the sort order. - PriorityBlockingQueue internal binary heap implementation, the whole process is not particularly complex. - Add operation is constantly "up", and delete operation is constantly "down".Copy the code

ArrayBlockingQueue and LinkedBlockingQueue

Queue Blocking or not If there is a world Thread safety Applicable scenario Matters needing attention
ArrayBlockingQueue blocking bounded A global lock Production – consumption models balance both sides of the processing speed The storage space used to store the queue elements is pre-allocated and has a low memory overhead (no need to dynamically apply for storage space)
LinkedBlockingQueue blocking configurable Access using two locks Production – consumption models balance both sides of the processing speed Be aware of the memory overflow problem when unbounded. The storage space used to store queue elements is dynamically allocated as they are used, so it can add to the JVM’s garbage collection burden.

9. Dual-end queue

ArrayDeque and LinkedBlockingDeque are double-endian queues with class names ending in Deque

Just as blocking queues work with the producer-consumer pattern, dual-ended queues work with another pattern, working cipher.

  • In a producer-consumer design, all consumers share a work queue, while in a work cipher, each consumer has its own two-end queue.
  • If a consumer completes all the work in its own two-endian queue, it can secretly fetch work from the end of another consumer’s two-endian queue.
    • Better scalability because worker threads do not compete on a single shared task queue.
  • Most of the time, they only access their own two-enqueue, which greatly reduces competition.
  • When a worker thread needs to access another queue, it gets work from the end of the queue rather than the head, thus further reducing contention on the queue.

10. Queue objects

> Block queue: Blocking queues have normal fifO queues, > based on array including ArrayBlockingQueue > based on chain table LinkedBlockingQueue/LinkedBlockingDeque > heap-based priority blocking queue PriorityBlockingQueue > DelayQueue > Blocking queues for special scenarios SynchronousQueue and LinkedTransferQueueCopy the code

11. The CopyOnWriteArrayList

The CopyOnWrite container is a container that is copied on write.

When we add elements to a container, we Copy the current container to create a new container, and then add elements to the new container. After adding elements, we reference the original container to the new container.

Advantages and disadvantages of | - advantage: read operation performance is very high, more applicable to read more writing less concurrency scenarios. Java list in traversed, if midway have other threads to modify the list container, will throw ConcurrentModificationException. And CopyOnWriteArrayList because of that"Read-write separation"The idea that traversal and modification operations operate separately on different list containers? - so when using iterators traversal, also won't throw ConcurrentModificationException. | - faults: memory usage problems, perform the write operation happens when the array copy there is no guarantee that real-time, lock the Vector to read and write operations are synchronized, can ensure that the strong consistency of read and write. CopyOnWriteArrayList, because of its implementation policy, writes and reads on different containers, right? - During the write operation, the read is not blocked but the data is read from the old container. | - usage scenarios: CopyOnWrite concurrent container used to read more writing less complicated scenario. Such as whitelist, blacklist, commodity access and update scenarios. CopyOnWriteArrayList F- ReentrantLock lock =newReentrantLock() --> ReentrantLock F-volatileObject[] array; [] getArray() --> getArray --> getArray --> getArray Non-private method for accessing the CopyOnWriteArraySet class m-setarray (Object[] a) --> setArray m-copyonwritearraylist -- create an empty array M- CopyOnWriteArrayList(Collection<? extends E> c) ? - create a collection contains the specified array B - if the c class type for CopyOnWriteArrayList | - direct access to the array E - if not | | - - through the toArray array if c.t oArray return is not the Object [] type, Through the array copy | - set the array: setArray (elements); M-copyonwritearraylist (E[] toCopyIn) : create a list containing copies of the given array m-size () : get the number of copies m-isEmpty () : M-eq (Object O1, Object O2) : determine whether o1 O2 is equal m-indexof (Object o, Object[] elements,int index, intB - for a fence)null , forThe loop iterates over the first onenullE - not fornull ,forCycle eq M - lastIndexOf: index flashbacks M - the contains: IndexOf judgment M - clone: shallow copy | | - reset the lock - M - toArray clone property is returned by the M - get: From the original array element M - set: replace the list with the specified elements specify the location of the element | - gets the current lock and lock | | - get element array - retrieve the old value B - if the old value is equal to the given value is not | - original copy an array, Amend the new index is used in the position to a new value | - will replace the original array is a new array E - otherwise | - setArray (elements); | - returns the old value M - the add (E, E) : Will be appended to the specified element at the end of this list | - acquire reentrant lock, lock | | - access to the original array - copy of the original array and increase a space | - specifies elements added to the new array of new space | - M - remove new array to replace the original array: | - acquiring a lock and lock | - | - get access to the original array element value to be deleted, to obtain the value of B - if it is to move0SetArray (Arrays.copyof (elements, len -)1)); E - or duplicate copy | - | - the original array, new array index index before all of the data, copy to the new group | - the element in the array, the index index +1After numMoved element, copied to the new array, index index after | | - to replace the original array - returns the old value, finally releases the lock C - COWSubList: internal view classCopy the code

Thank you

Taro source code: HTTP://www.iocoder.cn/JUC/sike/aqs-3/

https://mp.weixin.qq.com/s?__biz=MzIxOTI1NTk5Nw==&mid=2650047475&idx=1&sn=4349ee6ac5e882c536238ed1237e5ba2&chksm=8fde2621b8a9a f37df7d0c0a7ef3178d0253abf1e76a682134fce2f2218c93337c7de57835b7&scene=21#wechat_redirect

https://blog.csdn.net/javazejian/article/details/70768369Deadknock series, my multi-threaded tutor HTTP://cmsblogs.com/?cat=151

// JVM source code C
https://www.jianshu.com/p/a604f1a9f875 

Copy the code