This article is organized as follows: it covers synchronized container classes, concurrency tools at the operating-system level, and the concurrency utilities in the JDK (just a brief introduction here; source-code analysis will follow in later articles). So, what are the synchronization utility classes?

Let’s take a look at the modules the Java concurrency library provides.

Synchronized container classes

Synchronized containers come in two kinds. The first kind is containers that are thread-safe out of the box, such as Vector, Hashtable, and Stack: every method of these containers is declared synchronized, which is how their thread safety is achieved.

Vector, Hashtable, Stack, and the like are rarely used today because they perform poorly in multi-threaded environments.

The second kind is produced by the Collections.synchronizedXxx factory methods, which wrap containers that are not thread-safe into thread-safe ones. Two examples:

  • Collections.synchronizedList

  • Collections.synchronizedMap

You can see these thread-safe wrappers in the Collections source code.

This is also why Collections is called a collection utility class: the wrappers encapsulate the state of the underlying container and synchronize every public method, so that only one thread at a time can access the container's state.

Each synchronizedXxx factory method returns an instance of a corresponding static inner wrapper class.

While synchronized container classes are thread-safe, in some cases additional client-side locking is required to protect compound operations that consist of two or more method calls. A typical example is put-if-absent, shown here in pseudocode:

if(a == null){
  a = get();
}

For example, a compound operation might check whether a key exists in a Map and, if not, add it. Such compound operations will not corrupt the container even without client-side locking, but they can behave unexpectedly when multiple threads modify the container concurrently. Take the following code as an example:

public class TestVector implements Runnable {

    static Vector vector = new Vector();

    static void addVector() {
        for (int i = 0; i < 10000; i++) {
            vector.add(i);
        }
    }

    static Object getVector() {
        int index = vector.size() - 1;
        return vector.get(index);
    }

    static void removeVector() {
        int index = vector.size() - 1;
        vector.remove(index);
    }

    @Override
    public void run() {
        getVector();
    }

    public static void main(String[] args) {
        TestVector testVector = new TestVector();
        testVector.addVector();
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < vector.size(); i++) {
                getVector();
            }
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < vector.size(); i++) {
                removeVector();
            }
        });
        t1.start();
        t2.start();
    }
}

These methods seem fine, because Vector is thread-safe: no matter how many threads access it, its internal state cannot be corrupted. But the program as a whole is not thread-safe.

A problem like the following can occur.

Suppose thread A calls getVector while the Vector holds some number of elements; getVector only reads the last element, it does not remove it from the Vector. removeVector, on the other hand, reads the last element and removes it. Because thread scheduling is unpredictable, and getVector and removeVector are not mutually exclusive, removeVector may shrink the Vector between the moment getVector computes its index and the moment it calls get. If removeVector deletes the element at index 6666 and getVector then asks for index 6666, an out-of-bounds exception is thrown. Why an array out-of-bounds exception? Take a look at the Vector source code.
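For reference, Vector.get looks essentially like this (abridged from the JDK source, Java 8; details vary by version). The synchronized on get protects the Vector itself, but it cannot protect the stale index that our getVector computed earlier:

public synchronized E get(int index) {
    if (index >= elementCount)                          // the stale index fails this check
        throw new ArrayIndexOutOfBoundsException(index);

    return elementData(index);
}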


So, at the level of the whole program, the code above also needs to be made thread-safe, that is, locked on the client side. As long as the compound operations share a single lock, they become as atomic as the individual operations. See the following example:

static Object getVector() {
    synchronized (vector) {
        int index = vector.size() - 1;
        return vector.get(index);
    }
}

static void removeVector() {
    synchronized (vector) {
        int index = vector.size() - 1;
        vector.remove(index);
    }
}

The same effect can be achieved by locking on the Class object to make the compound operations atomic:

static Object getVector() {
    synchronized (TestVector.class) {
        int index = vector.size() - 1;
        return vector.get(index);
    }
}

static void removeVector() {
    synchronized (TestVector.class) {
        int index = vector.size() - 1;
        vector.remove(index);
    }
}

Between the calls to size and get, the length of the Vector may change. The same hazard occurs when traversing the Vector, as shown below:

for (int i = 0; i < vector.size(); i++) {
    doSomething(vector.get(i));
}

The correctness of this iteration depends on luck: it assumes the Vector is not modified between the calls to size and get. In a single-threaded environment the assumption holds, but it causes trouble when other threads modify the Vector concurrently.

We can again avoid this problem by locking on the client side:

synchronized (vector) {
    for (int i = 0; i < vector.size(); i++) {
        doSomething(vector.get(i));
    }
}

This approach guarantees correctness, but it compromises scalability: holding the lock for the whole traversal is undesirable.

fail-fast

Many collection classes provide a fail-fast mechanism. Holding a synchronization lock for an entire iteration loop is expensive, while creating an Iterator is lightweight, so these collections fail fast instead: when they detect during iteration that the container has been modified, they throw a ConcurrentModificationException. This fail-fast behavior is not a complete error-handling mechanism, only a good-faith effort to catch concurrency bugs.

If you look at the Javadoc of ConcurrentModificationException, you will find that it describes two scenarios in which the exception is thrown, as follows.

First, the exception is thrown when one or more threads modify the internals of a collection while another thread is traversing it; this is the fail-fast mechanism.

The Javadoc also describes another scenario.

This is a classic problem; let's use ArrayList as an example. See the code below:

public static void main(String[] args) {
    List<String> list = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        list.add(i + "");
    }
    Iterator<String> iterator = list.iterator();
    int i = 0;
    while (iterator.hasNext()) {
        if (i == 3) {
            list.remove(3);
        }
        System.out.println(iterator.next());
        i++;
    }
}

This code throws an exception because ArrayList maintains two counters internally, modCount and expectedModCount. Operations that change the structure of the collection, such as remove, increment modCount, and the iterator compares the two counters in checkForComodification, as shown below. This is why the code above fails.
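For reference, the check inside ArrayList's iterator is essentially the following (abridged from the JDK source; details vary by version):

// Called at the start of Itr.next(); list.remove(3) above bumped modCount,
// so the two counters no longer match and the iterator fails fast.
final void checkForComodification() {
    if (modCount != expectedModCount)
        throw new ConcurrentModificationException();
}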

fail-safe

Fail-safe is a safe-failure mechanism in Java. With fail-safe collections, traversal does not run directly over the original collection; instead, the original contents are copied and iteration runs over the copy. Because the iterator traverses a copy, modifications made to the original collection during traversal are invisible to the iterator, so no ConcurrentModificationException is triggered. The containers under the java.util.concurrent package are fail-safe and can be used, and modified concurrently, by multiple threads.

CopyOnWriteArrayList, for example, is a fail-safe collection and does not throw the exception, as in the following code:

List<Integer> integers = new CopyOnWriteArrayList<>();
integers.add(1);
integers.add(2);
integers.add(3);
Iterator<Integer> itr = integers.iterator();
while (itr.hasNext()) {
    Integer a = itr.next();
    integers.remove(a);
}

CopyOnWriteArrayList is a thread-safe variant of ArrayList. All mutative operations in CopyOnWriteArrayList, such as add and set, are implemented by making a fresh copy of the underlying array.

Concurrency tools in operating systems

Having talked about concurrent containers, we should also look at how concurrency between processes and threads is supported at the operating-system level; much of it comes down to data-structure design. Let's look at the concurrency tools the operating system provides.

Semaphores

The semaphore is a mechanism proposed by E. W. Dijkstra in 1965 that uses an integer variable to accumulate the number of pending wakeups for later use. In his proposal there is a new type of variable called a semaphore. The value of a semaphore can be 0 or any positive number: zero means no wakeups are pending, and a positive number is the count of saved wakeups.

Dijkstra proposed two operations on a semaphore, now usually called down and up (generalizations of sleep and wakeup, respectively). The down operation checks whether the value is greater than 0; if it is, it decrements the value by 1 and continues. If the value is 0, the process is put to sleep, with the down operation still unfinished for the moment. Checking the value, changing it, and possibly going to sleep are all performed as a single, indivisible atomic action.
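The semantics of down and up can be sketched in Java (an illustration of the concept only, not how an operating system actually implements it):

// A minimal counting-semaphore sketch, for illustration only.
class SimpleSemaphore {

    private int value; // the number of saved wakeups

    SimpleSemaphore(int initial) {
        value = initial;
    }

    // down: sleep while the value is 0, then decrement atomically
    synchronized void down() throws InterruptedException {
        while (value == 0) {
            wait();
        }
        value--;
    }

    // up: increment the value and wake one sleeper
    synchronized void up() {
        value++;
        notify();
    }
}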

Mutexes

When you don’t need the counting ability of a semaphore, you can use a simplified version of it called a mutex. A mutex is good only for managing mutual exclusion over a shared resource or a piece of code. Because mutexes are simple and efficient to implement, they are especially useful for user-space thread packages.

A mutex is a shared variable that is in one of two states: unlocked or locked. Consequently, only one bit is needed to represent it, though in practice an integer is often used: 0 means unlocked and any other value means locked.

Two procedures are used with a mutex. When a thread (or process) needs to enter a critical region, it calls mutex_lock. If the mutex is currently unlocked (meaning the critical region is available), the call succeeds and the calling thread is free to enter the critical region.

On the other hand, if the mutex is already locked, the calling thread blocks until the thread inside the critical region finishes and calls mutex_unlock. If multiple threads are blocked on the mutex, one of them is chosen at random and allowed to acquire the lock.
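For illustration, here is a user-space spin version of lock/unlock in Java, where AtomicBoolean.getAndSet plays the role of an atomic test-and-set (TSL) instruction; note that a real mutex blocks the caller instead of spinning:

import java.util.concurrent.atomic.AtomicBoolean;

// A minimal spin-mutex sketch; real mutexes block rather than busy-wait.
class SpinMutex {

    private final AtomicBoolean locked = new AtomicBoolean(false);

    void lock() {
        // getAndSet is the atomic test-and-set: it returns the old value
        while (locked.getAndSet(true)) {
            Thread.onSpinWait(); // hint that we are busy-waiting (Java 9+)
        }
    }

    void unlock() {
        locked.set(false);
    }
}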

Futexes

As parallelism increases, effective synchronization and locking become important for performance. Spin locks are very effective when waits are short, but they waste CPU cycles when waits are long. If contention is heavy, it is more efficient to block the process and have the kernel wake it only when the lock is released. Unfortunately, this has the opposite problem: it works well under heavy contention, but the cost of switching into the kernel is high when contention is light, and the amount of lock contention is hard to predict.

An interesting solution combines the best of both worlds: the futex, or fast user-space mutex. Sounds interesting, doesn’t it?

The futex is a Linux feature that implements basic locking (much like a mutex) while avoiding a switch into the kernel, which is expensive; staying in user space can greatly improve performance. A futex consists of two parts: a kernel service and a user library. The kernel service provides a wait queue that allows multiple processes to wait on a lock; they will not run unless the kernel explicitly unblocks them.

Mutexes in Pthreads

Pthreads provides a number of functions for synchronizing threads. The most basic mechanism is the mutex variable, which can be locked and unlocked to guard each critical region. A thread that wants to enter a critical region first tries to lock the mutex. If the mutex is unlocked, the thread enters immediately and the mutex is atomically locked, preventing other threads from entering. If the mutex is already locked, the calling thread blocks until the mutex is unlocked. If multiple threads are waiting on the same mutex, only one of them is allowed to continue and relock it when it is unlocked. These locks are not mandatory; it is up to the programmer to use them correctly.

The mutex-related function calls are as follows.

As expected, mutexes can be created and destroyed with pthread_mutex_init and pthread_mutex_destroy. A mutex is locked with pthread_mutex_lock, which blocks the caller if the mutex is already locked. There is also pthread_mutex_trylock, which attempts to lock the mutex and, if it is already locked, returns an error code instead of blocking the caller; this call lets a thread busy-wait efficiently. Finally, pthread_mutex_unlock unlocks the mutex and releases one waiting thread.

In addition to mutexes, Pthreads offers a second synchronization mechanism: condition variables. Mutexes do a good job of allowing or blocking access to a critical region; condition variables allow a thread to block because some condition has not been met. The two mechanisms are almost always used together. Let's look more closely at the interplay between threads, mutexes, and condition variables.

Let’s revisit the producer/consumer problem: one thread puts items into a buffer, and another takes them out. If the producer finds there is no free slot in the buffer, it must block until one becomes available. The producer uses the mutex to perform its check atomically, free from interference by other threads. But having found the buffer full, the producer needs a way to block itself and be woken later. That is exactly what condition variables provide.
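A rough Java analog of this pattern (a sketch, not pthread code): ReentrantLock stands in for the pthread mutex, and Condition objects stand in for the condition variables.

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Bounded-buffer sketch: lock ~ pthread_mutex, notFull/notEmpty ~ pthread_cond.
class BoundedBuffer<T> {

    private final Queue<T> items = new ArrayDeque<>();
    private final int capacity;
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    void put(T item) throws InterruptedException {
        lock.lock();                       // ~ pthread_mutex_lock
        try {
            while (items.size() == capacity) {
                notFull.await();           // ~ pthread_cond_wait (releases the lock while waiting)
            }
            items.add(item);
            notEmpty.signal();             // ~ pthread_cond_signal
        } finally {
            lock.unlock();                 // ~ pthread_mutex_unlock
        }
    }

    T take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            T item = items.remove();
            notFull.signal();
            return item;
        } finally {
            lock.unlock();
        }
    }
}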

The most important pthread calls related to condition variables are described below.

There are calls to create and destroy condition variables. The key operations are pthread_cond_wait and pthread_cond_signal: the former blocks the calling thread until another thread signals it (using the latter call). A blocked thread is usually waiting for some resource to be released or for some other activity to finish; only then can it continue. Condition variables make the wait-and-block sequence atomic. pthread_cond_broadcast wakes up all the blocked threads waiting on the condition.

Note that, unlike semaphores, condition variables have no memory. If a signal is sent to a condition variable on which no thread is waiting, the signal is lost.

Monitors

To make it easier to write correct programs, Brinch Hansen and Hoare proposed a higher-level synchronization primitive called the monitor. One of the most important properties of a monitor is that only one process can be active inside it at any moment, which makes mutual exclusion trivial to achieve. Monitors are a programming-language construct, so the compiler knows they are special and can handle calls to monitor procedures differently from other procedure calls. Typically, when a process calls a monitor procedure, the first few instructions of the procedure check whether another process is currently active inside the monitor. If so, the calling process is suspended until the other process leaves the monitor; the caller can enter only when no other process is using it.

Mutual exclusion on entering a monitor is the compiler's responsibility, and a common implementation is a mutex or a binary semaphore. Because the compiler, not the programmer, arranges this, the chance of error is much lower. The programmer writing a monitor never needs to care how the compiler handles it; it is enough to turn all critical sections into monitor procedures, and no two processes will ever execute code in their critical sections at the same time.

Even though monitors give us an easy way to implement mutual exclusion, that is not enough: we also need a way for processes to block when they cannot proceed. In the producer-consumer problem it is easy to put the tests for a full or empty buffer inside monitor procedures, but how does the producer block when it finds the buffer full?

The solution is to introduce condition variables along with the operations wait and signal. When a monitor procedure discovers that it cannot continue (for example, the producer finds the buffer full), it performs a wait on some condition variable, such as full. This blocks the calling process and also admits another process that had been waiting outside the monitor. We discussed the implementation details of condition variables in the pthread section above. Another process, for instance the consumer, can wake its blocked peer by performing a signal on the condition variable.
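Java's synchronized methods together with wait/notifyAll are essentially a built-in monitor, so the idea can be sketched directly (a minimal bounded-buffer sketch, not the authors' original code):

// Each synchronized method runs with the monitor held; wait/notifyAll play
// the roles of the monitor's condition-variable wait and signal.
class MonitorBuffer {

    private final int[] buffer = new int[16];
    private int count;

    synchronized void insert(int value) throws InterruptedException {
        while (count == buffer.length) {
            wait();            // buffer full: block inside the monitor
        }
        buffer[count++] = value;
        notifyAll();           // wake any consumer waiting for items
    }

    synchronized int remove() throws InterruptedException {
        while (count == 0) {
            wait();            // buffer empty: block inside the monitor
        }
        int value = buffer[--count];
        notifyAll();           // wake any producer waiting for space
        return value;
    }
}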

With automatic mutual exclusion over critical sections, monitors make correct parallel programming far easier than semaphores do. But monitors have drawbacks too. As mentioned earlier, a monitor is a language construct: the compiler must recognize it and arrange for mutual exclusion somehow. C, Pascal, and most other languages have no monitors, so you cannot rely on their compilers to enforce the mutual-exclusion rule.

Another problem, shared by monitors and semaphores, is that these mechanisms were designed to solve mutual exclusion among one or more CPUs that access shared memory. Placing the semaphores in shared memory and protecting them with TSL or XCHG instructions avoids races. But in a distributed system, with multiple CPUs each having its own private memory and connected by a network, these primitives no longer apply. Semaphores are too low-level, and monitors are unavailable outside a few programming languages, so another method is needed.

Message passing

That other method is message passing. This form of interprocess communication uses two primitives, send and receive, which, like semaphores and unlike monitors, are system calls rather than language constructs. They look like this:

send(destination, &message);

receive(source, &message);

send sends a message to a given destination, and receive receives a message from a given source. If no message is available, the receiver can block until one arrives, or return immediately with an error code.
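As a loose Java analog (my own illustration, not an operating-system API), a BlockingQueue can serve as the mailbox between a sending and a receiving thread:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// A mailbox sketch: send ~ put, receive ~ take.
class Mailbox {

    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(64);

    void send(String message) throws InterruptedException {
        queue.put(message);   // blocks if the mailbox is full
    }

    String receive() throws InterruptedException {
        return queue.take();  // blocks until a message arrives
    }
}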

Message-passing systems face many problems and design difficulties that do not arise with semaphores and monitors, especially when the communicating parties are on different machines on a network. For example, messages can be lost by the network. To guard against loss, sender and receiver can agree that the receiver sends back a special acknowledgement message as soon as a message arrives. If the sender does not receive the acknowledgement within a certain interval, it retransmits the message.

Now consider what happens when the message itself is received correctly, but the acknowledgement returned to the sender is lost. The sender retransmits, and the receiver gets the same message twice.

It is important for the receiver to distinguish a new message from a retransmission of an old one. This is usually solved by embedding a consecutive sequence number in each original message. If the receiver gets a message carrying the same sequence number as a previous message, it knows the message is a duplicate and can ignore it.

A message-passing system must also deal with naming processes, so that the process referred to in a send or receive call is unambiguous. Authentication is a problem too: for example, how does a client know it is communicating with the real file server, and messages between sender and receiver can be tampered with by a man in the middle.

Barriers

Our last synchronization mechanism is intended for groups of processes rather than the two-process producer-consumer situations. Some applications are divided into phases, with the rule that no process may enter the next phase until every process is ready to do so; this can be enforced by installing a barrier at the end of each phase. When a process reaches the barrier, it is blocked there until all processes have reached it. A barrier can thus be used to synchronize a group of processes, as the figure below shows.

In the figure we see four processes approaching the barrier; each is still computing and has not yet reached the end of the phase. After some time, processes A, B, and D reach the barrier and are suspended, but they cannot proceed to the next phase because process C has not finished executing. When C finally reaches the barrier, the whole group can move on to the next phase.

Avoid locking: read-copy-update

The fastest lock is no lock at all. The question is whether we can permit concurrent reads and writes of a shared data structure without locking. In general we cannot. Suppose process A is sorting an array of numbers while process B is computing their average; because A moves elements around, B may read some values several times and others not at all, so its result can be wrong.

In some cases, however, we can allow a writer to update a data structure even while other processes are still using it. The trick is to ensure that every reader sees either the old version or the new one, but never some inconsistent mixture. Consider the tree below.

In this tree, read operations traverse from the root to the leaves. To add a node X safely, we make it "exactly right" before it becomes visible in the tree: we initialize all the fields of X, including its child pointers, and only then attach X as a child of A with a single atomic write. No read operation will ever see an inconsistent version.

Next we remove nodes B and D. First, the left-child pointer of A is redirected to C. Reads that start from A after this point will reach node C and never see B or D; that is, they read only the new version of the data. Reads currently inside B or D continue to follow the original pointers and read the old version. Either way everything works correctly, and nothing was locked. The reason B and D can be removed without locking is read-copy-update (RCU), which separates the removal phase from the reclamation phase of an update.
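A very loose Java illustration of the "publish a complete new version with one atomic write" idea (my own sketch using copy-on-write with AtomicReference; real RCU additionally defers freeing the old version until all readers are done, which this sketch does not show):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Readers take a snapshot without locking; writers publish a fresh copy atomically.
class VersionedList {

    private final AtomicReference<List<Integer>> ref =
            new AtomicReference<>(Collections.emptyList());

    List<Integer> read() {
        return ref.get(); // either the old version or the new one, never a mixture
    }

    void add(int value) {
        while (true) {
            List<Integer> current = ref.get();
            List<Integer> copy = new ArrayList<>(current);
            copy.add(value);
            if (ref.compareAndSet(current, copy)) {
                return; // the new version is published with one atomic write
            }
        }
    }
}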

Java Concurrency Toolkit

JDK 1.5 added a number of concurrent containers to improve on the performance of the synchronized containers, which achieve thread safety by serializing all access to container state. That approach severely reduces concurrency and throughput when multiple threads compete for the container lock.

Let’s take a look at some of the concurrency tools used in Java

Overview of Java concurrency tools

ConcurrentHashMap was added in Java 5.0 to replace the synchronized hash-based Map containers, and CopyOnWriteArrayList and CopyOnWriteArraySet were added as replacements for the synchronized List and Set implementations. Two new container types also appeared: Queue and BlockingQueue. Queue has implementations such as the traditional FIFO ConcurrentLinkedQueue and the (non-concurrent) priority-ordered PriorityQueue. A Queue is a first-in, first-out collection whose operations do not block: if the queue is empty, fetching an element simply returns null. BlockingQueue extends Queue with blocking insert and fetch operations: if the queue is empty, fetching an element blocks until one becomes available; if the queue is full, inserting blocks until space frees up.

Java 6.0 introduced ConcurrentSkipListMap and ConcurrentSkipListSet as concurrent replacements for synchronized SortedMap and SortedSet, respectively. We will explore these without diving into the underlying source code, since the main purpose of this article is to describe what is available and what it is used for.

ConcurrentHashMap

Let’s first look at where ConcurrentHashMap sits in the concurrent collections hierarchy.

ConcurrentHashMap extends AbstractMap and implements ConcurrentMap and Serializable. AbstractMap is an abstract skeletal implementation of Map, while ConcurrentMap is a subinterface of Map.

ConcurrentHashMap is structured much like Hashtable, but the Hashtable container is inefficient under heavy contention because every thread accessing it competes for the same single lock. If the container instead holds multiple locks, each guarding only one segment of the data, threads accessing different segments do not contend with one another. This is the segmented-locking (lock-striping) scheme adopted by ConcurrentHashMap. Under this scheme, any number of reader threads can access the Map concurrently, readers and writers can access the Map concurrently, and the Map can be modified concurrently while being read.

The result of ConcurrentHashMap's segmented locking is much higher throughput in concurrent environments, with very little performance loss in single-threaded use.

Recall that HashMap is fail-fast: it is a strongly consistent collection that throws ConcurrentModificationException when it detects concurrent modification. ConcurrentHashMap, by contrast, is weakly consistent: it does not throw ConcurrentModificationException when its internals are modified concurrently, because weak consistency tolerates concurrent modification.

In HashMap, commonly used methods such as size, isEmpty, and containsKey return exact results (contained means contained, absent means absent) and can safely be used as conditions. In ConcurrentHashMap the corresponding methods return only estimates, not exact values. Methods such as size and isEmpty are of little use in concurrent scenarios because their return values are constantly changing, so the need for these operations is weakened.

ConcurrentHashMap does not support locking the whole Map for exclusive access. With the thread-safe Map implementations Hashtable and Collections.synchronizedMap, holding the Map's lock gives exclusive access, so only a single thread at a time can modify the Map. Compared with those containers, ConcurrentHashMap has many more advantages and fewer drawbacks; only when you genuinely need exclusive locking should you use Hashtable or Collections.synchronizedMap. In all other concurrent scenarios, ConcurrentHashMap should be preferred.

ConcurrentMap

ConcurrentMap is an interface that extends the Map interface and adds four new methods, all of which are atomic, further extending Map's functionality.

public interface ConcurrentMap<K, V> extends Map<K, V> {

    // Inserts the value only if the key has no current mapping.
    V putIfAbsent(K key, V value);

    // Removes the entry only if the key is currently mapped to the given value.
    boolean remove(Object key, Object value);

    // Replaces the value only if the key currently has some mapping.
    V replace(K key, V value);

    // Replaces the value with newValue only if the key is currently mapped to oldValue.
    boolean replace(K key, V oldValue, V newValue);
}
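A quick usage sketch of these atomic methods (the keys and values are made up for illustration):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ConcurrentMapDemo {
    public static void main(String[] args) {
        ConcurrentMap<String, Integer> scores = new ConcurrentHashMap<>();

        scores.putIfAbsent("alice", 1);              // inserted: no mapping yet
        scores.putIfAbsent("alice", 99);             // no-op: mapping already exists

        scores.replace("alice", 1, 2);               // succeeds: current value is 1
        boolean removed = scores.remove("alice", 7); // fails: value is 2, not 7

        System.out.println(scores.get("alice"));     // prints 2
        System.out.println(removed);                 // prints false
    }
}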

ConcurrentNavigableMap

java.util.concurrent.ConcurrentNavigableMap is a subinterface of java.util.NavigableMap; it supports concurrent access and allows its views to be accessed concurrently.

What is a view? A view is a slice of the data in a collection. ConcurrentNavigableMap supports the views returned by headMap, subMap, and tailMap. Rather than re-explaining all of the methods inherited from NavigableMap, let's look at the methods ConcurrentNavigableMap works with; a short sketch follows the list.

  • headMap: returns a view of the portion of the map whose keys are strictly less than the given key.

  • tailMap: returns a view containing the entries whose keys are greater than or equal to the given key.

  • subMap: returns a view of the range between the two given keys (inclusive lower bound, exclusive upper bound).
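A small sketch of the three views (contents invented for illustration):

import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class NavigableViewsDemo {
    public static void main(String[] args) {
        ConcurrentNavigableMap<Integer, String> map = new ConcurrentSkipListMap<>();
        map.put(1, "one");
        map.put(3, "three");
        map.put(5, "five");

        System.out.println(map.headMap(3));   // {1=one}            keys < 3
        System.out.println(map.tailMap(3));   // {3=three, 5=five}  keys >= 3
        System.out.println(map.subMap(1, 5)); // {1=one, 3=three}   1 <= keys < 5
    }
}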

The ConcurrentNavigableMap interface also contains a few other methods that may be useful:

  • descendingKeySet()

  • descendingMap()

  • navigableKeySet()

These methods need no further description here; you can consult the Javadoc yourself.

ConcurrentSkipListMap

ConcurrentSkipListMap is a thread-safe sorted map suitable for highly concurrent scenarios.

The underlying data structure of ConcurrentSkipListMap is a skip list. Depending on which constructor is used, ConcurrentSkipListMap sorts either by the elements' natural Comparable ordering or by an external Comparator.

ConcurrentSkipListSet

ConcurrentSkipListSet is a thread-safe sorted set suitable for highly concurrent scenarios. It is implemented on top of a ConcurrentNavigableMap, which makes it an ordered, thread-safe set.

ConcurrentSkipListSet is ordered, based either on the natural ordering of its elements or on an order determined by a Comparator.

ConcurrentSkipListSet is thread-safe.

CopyOnWriteArrayList

CopyOnWriteArrayList is a concurrent variant of ArrayList. In CopyOnWriteArrayList, all mutative operations such as add and set are implemented by making a fresh copy of the underlying array.

CopyOnWriteArrayList holds an internal reference to an array that is never modified in place; every concurrent modification creates a new copy of the array. CopyOnWriteArrayList is fail-safe: it never throws ConcurrentModificationException, and its iterator returns exactly the elements that existed when the iterator was created.

Every concurrent write creates a new copy, and that copying carries some overhead. In general, CopyOnWriteArrayList is the right choice for thread safety when the collection is large and read operations vastly outnumber writes.

Similarly, CopyOnWriteArraySet replaces the synchronized Set implementations.

BlockingQueue

BlockingQueue, added in JDK 1.5, is an interface that extends Queue, inheriting its functionality and adding blocking behavior.

A BlockingQueue waits for the queue to become non-empty when retrieving an element, and waits for space to become available when storing one. The BlockingQueue methods come in four flavors, each handling an operation that cannot be satisfied immediately in a different way:

  • Throw exception: the first flavor throws an exception when the operation cannot be satisfied immediately

  • Special value: the second returns a special value, null or false, depending on the operation

  • Block: the third blocks the current thread indefinitely until the operation can succeed

  • Timeout: the fourth blocks for at most a given timeout and gives up after that

BlockingQueue does not allow null elements: calling add, put, or offer with null on any implementation throws a NullPointerException. A BlockingQueue may be capacity-bounded: it has a remainingCapacity at any given time, beyond which no further element can be put without blocking.

BlockingQueue is typically used to implement producer-consumer queues, as sketched below.
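A minimal producer-consumer sketch on top of a bounded BlockingQueue (the capacity and element values are arbitrary):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumerDemo {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 100; i++) {
                    queue.put(i);                     // blocks when the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 100; i++) {
                    System.out.println(queue.take()); // blocks when the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
    }
}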

BlockingQueue has several implementations, so let’s take a look at these containers.

LinkedBlockingQueue and ArrayBlockingQueue are FIFO (first-in, first-out) queues, corresponding to LinkedList and ArrayList respectively, with better concurrent performance than a synchronized List. PriorityBlockingQueue is a priority-ordered blocking queue, useful when you want to process elements in some order other than FIFO; like other sorted containers, it can order elements by their natural ordering or with an external Comparator. SynchronousQueue maintains a set of waiting threads rather than a set of queued elements; it is not really a queue at all, since each insert operation must wait for a corresponding remove by another thread, and vice versa.

LinkedBlockingQueue

LinkedBlockingQueue is an implementation of BlockingQueue.

It is an optionally bounded FIFO blocking queue built on a linked list. The head of the queue is the element that has been in the queue the longest; the tail is the element that has been there the shortest time. New elements are inserted at the tail, and retrieval operations take the element at the head. Linked-list queues typically have higher throughput than array-based queues, but less predictable performance in most concurrent applications.

ArrayBlockingQueue

ArrayBlockingQueue is a bounded blocking queue backed by an array that orders elements first-in, first-out.

By default it does not guarantee fair access. A fair queue grants blocked threads access in the order in which they blocked, so the thread that has waited longest enters first. An unfair queue is unfair to the threads that waited first: a thread that blocked early may end up accessing the queue last.

PriorityBlockingQueue

PriorityBlockingQueue is a blocking queue that supports priorities. By default, elements are taken in their natural ordering, or you can supply a Comparator for external sorting. Note, however, that the order of elements with equal priority is not guaranteed.

DelayQueue

DelayQueue is an unbounded blocking queue that supports delayed retrieval of elements: an element can be taken only after its delay has expired. The head of a DelayQueue is the element whose delay expired furthest in the past. If no element's delay has expired, there is no head element and the poll method returns null. An element counts as expired when its getDelay(TimeUnit.NANOSECONDS) method returns a value less than or equal to zero.
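A minimal sketch of a DelayQueue element type (the class and field names are my own):

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// An element becomes available once its deadline has passed.
class DelayedTask implements Delayed {

    private final String name;
    private final long deadlineNanos;

    DelayedTask(String name, long delayMillis) {
        this.name = name;
        this.deadlineNanos = System.nanoTime()
                + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                            other.getDelay(TimeUnit.NANOSECONDS));
    }

    @Override
    public String toString() {
        return name;
    }
}

public class DelayQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.add(new DelayedTask("later", 2000));
        queue.add(new DelayedTask("soon", 500));
        System.out.println(queue.take()); // blocks about 500 ms, prints "soon"
        System.out.println(queue.take()); // blocks until about 2000 ms, prints "later"
    }
}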

TransferQueue

TransferQueue is an interface that extends BlockingQueue. In a BlockingQueue, a producer may wait for space to store an element; a TransferQueue goes further: with the new transfer method, the producer blocks until the element it adds is actually received by a consumer.

TransferQueue also offers two tryTransfer methods, one non-blocking and one taking a timeout parameter, as well as two helper methods, hasWaitingConsumer and getWaitingConsumerCount.
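A small sketch of the hand-off semantics of transfer (the timing and values are arbitrary):

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

public class TransferQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        TransferQueue<String> queue = new LinkedTransferQueue<>();

        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(1000);  // the producer must wait for this take
                System.out.println("got: " + queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        queue.transfer("hello");     // blocks until the consumer takes the element
        System.out.println("element was consumed");
    }
}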

LinkedTransferQueue

LinkedTransferQueue is an unbounded TransferQueue based on a linked list. The queue orders elements FIFO with respect to any given producer: the head is the element that has been in the queue the longest, and the tail is the element that has been there the shortest time.

BlockingDeque

Alongside BlockingQueue, JDK 1.6 introduced Deque and BlockingDeque, which extend Queue and BlockingQueue respectively.

A Deque is a double-ended queue that supports insertion and removal at both the head and the tail. Deque implementations include ArrayDeque and ConcurrentLinkedDeque, and the BlockingDeque implementation is LinkedBlockingDeque.

Blocking queues fit producer-consumer designs, while double-ended queues fit a pattern called work stealing. In a work-stealing design, every consumer has its own deque; when a consumer finishes the work in its own deque, it steals work from the tail of another consumer's deque. Work stealing scales better than a traditional producer-consumer queue because each worker operates mostly on its own deque, so there is little contention.

ArrayDeque

ArrayDeque is a resizable-array implementation of Deque with no fixed capacity limit; it grows as needed. ArrayDeque is not thread-safe and does not support concurrent access without external locking. Null elements are prohibited. Used as a stack it is faster than Stack, and used as a queue it is faster than LinkedList.

With the exception of remove, removeFirstOccurrence, removeLastOccurrence, contains, and iterator.remove, most ArrayDeque operations run in amortized constant time.

Note that ArrayDeque is fail-fast: if you create an iterator and then modify the deque by any means other than that iterator's own remove method, the class throws a ConcurrentModificationException.

ConcurrentLinkedDeque

ConcurrentLinkedDeque, introduced in JDK 1.7, is an unbounded concurrent deque based on a doubly linked list. It differs from ConcurrentLinkedQueue in that it supports both FIFO and FILO use: elements can be inserted and removed at both the head and the tail of the queue. ConcurrentLinkedDeque also honors the happens-before principle, and it does not allow null elements.

LinkedBlockingDeque

LinkedBlockingDeque is a blocking deque built on a linked list; elements can be inserted and removed at both ends. Having two ends to operate on halves the contention when multiple threads enqueue at the same time. LinkedBlockingDeque can be bounded by passing an initial capacity to its constructor, which is an effective way to prevent excessive growth. If no capacity is specified, the default constructor uses Integer.MAX_VALUE.

Synchronization tool classes

A synchronization utility class can be any object that coordinates a thread's control flow based on its own state. Blocking queues can act as synchronization classes; other kinds of synchronization utilities include semaphores, barriers, and latches. Let's look at each of these tool classes.

Semaphore

What is a Semaphore? Essentially, it is a signal. Operating systems also have the concept of a semaphore; we covered semaphore-based communication when we talked about interprocess communication. In addition, when the Linux operating system handles interrupts, it sends signals to processes and decides, based on the process and signal type, whether to terminate the process.

In Java, Semaphore is used to control the number of threads that may access a particular resource at the same time, coordinating threads so that a shared resource is used properly.

A Semaphore manages a set of permits, whose initial number is given to the constructor. Before using a resource, a thread must acquire a permit from the semaphore, guaranteeing that the resource is available to it. When the thread finishes with the resource, it returns the resource to the pool and releases a permit back to the semaphore, allowing other threads to access the resource. If no permit is available, acquire blocks until one is (or until interruption or timeout). The release method returns a permit to the semaphore.

Semaphore can be used to implement flow control, a common example being a database connection pool: when a thread requests a connection and the pool is exhausted, the thread either blocks or fails immediately, and it is unblocked once the pool has a connection available again.
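A minimal flow-control sketch in this spirit (the pool size and the simulated work are invented):

import java.util.concurrent.Semaphore;

public class ConnectionPoolDemo {

    // At most 3 threads may "hold a connection" at once.
    private static final Semaphore permits = new Semaphore(3);

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    permits.acquire();   // blocks if no permit is free
                    System.out.println(Thread.currentThread().getName() + " got a connection");
                    Thread.sleep(500);   // simulate using the connection
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    permits.release();   // return the permit to the pool
                }
            }).start();
        }
    }
}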

CountDownLatch

A latch is a synchronization utility class that delays threads' progress until the latch reaches its terminal state. A latch acts like a gate: until the latch reaches the end state the gate stays closed and no thread may pass; once it reaches the end state the gate opens, allows all threads through, and then remains open forever.

CountDownLatch is one implementation of a latch. It lets one or more threads wait for a set of events to occur. The latch holds a counter, initialized to the number of events to wait for. Threads wait at calls to await, while other threads call countDown to decrement the counter; when it reaches zero, the waiting threads are released. See the code below:

public class TCountDownLatch {
    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(5);
        Increment increment = new Increment(latch);
        Decrement decrement = new Decrement(latch);
        new Thread(increment).start();
        new Thread(decrement).start();
        try {
            Thread.sleep(6000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Decrement implements Runnable {

    CountDownLatch countDownLatch;

    public Decrement(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        try {
            for (long i = countDownLatch.getCount(); i > 0; i--) {
                Thread.sleep(1000);
                System.out.println("countdown");
                this.countDownLatch.countDown();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Increment implements Runnable {

    CountDownLatch countDownLatch;

    public Increment(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        try {
            System.out.println("await");
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Waiter Released");
    }
}

Future

There are two common ways to create threads: extend the Thread class or implement the Runnable interface. Neither way can return a value. To get a return value, there are further mechanisms: the Callable interface, the Future interface, and the FutureTask class. We have discussed Callable before, so here we describe Future and FutureTask.

A Future represents the result of a specific Runnable or Callable task. The result can be obtained, when needed, through the get method, which blocks until execution is complete. The main methods of Future are:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
  • cancel(boolean mayInterruptIfRunning): attempts to cancel execution of the task. The attempt fails if the task has already completed, has already been cancelled, or cannot be cancelled for some other reason. If cancellation succeeds, or if cancel is called before the task has started, the task will never run. If the task has already started, the mayInterruptIfRunning parameter determines whether the thread executing the task should be interrupted in an attempt to stop it. After this method returns, subsequent calls to isDone always return true, and isCancelled returns true if cancel returned true.

  • boolean isCancelled(): returns true if the task was cancelled before it completed normally.

  • boolean isDone(): returns true if the task has completed.

  • V get() throws InterruptedException, ExecutionException: waits if necessary for the computation to complete, then retrieves its result.

  • V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException: waits if necessary, for at most the given time, for the computation to complete, then retrieves its result.

Because Future is only an interface, it cannot be instantiated directly; hence the FutureTask class below.

FutureTask

FutureTask implements the RunnableFuture interface. So what is the RunnableFuture interface?

The RunnableFuture interface in turn extends both the Runnable and Future interfaces. What? Doesn't Java allow only single inheritance? Yes, but single inheritance applies to classes: a subclass extends one parent class. Between interfaces, inheritance is extension, and an interface may extend several others; this is also confirmed in Thinking in Java.

The Javadoc describes RunnableFuture as a Future that is Runnable: successful execution of the run method completes the Future and allows access to its result. So a FutureTask can be executed by a thread as a Runnable, and it can also be used as a Future to obtain the return value of a Callable.

FutureTask can also be used like a latch, and it can be in one of three states:

  • Waiting to run

  • Running

  • Completed

FutureTask is used to represent asynchronous tasks in the Executor framework, and also to represent long computations that can be started before their results are needed.
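A brief usage sketch (the Callable's work here is invented for illustration):

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Callable<Integer> expensive = () -> {
            Thread.sleep(1000);       // pretend this is a long computation
            return 42;
        };

        FutureTask<Integer> task = new FutureTask<>(expensive);
        new Thread(task).start();     // executed by a thread as a Runnable

        // ... do other work here while the task computes ...

        System.out.println(task.get()); // used as a Future: blocks, then prints 42
    }
}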

FutureTask’s source code will be described in a separate article.

Barrier

Above we talked about using latches to wait for a set of events to occur. A latch is a single-use object: once it reaches its terminal state it cannot be reset.

A barrier is similar to a latch in that it blocks a group of threads until some event occurs. The difference is that with a barrier, all threads must reach the barrier position at the same time in order to continue execution, as in the operating-system figure shown earlier.

The four threads A, B, C, and D must all reach the barrier at the same time, and only then can they walk on together, hand in hand.

A thread calls the await method when it reaches the barrier position. await blocks until all threads have reached the barrier; once they all have, the barrier opens, all threads are released, and the barrier resets for its next use. If a call to await times out, or a thread blocked in await is interrupted, the barrier is considered broken, and every blocked await call throws a BrokenBarrierException. On successfully passing the barrier, await returns a unique arrival index, which can be used, for example, to elect a leader that performs some extra work.

public class TCyclicBarrier {
    public static void main(String[] args) {
        Runnable runnable = () -> System.out.println("Barrier 1 starts...");
        Runnable runnable2 = () -> System.out.println("Barrier 2 starts...");
        CyclicBarrier barrier1 = new CyclicBarrier(2, runnable);
        CyclicBarrier barrier2 = new CyclicBarrier(2, runnable2);
        CyclicBarrierRunnable b1 = new CyclicBarrierRunnable(barrier1, barrier2);
        CyclicBarrierRunnable b2 = new CyclicBarrierRunnable(barrier1, barrier2);
        new Thread(b1).start();
        new Thread(b2).start();
    }
}

class CyclicBarrierRunnable implements Runnable {

    CyclicBarrier barrier1;
    CyclicBarrier barrier2;

    public CyclicBarrierRunnable(CyclicBarrier barrier1, CyclicBarrier barrier2) {
        this.barrier1 = barrier1;
        this.barrier2 = barrier2;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + " waiting at barrier1");
            barrier1.await();
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + " waiting at barrier2");
            barrier2.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " done!");
    }
}

Exchanger

Related to the barrier is the Exchanger utility class, which is used for cooperation between threads, specifically for exchanging data between them.

Exchanger provides a synchronization point at which two threads can exchange data with each other, using the exchange method. If the first thread calls exchange first, it waits until the second thread also calls exchange; when both threads reach the synchronization point, they swap data, each passing what it produced to the other. Because of this, exchange must be used by threads in pairs: when a pair reaches the synchronization point, the data is exchanged. The threads using this utility class therefore come in pairs.

Here is an example:

public class TExchanger {
    public static void main(String[] args) {
        Exchanger exchanger = new Exchanger();
        ExchangerRunnable exchangerRunnable = new ExchangerRunnable(exchanger, "A");
        ExchangerRunnable exchangerRunnable2 = new ExchangerRunnable(exchanger, "B");
        new Thread(exchangerRunnable).start();
        new Thread(exchangerRunnable2).start();
    }
}

class ExchangerRunnable implements Runnable {

    Exchanger exchanger;
    Object object;

    public ExchangerRunnable(Exchanger exchanger, Object object) {
        this.exchanger = exchanger;
        this.object = object;
    }

    @Override
    public void run() {
        Object previous = object;
        try {
            object = this.exchanger.exchange(object);
            System.out.println(Thread.currentThread().getName()
                    + " exchanged " + previous + " for " + object);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}