Concurrent container
Synchronized containers and precautions
Java containers can be divided into four main categories: List, Map, Set, and Queue, but not all Java containers are thread-safe. For example, ArrayList and HashMap are not thread-safe. Before introducing thread-safe containers, let’s consider the following question: How do you turn a non-thread-safe container into a thread-safe container?
The idea is simple: encapsulate a non-thread-safe container inside the object and control the access path.
Let’s take ArrayList as an example of how to make a container thread-safe. In the code below, SafeArrayList holds an ArrayList instance c, and every method that accesses c is marked synchronized. Note that we also add addIfNotExist(), which likewise uses synchronized to make its check-then-add sequence atomic.
import java.util.ArrayList;
import java.util.List;

class SafeArrayList<T> {
  // encapsulate the ArrayList
  List<T> c = new ArrayList<>();
  // control the access path
  synchronized T get(int idx) {
    return c.get(idx);
  }
  synchronized void add(int idx, T t) {
    c.add(idx, t);
  }
  synchronized boolean addIfNotExist(T t) {
    if (!c.contains(t)) {
      c.add(t);
      return true;
    }
    return false;
  }
}
Seeing this, you might generalize and wonder: can every non-thread-safe class be made thread-safe with this kind of wrapper? The Java SDK developers had the same idea, so the Collections class provides a complete set of wrapper methods. The example below wraps an ArrayList, a HashSet, and a HashMap into a thread-safe List, Set, and Map, respectively.
List list = Collections.synchronizedList(new ArrayList());
Set set = Collections.synchronizedSet(new HashSet());
Map map = Collections.synchronizedMap(new HashMap());
We have emphasized many times that compound operations must watch out for race conditions, as in the addIfNotExist() method above. Compound operations often hide race conditions: even if every individual operation is atomic, the combination of them is not guaranteed to be atomic.
An easily overlooked pitfall with containers is iteration. The code below obtains an iterator over list and calls foo() on each element. This has a concurrency problem, because the sequence of hasNext() and next() calls is a compound operation and is not atomic.
List list = Collections.synchronizedList(new ArrayList());
Iterator i = list.iterator();
while (i.hasNext())
  foo(i.next());
Instead, the correct approach is to lock list first and then iterate. If you look at the source code of the wrapper classes inside Collections, you will see that their public methods lock the wrapper object’s this, which here is our list, so locking list makes the traversal thread-safe.
List list = Collections.synchronizedList(new ArrayList());
synchronized (list) {
  Iterator i = list.iterator();
  while (i.hasNext())
    foo(i.next());
}
These wrapped thread-safe containers are all based on the synchronized keyword, so they are also called synchronized containers. Java also provides the synchronized containers Vector, Stack, and Hashtable. They are not implemented through the wrapper classes, but they are also based on synchronized, and traversing them likewise requires locking to ensure mutual exclusion.
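The two precautions above (compound operations and iteration) can be sketched together as follows. This is a minimal illustration, not code from the Java SDK: the class name SyncListDemo and the helper methods are made up for the example, and it relies on the wrapper returned by Collections.synchronizedList using itself as the lock.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class SyncListDemo {
  // Compound check-then-act: hold the wrapper's own lock across both calls.
  static <T> boolean addIfNotExist(List<T> syncList, T t) {
    synchronized (syncList) {
      if (!syncList.contains(t)) {
        syncList.add(t);
        return true;
      }
      return false;
    }
  }

  // Iterate while holding the same lock, so no writer can modify the list mid-traversal.
  static int sum(List<Integer> syncList) {
    int total = 0;
    synchronized (syncList) {
      for (int v : syncList) {
        total += v;
      }
    }
    return total;
  }

  // Two threads both try to add 1..100; each value must end up in the list exactly once.
  static int demo() {
    List<Integer> list = Collections.synchronizedList(new ArrayList<>());
    Runnable writer = () -> {
      for (int i = 1; i <= 100; i++) {
        addIfNotExist(list, i);
      }
    };
    Thread t1 = new Thread(writer);
    Thread t2 = new Thread(writer);
    t1.start();
    t2.start();
    try {
      t1.join();
      t2.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return sum(list);
  }

  public static void main(String[] args) {
    System.out.println(demo()); // 1 + 2 + ... + 100 = 5050
  }
}
```

Without the synchronized block in addIfNotExist(), both threads could pass the contains() check for the same value and add it twice.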
Concurrent containers and precautions
Before Java 1.5, the thread-safe containers in Java were mainly the synchronized containers. Their biggest problem is poor performance: every method uses synchronized to ensure mutual exclusion, which serializes access too heavily. So Java 1.5 and later provide higher-performance containers, commonly called concurrent containers.
1. List
List has only one implementation class, CopyOnWriteArrayList. CopyOnWrite, as the name implies, makes a new copy of the shared data on every write. The advantage is that reads are completely lock-free.
So how does CopyOnWriteArrayList work?
CopyOnWriteArrayList maintains an array internally, and the member variable array refers to it. All reads go through array, as shown in the figure below, and an Iterator traverses array. What happens if a write, such as adding an element, occurs during a traversal? CopyOnWriteArrayList makes a copy of the array, adds the element to the new copy, and then points array at the new copy. As the figure below shows, reads and writes can proceed in parallel: traversals keep operating on the old array, while writes operate on the new one.
There are two main pitfalls when using CopyOnWriteArrayList. The first is the application scenario: it is only suitable when writes are very rare and a brief inconsistency between reads and writes can be tolerated. In the example above, for instance, a newly written element is not visible to a traversal already in progress. The second is that the CopyOnWriteArrayList iterator is read-only and does not support adding, removing, or modifying elements. The iterator traverses only a snapshot, and modifying a snapshot would be meaningless.
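Both pitfalls can be observed directly. The sketch below is only an illustration (the class and method names are invented for it): it creates an iterator, writes to the list afterwards, and then checks what the iterator sees and whether it allows removal.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class CowSnapshotDemo {
  // Counts the elements an iterator sees when a write happens after the iterator was created.
  static int elementsSeenBySnapshot() {
    List<String> list = new CopyOnWriteArrayList<>(Arrays.asList("a", "b"));
    Iterator<String> it = list.iterator(); // snapshot of the array at this moment
    list.add("c");                         // copies the array; the snapshot is untouched
    int seen = 0;
    while (it.hasNext()) {
      it.next();
      seen++;
    }
    return seen;
  }

  // The iterator is read-only: remove() is expected to throw UnsupportedOperationException.
  static boolean iteratorRemoveIsUnsupported() {
    List<String> list = new CopyOnWriteArrayList<>(Arrays.asList("a", "b"));
    Iterator<String> it = list.iterator();
    it.next();
    try {
      it.remove();
      return false;
    } catch (UnsupportedOperationException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(elementsSeenBySnapshot());      // 2, not 3
    System.out.println(iteratorRemoveIsUnsupported()); // true
  }
}
```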
2. Map
The two implementations of the Map interface are ConcurrentHashMap and ConcurrentSkipListMap. From a usage point of view, the main difference is that the keys of a ConcurrentHashMap are unordered, while the keys of a ConcurrentSkipListMap are ordered. So if you need the keys kept in order, use ConcurrentSkipListMap.
The important thing to remember about ConcurrentHashMap and ConcurrentSkipListMap is that neither keys nor values may be null; otherwise a NullPointerException is thrown at runtime. The table below summarizes the key and value requirements of the Map-related implementation classes.
The SkipList in ConcurrentSkipListMap is itself a data structure, the skip list. The average time complexity of skip list insert, delete, and lookup operations is O(log N), which in theory is independent of the number of concurrent threads. So if ConcurrentHashMap does not perform well enough under very high concurrency, you can try ConcurrentSkipListMap.
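The null restriction and the key ordering are easy to check in a few lines. The following is a small sketch with made-up names (ConcurrentMapDemo, rejectsNullValue, keysInIterationOrder), using String keys whose natural ordering is alphabetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

class ConcurrentMapDemo {
  // Returns true if the map refuses a null value with a NullPointerException.
  static boolean rejectsNullValue(Map<String, Integer> map) {
    try {
      map.put("k", null);
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  // ConcurrentSkipListMap iterates its keys in sorted order, regardless of insertion order.
  static List<String> keysInIterationOrder() {
    Map<String, Integer> map = new ConcurrentSkipListMap<>();
    map.put("pear", 3);
    map.put("apple", 1);
    map.put("fig", 2);
    return new ArrayList<>(map.keySet());
  }

  public static void main(String[] args) {
    System.out.println(rejectsNullValue(new ConcurrentHashMap<>()));     // true
    System.out.println(rejectsNullValue(new ConcurrentSkipListMap<>())); // true
    System.out.println(keysInIterationOrder());                          // [apple, fig, pear]
  }
}
```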
3. Set
The two implementations of the Set interface are CopyOnWriteArraySet and ConcurrentSkipListSet. Their usage scenarios match those of CopyOnWriteArrayList and ConcurrentSkipListMap described above, and they work on the same principles.
4. Queue
Queue is the most complex family of concurrent containers in the Java concurrency package. You can classify queues along two dimensions. The first is blocking versus non-blocking: blocking means that the enqueue operation blocks when the queue is full and the dequeue operation blocks when the queue is empty. The second is single-ended versus double-ended: single-ended means elements can only be enqueued at the tail and dequeued at the head, while double-ended means elements can be enqueued and dequeued at both the head and the tail. In the Java concurrency package, blocking queues are marked with the keyword Blocking, single-ended queues with Queue, and double-ended queues with Deque.
When these two dimensions are combined, Queue can be subdivided into four categories:
1. Single-ended blocking queues: the implementations are ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue, LinkedTransferQueue, PriorityBlockingQueue, and DelayQueue. Internally they generally hold a queue, which can be an array (ArrayBlockingQueue) or a linked list (LinkedBlockingQueue). It is even possible to hold no queue at all (SynchronousQueue), in which case a producer thread’s enqueue must wait for a consumer thread’s dequeue. LinkedTransferQueue combines the functions of LinkedBlockingQueue and SynchronousQueue and performs better than LinkedBlockingQueue. PriorityBlockingQueue supports dequeuing by priority. DelayQueue supports delayed dequeuing.
2. Double-ended blocking queues: the implementation is LinkedBlockingDeque.
3. Single-ended non-blocking queues: the implementation is ConcurrentLinkedQueue.
4. Double-ended non-blocking queues: the implementation is ConcurrentLinkedDeque.
In addition, when using queues, pay special attention to whether a queue is bounded, that is, whether its internal storage has a capacity limit. In practice, unbounded queues are generally not recommended, because a large volume of data can easily lead to an OutOfMemoryError (OOM). Among the single-ended blocking queues above, only ArrayBlockingQueue and LinkedBlockingQueue support a capacity bound, so be sure to consider the OOM risk when using the other, unbounded queues.
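To make the difference between bounded and unbounded queues concrete, here is a small sketch (the class and method names are invented for illustration). It uses the non-blocking offer() method, which returns false rather than waiting when a bounded queue is full.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BoundedQueueDemo {
  // Tries to enqueue 'attempts' elements without blocking and reports how many were accepted.
  static int acceptedOutOf(BlockingQueue<Integer> queue, int attempts) {
    int accepted = 0;
    for (int i = 0; i < attempts; i++) {
      if (queue.offer(i)) {
        accepted++;
      }
    }
    return accepted;
  }

  public static void main(String[] args) {
    // Bounded: capacity 2, so only the first two offers succeed.
    System.out.println(acceptedOutOf(new ArrayBlockingQueue<>(2), 5));  // 2
    // LinkedBlockingQueue with an explicit capacity is bounded as well.
    System.out.println(acceptedOutOf(new LinkedBlockingQueue<>(3), 5)); // 3
    // Without a capacity argument it is effectively unbounded and keeps growing.
    System.out.println(acceptedOutOf(new LinkedBlockingQueue<>(), 5));  // 5
  }
}
```

With a bounded queue the producer finds out that the consumer is falling behind; with an unbounded one the backlog silently accumulates in memory.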
Atomic class
Consider an accumulator; the sample code is as follows. Here the add10K() method is not thread-safe. The problems are the visibility of the count variable and the atomicity of count += 1. The visibility problem can be solved with volatile, whereas the atomicity problem has so far always been solved with a mutex.
public class Test {
  long count = 0;
  void add10K() {
    int idx = 0;
    while (idx++ < 10000) {
      count += 1;
    }
  }
}
In fact, for simple atomicity problems there is also a lock-free solution. The Java SDK concurrency package wraps and refines this lock-free scheme in a series of atomic classes. Before diving into how atomic classes are implemented, let’s first see how they solve the accumulator problem, so that you get a first impression of them.
In the code below, we change the type of count from long to AtomicLong and replace count += 1 with count.getAndIncrement(). These two simple changes make the add10K() method thread-safe. Using the atomic classes is quite simple.
import java.util.concurrent.atomic.AtomicLong;

public class Test {
  AtomicLong count = new AtomicLong(0);
  void add10K() {
    int idx = 0;
    while (idx++ < 10000) {
      count.getAndIncrement();
    }
  }
}
The biggest advantage of the lock-free scheme over the mutex scheme is performance. A mutex scheme has to lock and unlock to ensure mutual exclusion, and those operations themselves cost performance. A thread that cannot get the lock also enters the blocked state, which triggers a thread switch, and that is expensive as well. The lock-free scheme, by contrast, pays no locking or unlocking cost at all and still guarantees that the update is atomic, so it solves the problem without introducing new ones.
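To check the thread-safety claim, the accumulator can be driven from two threads. This is a small sketch with an invented class name (AtomicCounterDemo); it only verifies the final count and says nothing about relative performance.

```java
import java.util.concurrent.atomic.AtomicLong;

class AtomicCounterDemo {
  private final AtomicLong count = new AtomicLong(0);

  void add10K() {
    for (int i = 0; i < 10000; i++) {
      count.getAndIncrement(); // atomic replacement for count += 1
    }
  }

  // Runs add10K() on two threads at once and returns the final count.
  static long runTwoThreads() {
    AtomicCounterDemo demo = new AtomicCounterDemo();
    Thread t1 = new Thread(demo::add10K);
    Thread t2 = new Thread(demo::add10K);
    t1.start();
    t2.start();
    try {
      t1.join();
      t2.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return demo.count.get();
  }

  public static void main(String[] args) {
    System.out.println(runTwoThreads()); // 20000: no increment is lost
  }
}
```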
How the lock-free scheme works
The secret behind the high performance of the atomic classes is simple: hardware support. To solve concurrency problems, the CPU provides the CAS instruction (CAS stands for Compare And Swap). The CAS instruction takes three parameters: the memory address A of the shared variable, the value B to compare against, and the new value C of the shared variable. The value at address A is updated to the new value C only if the value currently at address A equals B. As a single CPU instruction, CAS is itself guaranteed to be atomic.
The simulation code below can help you understand how CAS works. The cas() method takes two parameters: the expected value expect and the new value to be written, newValue. count is updated to newValue only if the current value of count equals expect.
class SimulatedCAS {
  int count;
  synchronized int cas(int expect, int newValue) {
    // read the current value of count
    int curValue = count;
    // compare the current value with the expected value
    if (curValue == expect) {
      // if they are equal, update the value of count
      count = newValue;
    }
    // return the value before the write
    return curValue;
  }
}
Think again carefully about the statement, “Count is updated to newValue only if the current value of count is equal to the expected value.” How to understand this sentence?
For the accumulator example mentioned earlier, one of the core problems with count += 1 is this: count += 1 computes A+1 from the current value A of count in memory. By the time A+1 is written back to memory, count may already have been updated by another thread, so the write wrongly overwrites the value written by that thread. Only if the value of count in memory still equals the expected value A can we safely update it to A+1.
Solving concurrency problems with CAS usually goes together with spinning, which simply means retrying in a loop. For example, to implement a thread-safe count += 1, the “CAS + spin” approach is shown below. First read count and compute newValue = count + 1 (code ①), then call cas() (code ②). If the value returned by cas() differs from the value of count that was read at ①, count was updated by another thread after code ① executed and before code ② executed. What then? Spin: as the code below shows, re-read the latest value of count, recompute newValue, and try the update again until it succeeds.
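class SimulatedCAS {
  volatile int count;
  // implements count += 1
  void addOne() {
    int oldValue;
    int newValue;
    do {
      // read count once and derive the new value from that single read
      oldValue = count; // ①
      newValue = oldValue + 1;
    } while (oldValue != cas(oldValue, newValue)); // ②
  }
  // simulates the CAS operation
  synchronized int cas(int expect, int newValue) {
    // read the current value of count
    int curValue = count;
    // compare the current value with the expected value
    if (curValue == expect) {
      // if they are equal, update the value of count
      count = newValue;
    }
    // return the value before the write
    return curValue;
  }
}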
As you can see from the example above, CAS is a lock-free scheme with no locking or unlocking operations at all. Even if two threads execute the addOne() method at the same time, no thread is blocked, so the performance is much better than the mutex scheme.
However, there is a problem in CAS that you may often overlook, and that is the ABA problem.
As explained above, if the value returned by cas() is not equal to the value of count read at code ①, count was updated by another thread between ① and ②. But if cas() returns a value equal to the one read at ①, can we conclude that count has not been updated by another thread? Obviously not. Suppose count was originally A. After thread T1 executes code ① and before it executes code ②, thread T2 may update count to B and thread T3 may then update it back to A. Thread T1 sees A both times, yet the value has in fact been updated by other threads. This is the ABA problem.
In most cases, such as atomic increments, we do not need to care about the ABA problem, but that does not mean we can ignore it in every case. For example, when atomically updating an object reference you may well need to care, because although the two A’s are equal, the properties of the second A may have changed. So always check for this before adopting a CAS scheme.
How Java implements an atomic count += 1
In Java 1.8, the getAndIncrement() method delegates to unsafe.getAndAddLong(). Here the this and valueOffset arguments together uniquely determine the memory address of the shared variable.
final long getAndIncrement() {
return unsafe.getAndAddLong(
this, valueOffset, 1L);
}
unsafe.getAndAddLong() first reads the value of the shared variable in memory, then loops, calling compareAndSwapLong() to try to set the shared variable until it succeeds. compareAndSwapLong() is a native method: it updates the shared variable to x and returns true only if the value in memory equals expected; otherwise it returns false. The only difference between the semantics of compareAndSwapLong() and those of the CAS instruction is the return value.
public final long getAndAddLong(Object o, long offset, long delta) {
  long v;
  do {
    // read the value in memory
    v = getLongVolatile(o, offset);
  } while (!compareAndSwapLong(o, offset, v, v + delta));
  return v;
}
// atomically update the variable to x
// if the value in memory equals expected;
// returns true if the update succeeds
native boolean compareAndSwapLong(Object o, long offset, long expected, long x);
Also note that the implementation of getAndAddLong() is basically the classic CAS usage pattern. So remember the abstracted code snippet below, which appears in many lock-free programs. In the atomic classes that Java provides, CAS is generally exposed as the compareAndSet() method. The only difference between the semantics of compareAndSet() and those of the CAS instruction is the return value: compareAndSet() returns true if the update succeeds and false otherwise.
do {
  // get the current value
  oldV = xxxx;
  // compute the new value from the current value
  newV = ...oldV...
} while (!compareAndSet(oldV, newV));
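As one concrete instance of this template, the sketch below keeps a running maximum in an AtomicInteger. The class and method names (CasLoopDemo, updateMax) are made up for the example; only AtomicInteger.get() and compareAndSet() come from the JDK.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasLoopDemo {
  // Atomically raises 'max' to 'candidate' if candidate is larger.
  // Returns the value stored in 'max' after this call's successful CAS.
  static int updateMax(AtomicInteger max, int candidate) {
    int oldV;
    int newV;
    do {
      // get the current value
      oldV = max.get();
      // compute the new value from the current value
      newV = Math.max(oldV, candidate);
      // retry if another thread changed the value in between
    } while (!max.compareAndSet(oldV, newV));
    return newV;
  }

  public static void main(String[] args) {
    AtomicInteger max = new AtomicInteger(5);
    System.out.println(updateMax(max, 3)); // 5: the candidate is smaller
    System.out.println(updateMax(max, 9)); // 9: the candidate replaces the old maximum
  }
}
```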
An overview of atomic classes
The atomic classes provided in the Java SDK concurrency package are very rich and can be divided into five categories: atomized basic data types, atomized object reference types, atomized arrays, atomized object property updaters, and atomized accumulators. The methods provided by these five categories are basically similar, and each category contains several atomic classes.
1. Atomized basic data types
The implementations are AtomicBoolean, AtomicInteger, and AtomicLong. The main methods they provide are listed below (AtomicBoolean supports only a subset of them, such as compareAndSet()).
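getAndIncrement() // atomic i++
getAndDecrement() // atomic i--
incrementAndGet() // atomic ++i
decrementAndGet() // atomic --i
// add delta to the current value and return the value before the addition
getAndAdd(delta)
// add delta to the current value and return the value after the addition
addAndGet(delta)
// CAS operation; returns whether it succeeded
compareAndSet(expect, update)
// the following four methods compute
// the new value with the passed-in func function
getAndUpdate(func)
updateAndGet(func)
getAndAccumulate(x, func)
accumulateAndGet(x, func)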
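A short walk through several of these methods on an AtomicInteger may help; the class name AtomicIntegerMethodsDemo and the starting value 10 are arbitrary choices for the illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

class AtomicIntegerMethodsDemo {
  // Calls several methods in sequence and records what each one returns.
  static String demo() {
    AtomicInteger i = new AtomicInteger(10);
    int a = i.getAndIncrement();                   // returns 10, value becomes 11
    int b = i.incrementAndGet();                   // value becomes 12, returns 12
    int c = i.getAndAdd(5);                        // returns 12, value becomes 17
    boolean ok = i.compareAndSet(17, 20);          // value is 17, so it becomes 20
    int d = i.updateAndGet(x -> x * 2);            // value becomes 40, returns 40
    int e = i.accumulateAndGet(3, Integer::sum);   // value becomes 40 + 3, returns 43
    return a + " " + b + " " + c + " " + ok + " " + d + " " + e;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // 10 12 12 true 40 43
  }
}
```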
2. Atomized object reference types
AtomicReference, AtomicStampedReference, and AtomicMarkableReference can be used to update object references atomically. The methods provided by AtomicReference are similar to those of the atomized basic data types and are not repeated here. Note, however, that updating object references requires attention to the ABA problem; AtomicStampedReference and AtomicMarkableReference are the atomic classes that can solve it.
The idea for solving the ABA problem is actually simple: add a version number dimension. This is similar to the optimistic locking mechanism. Each CAS operation also updates a version number, and as long as the version number only ever increases, then even if A becomes B and then A again, the version number never returns to its earlier value. The CAS method implemented by AtomicStampedReference adds version number parameters; its method signature is as follows:
boolean compareAndSet(
V expectedReference,
V newReference,
int expectedStamp,
int newStamp)
AtomicMarkableReference has a simpler mechanism: it reduces the version number to a Boolean value. Its method signature is as follows:
boolean compareAndSet(
V expectedReference,
V newReference,
boolean expectedMark,
boolean newMark)
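The version number idea can be seen in a single-threaded simulation of the A, B, A sequence. This is only a sketch: the class name and the string values are invented, and the interleaving of threads T1, T2, and T3 is imitated by plain sequential calls.

```java
import java.util.concurrent.atomic.AtomicStampedReference;

class StampedReferenceDemo {
  // Returns whether a CAS based on a stale stamp succeeds after an A -> B -> A change.
  static boolean staleCasSucceeds() {
    String a = "A";
    String b = "B";
    AtomicStampedReference<String> ref = new AtomicStampedReference<>(a, 0);

    // "T1" reads the reference and its stamp.
    String valueSeenByT1 = ref.getReference();
    int stampSeenByT1 = ref.getStamp();

    // Meanwhile "T2" and "T3" change A -> B -> A, bumping the stamp each time.
    ref.compareAndSet(a, b, 0, 1);
    ref.compareAndSet(b, a, 1, 2);

    // T1's CAS: the reference matches again, but the stamp (0) is stale.
    return ref.compareAndSet(valueSeenByT1, "C", stampSeenByT1, stampSeenByT1 + 1);
  }

  // With the current stamp the same update goes through.
  static boolean freshCasSucceeds() {
    AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 2);
    return ref.compareAndSet(ref.getReference(), "C", ref.getStamp(), ref.getStamp() + 1);
  }

  public static void main(String[] args) {
    System.out.println(staleCasSucceeds()); // false: the ABA change is detected
    System.out.println(freshCasSucceeds()); // true
  }
}
```

A plain AtomicReference would have accepted T1's update in the first case, because it compares only the reference.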
3. Atomized arrays
The implementations are AtomicIntegerArray, AtomicLongArray, and AtomicReferenceArray. With these atomic classes, we can atomically update each element of an array. The only difference between their methods and those of the atomized basic data types is that each method takes an extra array index parameter, so they are not described in detail here.
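For a quick impression of the extra index parameter, here is a sketch in which two threads count even and odd numbers into a two-element AtomicIntegerArray. The class name and the bucket layout are assumptions made for the example.

```java
import java.util.concurrent.atomic.AtomicIntegerArray;

class AtomicArrayDemo {
  // Two threads each classify 0..n-1 by parity; bucket 0 counts evens, bucket 1 counts odds.
  static int[] countParity(int n) {
    AtomicIntegerArray buckets = new AtomicIntegerArray(2);
    Runnable task = () -> {
      for (int i = 0; i < n; i++) {
        buckets.incrementAndGet(i % 2); // the first argument is the array index
      }
    };
    Thread t1 = new Thread(task);
    Thread t2 = new Thread(task);
    t1.start();
    t2.start();
    try {
      t1.join();
      t2.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return new int[] {buckets.get(0), buckets.get(1)};
  }

  public static void main(String[] args) {
    int[] result = countParity(1000);
    System.out.println(result[0] + " " + result[1]); // 1000 1000
  }
}
```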
4. Atomized object property updaters
The implementations are AtomicIntegerFieldUpdater, AtomicLongFieldUpdater, and AtomicReferenceFieldUpdater. They can be used to atomically update the properties of an object. All three are implemented with the reflection mechanism, and an updater is created as follows:
public static <U>
AtomicXXXFieldUpdater<U>
newUpdater(Class<U> tclass,
String fieldName)
Note that the object attributes must be volatile to ensure visibility; If the object attribute is not volatile, the newUpdater() method throws the runtime exception IllegalArgumentException.
The newUpdater() method receives only class information, with no object reference. Updating an object property requires an object reference, so where is it passed in? It is passed as a parameter of the atomic operation methods. For example, the atomic operation compareAndSet() takes one more parameter, the object reference obj, than its counterpart in the atomized basic data types. That extra object reference parameter is the only difference in the updater methods, so they are not repeated here.
boolean compareAndSet(
T obj,
int expect,
int update)
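A minimal sketch of an updater in use follows. The Account and Plain classes and their fields are hypothetical; they exist only to show the volatile requirement and the extra object reference parameter.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

class FieldUpdaterDemo {
  // Hypothetical class whose field is updated atomically; the field must be volatile.
  static class Account {
    volatile int balance;
  }

  // Hypothetical class with a non-volatile field, used to show the failure case.
  static class Plain {
    int value;
  }

  static final AtomicIntegerFieldUpdater<Account> BALANCE =
      AtomicIntegerFieldUpdater.newUpdater(Account.class, "balance");

  static int demo() {
    Account account = new Account();
    BALANCE.compareAndSet(account, 0, 100); // the object reference is the first argument
    BALANCE.addAndGet(account, 50);
    return BALANCE.get(account);
  }

  // newUpdater() is expected to reject a field that is not volatile.
  static boolean nonVolatileRejected() {
    try {
      AtomicIntegerFieldUpdater.newUpdater(Plain.class, "value");
      return false;
    } catch (IllegalArgumentException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());                // 150
    System.out.println(nonVolatileRejected()); // true
  }
}
```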
5. Atomized accumulators
DoubleAccumulator, DoubleAdder, LongAccumulator, and LongAdder are used only to perform accumulation. For accumulation they are faster than the atomized basic data types, but they do not support the compareAndSet() method. If you only need to accumulate, atomized accumulators give better performance.
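Usage is close to that of AtomicLong. The sketch below (class and method names invented) accumulates from several threads with LongAdder and reads the total with sum() once all threads have finished; it checks correctness only and does not measure speed.

```java
import java.util.concurrent.atomic.LongAdder;

class LongAdderDemo {
  // Starts 'threads' threads that each increment the adder 'perThread' times.
  static long countWithThreads(int threads, int perThread) {
    LongAdder adder = new LongAdder();
    Thread[] workers = new Thread[threads];
    for (int i = 0; i < threads; i++) {
      workers[i] = new Thread(() -> {
        for (int j = 0; j < perThread; j++) {
          adder.increment();
        }
      });
      workers[i].start();
    }
    try {
      for (Thread worker : workers) {
        worker.join();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    // sum() is read after all writers have finished, so it reflects every increment.
    return adder.sum();
  }

  public static void main(String[] args) {
    System.out.println(countWithThreads(4, 10000)); // 40000
  }
}
```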
Executor and thread pools
Creating an ordinary object only requires allocating a chunk of memory on the JVM heap. Creating a thread, however, requires calling an operating system kernel API, after which the operating system allocates a series of resources for the thread. This cost is very high, so a thread is a heavyweight object, and frequent creation and destruction of threads should be avoided.
Thread pools differ from pooled resources in the general sense. With an ordinary pooled resource, you call acquire() to obtain a resource when you need it and release() to give it back when you are done, as in the code below. If you compare the thread-pool utility classes in the Java concurrency package against this model, you will find that they do not match at all: the thread pool provided by Java has no methods for acquiring and releasing threads.
class XXXPool {
  // acquire a pooled resource
  XXX acquire() {
  }
  // release a pooled resource
  void release(XXX x) {
  }
}
Thread pooling is a producer-consumer pattern
Why aren’t thread pools designed like ordinary pooled resources? If they were, the design would look like the sample code below. Now think it through: suppose we acquire an idle thread T1. How do we use T1? You would expect to pass in a Runnable object that carries the business logic by calling an execute() method on T1, just as a thread is created through the constructor Thread(Runnable target). Unfortunately, if you go through all the methods of the Thread class, you will find no public method like execute(Runnable target).
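// a thread pool designed like an ordinary pooled resource
class ThreadPool {
  // acquire an idle thread
  Thread acquire() {
  }
  // release a thread
  void release(Thread t) {
  }
}
// expected usage
ThreadPool pool;
Thread T1 = pool.acquire();
// pass in a Runnable object (Thread has no such execute() method)
T1.execute(() -> {
  // specific business logic
  ...
});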
Therefore, thread pools cannot be designed like ordinary pooled resources. So how are they designed? Thread pool designs in the industry generally adopt the producer-consumer pattern: the user of the thread pool is the producer, and the thread pool itself is the consumer. In the sample code below, we create MyThreadPool, a very simple thread pool that you can use to understand how thread pools work.
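import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// simplified thread pool, only for illustrating how thread pools work
class MyThreadPool {
  // use a blocking queue to implement the producer-consumer pattern
  BlockingQueue<Runnable> workQueue;
  // holds the internal worker threads
  List<WorkerThread> threads = new ArrayList<>();
  // constructor
  MyThreadPool(int poolSize, BlockingQueue<Runnable> workQueue) {
    this.workQueue = workQueue;
    // create the worker threads
    for (int idx = 0; idx < poolSize; idx++) {
      WorkerThread work = new WorkerThread();
      work.start();
      threads.add(work);
    }
  }
  // submit a task; blocks while the queue is full
  void execute(Runnable command) throws InterruptedException {
    workQueue.put(command);
  }
  // a worker thread consumes tasks and executes them
  class WorkerThread extends Thread {
    public void run() {
      try {
        // loop: take a task and execute it
        while (true) { // ①
          Runnable task = workQueue.take();
          task.run();
        }
      } catch (InterruptedException e) {
        // interrupted while waiting for a task: let the worker exit
      }
    }
  }
}

/** usage example **/
// create a bounded blocking queue
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(2);
// create the thread pool
MyThreadPool pool = new MyThreadPool(10, workQueue);
// submit a task
pool.execute(() -> {
  System.out.println("hello");
});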
Inside MyThreadPool, we maintain a blocking queue, workQueue, and a set of worker threads, the number of which is specified by poolSize in the constructor. The user submits a Runnable task by calling the execute() method, and the internal implementation of the execute() method simply adds the task to the workQueue. The worker thread maintained internally in MyThreadPool consumes and executes the tasks in the workQueue, and the associated code is the while loop at code ①. That’s how thread pools work, isn’t it pretty simple?
How do I use thread pools in Java
The thread pool provided in the Java concurrency package is far more powerful, and of course more complex, than our example code above. The core of Java’s thread pool utility classes is ThreadPoolExecutor. As its name suggests, it emphasizes being an executor rather than a pooled resource in the general sense.
The constructor of ThreadPoolExecutor is quite complex, and the most complete constructor takes seven parameters, as shown in the code below.
ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
To explain what these parameters mean, you can think of a thread pool as a project team, and threads are the members of the project team.
- corePoolSize: the minimum number of threads kept in the thread pool. Even when a project is idle, you cannot pull everyone off it; at least corePoolSize people must stay to hold the fort.
- maximumPoolSize: the maximum number of threads the thread pool may create. When the project is busy you need to add people, but not without limit: at most maximumPoolSize people. When the project becomes idle you remove people again, down to at least corePoolSize people.
- keepAliveTime & unit: As mentioned above, a project adds and removes people according to how busy it is. How are busy and idle defined in the programming world? Simply: if a thread does no work for a period of time, it is idle, and keepAliveTime and unit are the parameters that define that period. In other words, if a thread has been idle for keepAliveTime (measured in unit) and the number of threads in the pool is greater than corePoolSize, the idle thread is reclaimed.
- workQueue: the work queue, with the same meaning as the workQueue in the example code above.
- threadFactory: lets you customize how threads are created, for example to give threads meaningful names.
- handler: lets you customize the task rejection policy. If all threads in the pool are busy and the work queue is full (which requires the work queue to be bounded), the thread pool rejects newly submitted tasks. The handler parameter specifies how. ThreadPoolExecutor already provides the following four policies.
  - CallerRunsPolicy: the thread that submitted the task runs the task itself.
  - AbortPolicy: the default rejection policy; throws RejectedExecutionException.
  - DiscardPolicy: silently discards the task without throwing any exception.
  - DiscardOldestPolicy: discards the oldest task, that is, the task that entered the work queue earliest, and then adds the new task to the work queue.
Java 1.6 also added the allowCoreThreadTimeOut(boolean value) method, which lets all threads, including core threads, time out. In other words, when the project is very idle, the whole project team may be withdrawn.
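Putting the seven parameters together, a pool might be created as in the sketch below. Every concrete value here is an assumption chosen for illustration: the sizes 2 and 4, the 30-second keep-alive, the queue capacity of 100, the thread name prefix order-worker-, and the choice of CallerRunsPolicy.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolConfigDemo {
  static ThreadPoolExecutor newPool() {
    AtomicInteger seq = new AtomicInteger(1);
    // threadFactory: give every thread a meaningful name (the prefix is hypothetical)
    ThreadFactory namedThreads =
        r -> new Thread(r, "order-worker-" + seq.getAndIncrement());
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        2,                                 // corePoolSize
        4,                                 // maximumPoolSize
        30L, TimeUnit.SECONDS,             // keepAliveTime & unit
        new ArrayBlockingQueue<>(100),     // bounded workQueue
        namedThreads,                      // threadFactory
        new ThreadPoolExecutor.CallerRunsPolicy()); // handler
    // optional: let core threads time out as well when the pool is idle
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }

  // Submits one task and returns the name of the thread that ran it.
  static String runOneTask() {
    ThreadPoolExecutor pool = newPool();
    try {
      Callable<String> task = () -> Thread.currentThread().getName();
      return pool.submit(task).get();
    } catch (Exception e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(runOneTask()); // order-worker-1
  }
}
```

Named threads make thread dumps and logs much easier to read when several pools run in the same process.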
What should I pay attention to when using thread pools
Given how complex the ThreadPoolExecutor constructor is, the Java concurrency package provides a static factory class, Executors, with which you can create a thread pool quickly. However, the coding guidelines at large companies today generally advise against using Executors, so I won’t spend space on it here.
The most important reason is that many of the methods provided by Executors use an unbounded LinkedBlockingQueue by default. Under high load, an unbounded queue can easily lead to OOM, and an OOM makes every request unprocessable, which is fatal. So using bounded queues is strongly recommended.
With a bounded queue, when there are too many tasks the thread pool triggers its rejection policy. The default rejection policy throws RejectedExecutionException, which is a runtime exception. The compiler does not force you to catch runtime exceptions, so developers easily overlook it. Therefore, use the default rejection policy with caution. If the tasks the thread pool handles are important, define your own rejection policy. In practice, a custom rejection policy is often combined with a degradation strategy.
When using thread pools, also pay attention to exception handling. For example, when a task is submitted through ThreadPoolExecutor’s execute() method and a runtime exception occurs while the task runs, the thread executing the task terminates. What is worse, you receive no notification when a task fails, which can lead you to believe that all tasks are running normally. Although thread pools provide many methods for exception handling, the safest and simplest approach is still to catch all exceptions and handle them as needed, as in the following example code.
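A custom rejection policy is just an implementation of RejectedExecutionHandler. The sketch below uses a deliberately tiny pool (one thread, queue capacity one) so that rejection is easy to trigger; the handler only counts rejected tasks, which stands in for whatever logging or degradation logic a real system would apply. All names and sizes are assumptions for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RejectionDemo {
  // Submits five blocking tasks to a pool that can hold only two and counts the rejections.
  static int rejectedCount() {
    AtomicInteger rejected = new AtomicInteger();
    // hypothetical policy: record the rejection instead of throwing
    RejectedExecutionHandler countAndDrop =
        (task, executor) -> rejected.incrementAndGet();
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(1),
        countAndDrop);

    CountDownLatch release = new CountDownLatch(1);
    Runnable blocked = () -> {
      try {
        release.await(); // keep the worker busy until all tasks are submitted
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };
    // task 1 occupies the only thread, task 2 fills the queue, tasks 3 to 5 are rejected
    for (int i = 0; i < 5; i++) {
      pool.execute(blocked);
    }
    release.countDown();
    pool.shutdown();
    return rejected.get();
  }

  public static void main(String[] args) {
    System.out.println(rejectedCount()); // 3
  }
}
```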
try {
  // business logic
} catch (RuntimeException x) {
  // handle as needed
} catch (Throwable x) {
  // handle as needed
}
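One way to apply this pattern to every task is to wrap tasks before submitting them. The following is a sketch under stated assumptions: the guarded() helper and the failures queue are invented for the example, and a real application would log or alert instead of just collecting the exceptions.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class GuardedTaskDemo {
  // Wraps a task so that any failure is recorded instead of going unnoticed.
  static Runnable guarded(Runnable task, Queue<Throwable> failures) {
    return () -> {
      try {
        task.run();
      } catch (Throwable x) {
        failures.add(x); // handle as needed; here we only record it
      }
    };
  }

  // Runs one failing and one succeeding task and returns the number of recorded failures.
  static int failuresRecorded() {
    Queue<Throwable> failures = new ConcurrentLinkedQueue<>();
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    pool.execute(guarded(() -> {
      throw new IllegalStateException("boom");
    }, failures));
    pool.execute(guarded(() -> { }, failures));
    pool.shutdown();
    try {
      pool.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return failures.size();
  }

  public static void main(String[] args) {
    System.out.println(failuresRecorded()); // 1
  }
}
```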