- What would you do if you had an integer variable count, and multiple threads incremented count by 1?
- Do you know how to get multiple threads to collaborate on an event?
Preface
Nice to meet you
ConcurrentHashMap is the thread-safe map in Java's concurrency package. How does it compare with the traditional Hashtable and SynchronizedMap (never heard of SynchronizedMap? It is the object returned by the Collections.synchronizedMap method), and is it really that good?
ConcurrentHashMap implements thread safety on the basis of HashMap. For more information on HashMap, see this article. The underlying design of HashMap is not repeated here, so readers who are not familiar with it should read the previous article first. The concurrent programming wisdom in ConcurrentHashMap is well worth learning. Take the two questions posed at the beginning of this article: how would you solve them? You could lock directly or use the higher-performance CAS, but ConcurrentHashMap gives us a different solution.
The main content of this article is the concurrent design of ConcurrentHashMap, focusing on the source code of four of its methods: putVal, initTable, addCount, and transfer. Each method is analyzed with a diagram that explains the core idea. I have added very detailed comments to the source code, and I recommend that you read it when you have time. The concurrent wisdom of ConcurrentHashMap is contained in the source code.
So let’s get started
CAS and spinlocks
CAS is a focus of ConcurrentHashMap and the foundation of its performance improvements. When reading the source code, CAS is everywhere. Before introducing ConcurrentHashMap, it is worth introducing two key points: CAS and spin locks.
Many operations in Java are not atomic. For example, count++ can be divided into:
- Get the count copy count_
- Increment count_
- Assign count_ to count
If count is modified by another thread after the first step, the assignment in the third step overwrites the change made by the other thread. The synchronized keyword solves this problem, but CAS is often a better solution because locking is a heavyweight operation that can severely affect performance.
The idea behind CAS is not complicated. To increment the variable count, we optimistically assume that no concurrency conflict will occur. We store a copy of count and increment the copy. Then we compare the original copy with count itself. If they are equal, no other thread has modified count, and the incremented value is written back. If not, count was modified during our increment, so we repeat the whole process until the modification succeeds, as shown in the figure below:
So, what if count is changed right after we check count==count_? The comparison and the assignment are executed together as one atomic operation, guaranteed at the hardware level, so this cannot happen. Common CAS methods in Java are:
// Compare and replace
U.compareAndSwapInt();
U.compareAndSwapLong();
U.compareAndSwapObject();
We’ll see a lot of them in subsequent source code. In this way, we don’t need to lock the count variable. However, if the concurrency is too high and the processing time is too long, some threads will keep spinning in a loop, wasting CPU resources.
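The copy, increment, compare-and-write-back loop described above can be sketched as follows. This is a minimal illustration rather than the ConcurrentHashMap code: it uses the public AtomicInteger.compareAndSet API in place of the internal Unsafe calls, and the CasCounter class and its run helper are hypothetical names introduced only for this example.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class CasCounter {
    private final AtomicInteger count = new AtomicInteger();

    // Increment with an explicit CAS retry loop.
    int increment() {
        while (true) {
            int snapshot = count.get();   // step 1: take a copy of count
            int next = snapshot + 1;      // step 2: increment the copy
            // step 3: write back only if count still equals the copy
            if (count.compareAndSet(snapshot, next)) {
                return next;
            }
            // Another thread changed count in the meantime, so retry.
        }
    }

    int get() {
        return count.get();
    }

    // Starts `threads` threads that each increment `perThread` times
    // and returns the final value of the counter.
    static int run(int threads, int perThread) {
        CasCounter c = new CasCounter();
        Thread[] ts = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            ts[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) c.increment();
            });
            ts[t].start();
        }
        for (Thread th : ts) {
            try {
                th.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return c.get();
    }
}
```

No update is lost even though no lock is taken: a thread whose compareAndSet fails simply goes around the loop again with a fresh copy.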
A spin lock is an application-level lock built on CAS, as in the following code:
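// 0 means the lock is free; 1 means a thread holds the lock
volatile int lock = 0; // a field of the enclosing object
// LOCK is assumed to be the memory offset of the lock field (obtained through U.objectFieldOffset)
while (true) {
    // Only attempt the CAS when the lock looks free
    if (lock == 0 && U.compareAndSwapInt(this, LOCK, 0, 1)) {
        ... // logic that runs while holding the lock
        // Finally release the lock
        lock = 0;
        break;
    }
}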
This is a classic spin lock design. Determine if the lock is owned by another thread. If not, try using CAS to acquire the lock. If the first two steps fail, the loop is repeated until the lock is acquired. At the end of the logic, lock=0 is set to release the lock. This method can greatly improve efficiency in concurrent scenarios with short conflict time.
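For readers who want to run the idea, here is a small self-contained sketch of the same spin lock. It is an illustration under stated assumptions: AtomicInteger stands in for the Unsafe call, and the SpinLock class and its demo helper are hypothetical names, not part of the JDK.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class SpinLock {
    // 0 = free, 1 = held by some thread
    private final AtomicInteger state = new AtomicInteger(0);

    void lock() {
        // Check that the lock looks free, then try to take it with CAS.
        // If either step fails, keep spinning.
        while (!(state.get() == 0 && state.compareAndSet(0, 1))) {
            Thread.onSpinWait(); // busy-wait hint, available since Java 9
        }
    }

    void unlock() {
        state.set(0);
    }

    // Increments a plain (non-atomic) int from several threads under the lock
    // and returns the final value.
    static int demo(int threads, int perThread) {
        SpinLock lock = new SpinLock();
        int[] shared = {0};
        Thread[] ts = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            ts[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    lock.lock();
                    try {
                        shared[0]++;
                    } finally {
                        lock.unlock();
                    }
                }
            });
            ts[t].start();
        }
        for (Thread th : ts) {
            try {
                th.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return shared[0];
    }
}
```

The critical section here is a single increment, which is the kind of short conflict window where spinning is cheaper than blocking.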
CAS and spin locks are widely used in ConcurrentHashMap, and we’ll see them frequently in source code. This is also at the heart of ConcurrentHashMap’s design.
ConcurrentHashMap concurrency strategy overview
The concurrency strategy of Hashtable and SynchronizedMap is to lock the entire map object, which results in very poor performance. In JDK 1.7 and earlier, ConcurrentHashMap used a segment-locking strategy to improve performance, as shown below:
This is like splitting the entire array into several smaller arrays. Each operation only needs to lock the small array it works on. Different segments do not affect each other, which improves performance. Starting with JDK 1.8, the whole strategy was refactored to lock individual nodes instead of segments:
The lock granularity is further reduced, so concurrency is more efficient. JDK 1.8 not only refines the lock granularity but also adopts a CAS + synchronized design. Next, we explain the design ideas of ConcurrentHashMap in detail through its common operations: adding, deleting, expanding, initializing, and so on.
Add data: putVal()
When ConcurrentHashMap adds data, it uses a CAS + synchronized strategy. First, it checks whether the target node is null. If it is null, it tries to add the node using CAS. If the CAS fails, a concurrency conflict has occurred, so the node is locked and the data is inserted under the lock. In low-concurrency scenarios this avoids locking altogether and improves performance significantly. The CAS is attempted only once, so threads do not waste CPU time spinning for long.
The put method of ConcurrentHashMap flows as follows (this is not the complete flow):
- First, it checks whether the array has been initialized. If not, it initializes the array first.
- If the node to be inserted is null, try using CAS to insert data.
- If it is not null, check whether the hash value of the node is -1. -1 indicates that the array is being expanded, so the thread helps expand the array first and then continues to insert the data. (Helping with expansion is covered later.)
- Finally, the node is locked and the data is inserted, and the method determines whether to return an old value. If no old value was overwritten, a new node was added and the number of nodes in the map must be updated, which is the addCount method in the diagram.
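The "CAS on an empty bin, lock the head node otherwise" flow above can be sketched in a much reduced form. The TinyBinMap class below is hypothetical and written only for this illustration: it has a fixed 16-slot table, no expansion, no red-black trees and no size counter, and it uses AtomicReferenceArray where the real class uses Unsafe.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

final class TinyBinMap<K, V> {
    private static final class Node<K, V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K, V> next;

        Node(int hash, K key, V val) {
            this.hash = hash;
            this.key = key;
            this.val = val;
        }
    }

    // Fixed-size table; slot count is a power of two so (length - 1) works as a mask.
    private final AtomicReferenceArray<Node<K, V>> table = new AtomicReferenceArray<>(16);

    private static int spread(int h) {
        return (h ^ (h >>> 16)) & 0x7fffffff;
    }

    // Returns the previous value for the key, or null if the key is new.
    V put(K key, V value) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int i = (table.length() - 1) & hash;
        while (true) {
            Node<K, V> f = table.get(i);
            if (f == null) {
                // Empty bin: try a lock-free insert with CAS.
                if (table.compareAndSet(i, null, new Node<>(hash, key, value))) {
                    return null;
                }
                // Lost the race; loop again and take the locking path.
            } else {
                synchronized (f) {
                    // Double check that the head node is still the one we locked.
                    if (table.get(i) == f) {
                        for (Node<K, V> e = f; ; e = e.next) {
                            if (e.hash == hash && e.key.equals(key)) {
                                V old = e.val;
                                e.val = value;
                                return old;
                            }
                            if (e.next == null) {
                                e.next = new Node<>(hash, key, value);
                                return null;
                            }
                        }
                    }
                }
            }
        }
    }

    V get(K key) {
        int hash = spread(key.hashCode());
        for (Node<K, V> e = table.get((table.length() - 1) & hash); e != null; e = e.next) {
            if (e.hash == hash && e.key.equals(key)) return e.val;
        }
        return null;
    }
}
```

With Integer keys 0 and 16, both entries fall into bin 0 of the 16-slot table, so the first put goes through the CAS path and the second through the synchronized path.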
ConcurrentHashMap is based on HashMap, so the way data is inserted and the hash algorithm are similar to HashMap and are not described again here. With the idea clear, let’s look at the source code analysis:
final V putVal(K key, V value, boolean onlyIfAbsent) {
// Null values or keys are not allowed to be inserted
// Allowing null values would make a null return from get() ambiguous:
// 1. The key was not found
// 2. The key was found, but its value is null
// In a concurrent environment containsKey() cannot reliably tell the two apart,
// so null must only mean that no data was found. Allowing a null key requires extra logic, takes up array space, and is of little practical value.
// HashMap supports null keys and values, but ConcurrentHashMap supports neither, for the reasons above.
if (key == null || value == null) throw new NullPointerException();
// High-low xor perturb hashcode, similar to HashMap
// But with a slight difference, as we'll see later, we can simply assume that the same is ok
int hash = spread(key.hashCode());
// bincount specifies the number of nodes in the linked list
int binCount = 0;
// Loop and retry until one of the cases below succeeds; this pattern appears many times in the source
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// If the array is empty, initialize it
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// Case 2: Target subscript object is null
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// Key: CAS is used for insertion
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break;
}
// Case 3: The array is being expanded, so help migrate data to the new array
// helpTransfer returns the array to use next, and the next iteration inserts into it
// Helping with expansion is explained later
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// Case 4: Lock the node directly and insert data
// There is a lot of code below, but the logic is pretty much the same as inserting data into a HashMap
// Because it is already locked, there is no concurrency security design involved
else {
V oldVal = null;
// lock synchronously
synchronized (f) {
// Double check to see if the object you just obtained has changed
if (tabAt(tab, i) == f) {
// List processing
if (fh >= 0) {
binCount = 1;
// loop linked lists
for (Node<K,V> e = f;; ++binCount) {
K ek;
// If the same key is found, the old value is recorded
if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
oldVal = e.val;
// Determine whether the value needs to be updated
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// If not found, insert at the end of the list
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// Red-black tree processing
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
// Determine whether to convert to a red-black tree, and return the old value
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// Total +1; this is a very hardcore design
// It is an important part of the ConcurrentHashMap design, which we will discuss later
addCount(1L, binCount);
return null;
}
// Same idea as the hash perturbation in HashMap, plus a mask that clears the sign bit
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
We notice that there are two key methods in the source: initTable(), which initializes the array, and addCount(), which changes the total number of nodes in the map. Let's continue to analyze how these two methods achieve thread safety.
Initialize array: initTable()
The key to initialization is ensuring that when multiple threads call the method concurrently, only one of them actually performs the initialization. ConcurrentHashMap uses CAS + spinning to solve this concurrency problem. The overall process is as follows:
- First, the array is checked to see if it is null. If not, another thread has finished the initialization, and the array is returned directly.
- Second, it checks whether initialization is already in progress. If so, the current thread yields the CPU and keeps spinning.
- If the array is null and no other thread is initializing it, the thread tries to acquire the spin lock. If it succeeds, it performs the initialization. If it fails, a concurrent conflict has occurred and the loop continues.
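The three steps above can be tried out with a reduced sketch. LazyTable is a hypothetical class for this illustration only: it uses an AtomicInteger named sizeCtl in the same role as the field described below (-1 while initializing, the threshold afterwards), a plain Object[] instead of a node array, and an initCount field added purely so the example can show that allocation ran once.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class LazyTable {
    private volatile Object[] table;
    // 0 by default, -1 while a thread is initializing, the threshold afterwards
    private final AtomicInteger sizeCtl = new AtomicInteger(0);
    // Illustration only: counts how many times the array was allocated
    final AtomicInteger initCount = new AtomicInteger();

    Object[] initTable() {
        Object[] tab;
        while ((tab = table) == null) {
            int sc = sizeCtl.get();
            if (sc < 0) {
                Thread.yield(); // another thread is initializing; back off and retry
            } else if (sizeCtl.compareAndSet(sc, -1)) { // acquire the spin lock
                try {
                    if ((tab = table) == null) { // double check under the lock
                        int n = (sc > 0) ? sc : 16;
                        table = tab = new Object[n];
                        initCount.incrementAndGet();
                        sc = n - (n >>> 2); // 0.75 * n
                    }
                } finally {
                    sizeCtl.set(sc); // release the lock by publishing the threshold
                }
                break;
            }
        }
        return tab;
    }

    int threshold() {
        return sizeCtl.get();
    }

    // Lets several threads race to initialize the same table.
    static LazyTable race(int threads) {
        LazyTable t = new LazyTable();
        Thread[] ts = new Thread[threads];
        for (int k = 0; k < threads; k++) {
            ts[k] = new Thread(t::initTable);
            ts[k].start();
        }
        for (Thread th : ts) {
            try {
                th.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return t;
    }
}
```

However many threads race, only the thread that wins the CAS allocates the array; the others either see a negative sizeCtl and yield, or find the table already published.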
Instead of locking directly, ConcurrentHashMap uses CAS + a spin lock to improve performance. The spin lock ensures that only one thread actually initializes the array, without the cost of synchronized. Before looking at the source code analysis, let’s take a look at one of the key variables in ConcurrentHashMap: sizeCtl.
sizeCtl defaults to 0. Normally it holds the threshold of ConcurrentHashMap and is a positive number. While the array is being initialized, its value is -1, so other threads only need to check sizeCtl == -1 to know that initialization is in progress. While ConcurrentHashMap is being expanded, sizeCtl is a negative number that encodes how many threads are currently assisting in the expansion, which we will discuss when we get to expansion. The source of initTable() is as follows:
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// The loop is initialized by spinning instead of locking
// First check whether the array is null or of length 0
// There is no initialization in the constructor, mainly because of lazy loading
while ((tab = table) == null || tab.length == 0) {
// sizeCtl is a key variable;
// The default value is 0. -1 indicates initialization, <-1 indicates how many threads are helping expansion, and >0 indicates the threshold
if ((sc = sizeCtl) < 0)
Thread.yield(); // Another thread is initializing; give up the CPU and retry
// Set sizeCtl to -1 through CAS to acquire the spin lock
// Other threads then cannot enter the initialization and must wait for the lock
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// Double check for null
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// Set sc to the threshold: n >>> 2 is n/4, so n - (n >>> 2) is 0.75n
sc = n - (n >>> 2);
}
} finally {
// Assign sc to sizeCtl
sizeCtl = sc;
}
break;
}
}
// Finally return the tab array
return tab;
}
Let’s continue to look at how the addCount() method implements concurrency security.
Change the total number of nodes: addCount()
The goal of the addCount method is simply to add 1 to the total number of nodes in ConcurrentHashMap, which is the question I posed at the beginning of this article. The authors of ConcurrentHashMap designed a very rigorous architecture to ensure both concurrency safety and high performance.
ConcurrentHashMap does not keep a single size variable. It splits size as shown below:
The size of ConcurrentHashMap is the sum of size1, size2, and so on. What’s the advantage of this split? Each thread can modify its own variable individually, as in the diagram below:
Two threads can increment at the same time without contending with each other. Isn’t it a magical idea? When you want the total number of nodes, you just add them all up. In ConcurrentHashMap each size is wrapped in a CounterCell object. The CounterCell class is very simple:
static final class CounterCell {
    volatile long value;
    CounterCell(long x) { value = x; }
}
The value field is simply declared with the volatile keyword. Not familiar with volatile? You can refer to the article on understanding the volatile keyword in Java. In simple terms, it ensures that a value modified by the current thread is immediately visible to other threads. ConcurrentHashMap uses an array to store the CounterCell objects, as follows:
How is each thread assigned its own CounterCell? ConcurrentHashMap uses an idea similar to HashMap: it obtains a random number for the current thread and takes it modulo the array length to find the corresponding CounterCell. The thread then tries to modify that CounterCell using CAS. If the modification fails, it obtains a new thread random number and tries another CounterCell, until the modification succeeds.
The above is the core idea of the addCount method, but the source code is a bit more complex, because it must also handle the initialization of the CounterCell array, the creation of CounterCell objects, and the expansion of the CounterCell array. ConcurrentHashMap also keeps a baseCount. Each thread first attempts to modify baseCount using CAS, and only if that fails does it fall back to the CounterCell array. The overall process is as follows:
- The current thread first uses CAS to modify baseCount. If that fails, the thread turns to the CounterCell array.
- Check whether the CounterCell array is empty:
- If the CounterCell array is empty, initialize the array.
- If the CounterCell array is not empty, use the thread random number to find an index.
- If the CounterCell at that index has not been created, create it first (this step is not marked in the figure). After creating a CounterCell, also consider whether the array needs to be expanded.
- If the CounterCell object is not null, try to modify it using CAS. If the CAS fails, get a new random number and try again.
- If none of the above can proceed, go back and try CAS on baseCount again.
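The core of this flow, a base value plus an array of cells that absorb contention, can be sketched as below. StripedCounter is a hypothetical class for illustration and simplifies heavily: the cell array has a fixed size chosen up front (no lazy creation and no expansion), and a fresh random index is drawn on every attempt instead of keeping a per-thread probe. The JDK's own java.util.concurrent.atomic.LongAdder is built on the same base-plus-cells design and is the class to use in real code.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

final class StripedCounter {
    private final AtomicLong base = new AtomicLong();
    private final AtomicLongArray cells;

    // cellCount must be a power of two so that (length - 1) works as a mask.
    StripedCounter(int cellCount) {
        cells = new AtomicLongArray(cellCount);
    }

    void add(long x) {
        // Fast path: a single CAS attempt on the base value.
        long b = base.get();
        if (base.compareAndSet(b, b + x)) {
            return;
        }
        // Contended: pick a random cell and retry until one CAS succeeds.
        while (true) {
            int idx = ThreadLocalRandom.current().nextInt() & (cells.length() - 1);
            long v = cells.get(idx);
            if (cells.compareAndSet(idx, v, v + x)) {
                return;
            }
        }
    }

    // The total is the base plus every cell.
    long sum() {
        long s = base.get();
        for (int i = 0; i < cells.length(); i++) {
            s += cells.get(i);
        }
        return s;
    }

    // Adds 1 from several threads and returns the final sum.
    static long run(int threads, int perThread) {
        StripedCounter c = new StripedCounter(8);
        Thread[] ts = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            ts[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) c.add(1);
            });
            ts[t].start();
        }
        for (Thread th : ts) {
            try {
                th.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return c.sum();
    }
}
```

Because every successful CAS adds its value to exactly one slot, the sum is exact once all writers have finished, even though no single variable ever holds the total.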
It may seem complicated, but the remaining steps are just details, as long as you hold on to the core concept: the size variable is split into multiple CounterCell objects. As you can see, the idea does not involve synchronized at all. The authors of ConcurrentHashMap use CAS + spin locks in place of synchronized, which gives a significant performance boost under high concurrency. With the idea clear, the source code becomes a little easier to read. Let’s read the source code:
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// Enter the branch if the CounterCell array is not null, or if it is null and the direct CAS update of baseCount fails
if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
// Indicates that no competition has occurred
boolean uncontended = true;
// The fullAddCount method is entered in the following cases:
// 1. The array is null and the direct update of baseCount failed
// 2. The array slot selected by the hash is null
// 3. The CAS on the CounterCell object failed
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// This method guarantees that the update completes. Key method!
fullAddCount(x, uncontended);
return;
}
// If the length is <=1, you don't need to expand it.
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
// Expansion logic (analyzed later)
}
}
If the direct attempt to modify baseCount fails and the quick CAS on a CounterCell does not succeed either, the fullAddCount method is entered:
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
// Force initialization of a thread random number if the current thread random number is 0
// This random number works just like hashCode, but it doesn't need to be looked up
// Each of the following loops retrieves a random number so that threads don't get stuck in the same place
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit();
h = ThreadLocalRandom.getProbe();
// wasUncontended indicates no competition
// If the value is false, the CAS failed to modify the CounterCell and the thread random number needs to be obtained again
wasUncontended = true;
}
// collide: if true, the array needs to be expanded
boolean collide = false;
// There are three big cases:
// 1. The array is not null: the CAS on a CounterCell failed, or the CounterCell object is null
// 2. The array is null: the CAS on baseCount failed and the array needs to be initialized
// 3. The lock could not be acquired in case 2, so try to update baseCount directly
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
// First case: the array is not null
if ((as = counterCells) != null && (n = as.length) > 0) {
// The subscript CounterCell is null
if ((a = as[(n - 1) & h]) == null) {
// Determine whether the current lock is occupied
// cellsBusy is a spin lock, 0 indicates that it is not occupied
if (cellsBusy == 0) {
// Create a CounterCell object
CounterCell r = new CounterCell(x);
// Try to acquire the lock to add a new CounterCell object
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try {
CounterCell[] rs; int m, j;
// recheck whether the value is null once
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
// created=true Indicates a successful creation
created = true;
}
} finally {
// Release the lock
cellsBusy = 0;
}
// Create success +1 success, return directly
if (created)
break;
// Another thread has inserted data
// Continue the loop and start again
continue;
}
}
// We wanted to create an object, but the lock is occupied
collide = false;
}
// CAS failed to change the CounterCell directly
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// Try CAS to CounterCell
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
// If another thread has already replaced the array, or its length has reached the number of CPU cores, do not treat this as a collision
// Because the array will not be expanded any further
// So the theoretical maximum number of threads updating concurrently is NCPU
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
// If all the conditions above are false, a conflict has occurred and the array needs to be expanded
else if (!collide)
collide = true;
// Get the spin lock and expand the capacity
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) { // Expand table unless stale
// Make the array twice as large
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
// Release the lock
cellsBusy = 0;
}
collide = false;
// Continue the loop
continue;
}
// This step rehashes to find the next CounterCell object
// Each step above fails to get a new random number here
h = ThreadLocalRandom.advanceProbe(h);
}
// If the array is null, try to acquire the lock to initialize the array
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try {
// recheck checks whether the array is null
if (counterCells == as) {
// Initialize the array
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
// Release the lock
cellsBusy = 0;
}
// If the initialization is complete and the loop is broken,
// The initialization process also includes creating a CounterCell object
if (init)
break;
}
// If the array is null and the lock is not available, another thread is creating the array, so try to update baseCount directly
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
// The update succeeded, so exit the loop
break;
}
}
The overall idea of the source code matches what we described before; the details rely heavily on CAS + spin locks to ensure thread safety. The comments above are very detailed and will not be repeated here. ConcurrentHashMap may look like a pure CAS + synchronized design, but for updating the counter it uses a multi-threaded scheme that combines CAS and spin locks to improve performance in high-concurrency environments.
If it is a magical idea to divide a variable into multiple sub-variables and use multi-threaded cooperation, would it be even more magical to have multiple threads working together to complete the expansion operation at the same time? ConcurrentHashMap not only avoids the performance cost of concurrency, but even takes advantage of it, with multiple threads working together to do one thing. Let’s look at ConcurrentHashMap’s expansion plan.
Expansion solution: transfer()
Before looking at expansion, there are two things to cover: sizeCtl and ForwardingNode.
sizeCtl, as mentioned earlier, defaults to 0 and normally holds the threshold of ConcurrentHashMap. While the array is being initialized, its value is -1. While the array is being expanded, it encodes the number of threads participating in the expansion. ConcurrentHashMap sets sizeCtl to a specific negative number when expansion starts. Each thread that joins the expansion adds 1 to this number and each thread that leaves subtracts 1, so the number of threads is tracked. One variable maintains four states. Again, kudos to the authors of ConcurrentHashMap.
So what is this negative number? There is an algorithm behind it. The code that initializes sizeCtl for expansion:
int rs = resizeStamp(n); // n is the length of the array
sizeCtl = (rs << RESIZE_STAMP_SHIFT) + 2; // RESIZE_STAMP_SHIFT is a constant with a value of 16
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
Let’s look at this algorithm bit by bit.
- Integer.numberOfLeadingZeros(n) returns the number of zero bits before the highest one bit of n. For example, the 32-bit binary form of 8 is 00000000 00000000 00000000 00001000, so the method returns 28: there are 28 zeros in front.
- RESIZE_STAMP_BITS - 1 is 15, so 1 << (RESIZE_STAMP_BITS - 1) is 00000000 00000000 10000000 00000000.
- If n is 8, then Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)) is 00000000 00000000 10000000 00011100. This number is called the check code and is denoted rs.
- Finally, (rs << RESIZE_STAMP_SHIFT) + 2 gives the final value of sizeCtl: 10000000 00011100 00000000 00000010
We can see that during expansion the high 16 bits of sizeCtl are the check code and the low 16 bits track the number of threads: the low part is set to 2 when expansion starts and increases by 1 for each new thread that joins. What is the point of the check code? The test (sizeCtl >>> RESIZE_STAMP_SHIFT) == rs determines whether the current array is being expanded.
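The bit arithmetic above is easy to check by running it. The sketch below copies resizeStamp and the two constants as they appear in this article; the ResizeStampDemo class and its helper methods (startResize, isResizing, isLastThread, bits) are hypothetical names added for the illustration.

```java
final class ResizeStampDemo {
    static final int RESIZE_STAMP_BITS = 16;
    static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; // 16

    // The check code for a table of length n.
    static int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

    // Value of sizeCtl right after the first thread starts expanding a table of length n.
    static int startResize(int n) {
        return (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2;
    }

    // True if sizeCtl says a table of length n is being expanded.
    static boolean isResizing(int sizeCtl, int n) {
        return (sizeCtl >>> RESIZE_STAMP_SHIFT) == resizeStamp(n);
    }

    // True if the thread that read sc just before decrementing it is the last one out.
    static boolean isLastThread(int sc, int n) {
        return (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT;
    }

    // 32-bit binary form, grouped into bytes for readability.
    static String bits(int v) {
        String s = String.format("%32s", Integer.toBinaryString(v)).replace(' ', '0');
        return s.substring(0, 8) + " " + s.substring(8, 16) + " "
                + s.substring(16, 24) + " " + s.substring(24);
    }

    public static void main(String[] args) {
        int sc = startResize(8);
        System.out.println(bits(resizeStamp(8))); // 00000000 00000000 10000000 00011100
        System.out.println(bits(sc));             // 10000000 00011100 00000000 00000010
        System.out.println(sc < 0);               // true
    }
}
```

Because bit 15 of the check code is always 1, shifting it left by 16 sets the sign bit, which is why sizeCtl is guaranteed to be negative for the whole duration of an expansion.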
Next, take a look at ForwardingNode. It is a node class that marks a bin as already migrated, as shown in the diagram below:
ConcurrentHashMap traverses the array and migrates from back to front. A bin that has been migrated is set to a ForwardingNode, indicating that all data under that bin has been moved. A ForwardingNode is like a regular node, except that its hash value is MOVED, which is -1. Remember putVal? During insertion, it checks whether the current node is a ForwardingNode. If so, it helps with the migration first. Otherwise, even if an expansion is in progress, the expansion has not reached the current index yet, so the data can be inserted directly.
With sizeCtl and ForwardingNode behind you, take a look at the expansion solution for ConcurrentHashMap. The expansion of ConcurrentHashMap is coordinated by multiple threads, which improves efficiency, as shown in the following figure:
ConcurrentHashMap divides the entire array into ranges and hands one range at a time to each thread. bound is the lower limit of the thread's range, and i is the index currently being migrated. Each migrated bin is set to a ForwardingNode to indicate that its migration is complete. stride is the “step length” of a thread's migration range. When a thread completes the task within its range, it checks whether there is more to migrate. transferIndex records the next index to be migrated; when transferIndex==0, no more help is needed. This is the core idea of the ConcurrentHashMap expansion solution. Thread safety is achieved in the same way as before, with CAS + spin lock + synchronized.
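How threads hand out ranges to themselves through transferIndex can be sketched in isolation. StrideClaimDemo is a hypothetical class for this illustration: it reproduces only the CAS on transferIndex, and "migrating" a bin is replaced by incrementing a visit counter so that the result can be checked.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;

final class StrideClaimDemo {
    // Claims the next range, working from the back of the array toward the front.
    // Returns {bound, i} (both inclusive), or null when nothing is left to claim.
    static int[] claim(AtomicInteger transferIndex, int stride) {
        while (true) {
            int nextIndex = transferIndex.get();
            if (nextIndex <= 0) {
                return null; // every range has been handed out
            }
            int nextBound = nextIndex > stride ? nextIndex - stride : 0;
            if (transferIndex.compareAndSet(nextIndex, nextBound)) {
                return new int[] { nextBound, nextIndex - 1 };
            }
            // Another thread claimed this range first; retry with the new index.
        }
    }

    // Lets several threads process an array of n bins and returns
    // how many times each bin was visited.
    static int[] migrateAll(int n, int stride, int threads) {
        AtomicInteger transferIndex = new AtomicInteger(n);
        AtomicIntegerArray visits = new AtomicIntegerArray(n);
        Runnable worker = () -> {
            int[] range;
            while ((range = claim(transferIndex, stride)) != null) {
                for (int i = range[1]; i >= range[0]; i--) {
                    visits.incrementAndGet(i); // stands in for migrating bin i
                }
            }
        };
        Thread[] ts = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            ts[t] = new Thread(worker);
            ts[t].start();
        }
        for (Thread th : ts) {
            try {
                th.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        int[] result = new int[n];
        for (int i = 0; i < n; i++) result[i] = visits.get(i);
        return result;
    }
}
```

Since a range is only owned by the thread whose CAS moved transferIndex past it, every bin is processed exactly once no matter how the threads interleave.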
In addition, ConcurrentHashMap migrates linked lists and red-black trees in a slightly different way than HashMap, which I will not go into here. I do not plan to draw an overall flow chart for the expansion; as long as you understand the core ideas, the other details are just control logic. Let’s go straight to the source code analysis.
First, we need to return to the addCount method. We have covered its increment logic but not its second part, the expansion logic. Let’s look at it now:
private final void addCount(long x, int check) {
... // Total +1 logic
// This part of the logic is mainly to determine whether to expand the capacity
// Make sure that only one thread can create new arrays
// Other threads can only assist in migrating data
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// If the length reaches the threshold and does not reach the maximum value, expand the capacity
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// This stamp works together with the sizeCtl calculation that follows
// Bit 16 is always 1, and the low 15 bits hold the number of leading zeros of n, as described earlier
int rs = resizeStamp(n);
// If sc is less than 0, an expansion (or initialization) is in progress. Otherwise, go on to start the expansion and create the new array
if (sc < 0) {
// During an expansion, sc shifted right by 16 bits must equal rs
// (sc == rs + 1 || sc == rs + MAX_RESIZERS): I believe these two conditions can never be true
// If you are interested, see the bug report below
// https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8214427
// nextTable == null indicates that the new array has not been created yet
// transferIndex <= 0 indicates that no ranges are left to migrate
// If any of the conditions above holds, there is nothing to help with, so exit the loop
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// Help migrate, SC +1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// Compete for the lock to start the expansion
// Shift the check code rs left by 16 bits and add 2, as described above
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
// The spin lock was acquired, so start the expansion
transfer(tab, null);
// Update the total number of nodes and continue the loop
s = sumCount();
}
}
}
The code above mainly uses sizeCtl as a spin lock, ensuring that only one thread creates the new array while the other threads can only help migrate data. The following method is the key method of the expansion solution:
// Two arguments: tab is the old array and nextTab is the new array
// For the first thread nextTab == null; for the other threads nextTab is the array created by the first thread
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// stride is the number of bins each thread handles per step, with a minimum of 16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// Create a new array if it has not already been created
// Only one thread can create arrays
if (nextTab == null) {
try {
@SuppressWarnings("unchecked")
// Double the size of the array
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) {
// If an OOM occurs, set the threshold to the maximum value
sizeCtl = Integer.MAX_VALUE;
return;
}
// Change the internal variable nextTable of concurrentHashMap
nextTable = nextTab;
// The migration starts with the array length
transferIndex = n;
}
int nextn = nextTab.length;
// Marker node that is placed at each array index once it has been migrated
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// advance indicates whether the current thread should move on to the next index
// finishing indicates whether the migration is complete
// The official comment says that before the new table is committed, one more sweep is needed to ensure the migration is complete, as discussed later
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
// i is the index currently being migrated by this thread, and bound is the lower limit of its range (migration runs from back to front)
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// This loop determines whether we need to move forward, and if so uses CAS to claim the next bound and i
while (advance) {
int nextIndex, nextBound;
// Advance =false if the lower limit has not been reached or the end has been reached
if (--i >= bound || finishing)
advance = false;
// Updates the subscript of the transferIndex with each round of the loop
// If the next subscript is 0, there is no need to proceed
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// Use CAS on transferIndex to claim the next range, then update bound and i
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// i is out of range, which means the current thread has finished its task and does not need to go further
// The thread that finishes last needs to update the table reference
// Helper threads need to decrement sizeCtl and then exit
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// If the whole migration is finished, update the table reference
if (finishing) {
nextTable = null;
table = nextTab;
// Update sizeCtl as the threshold
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// A thread that has finished its migration work decrements sizeCtl by one
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// If sc - 2 is not equal to the resize stamp shifted into the high bits, this is not the last thread; other threads are still migrating
// In that case it returns directly, because its own work is done
// The last thread has to scan the array once more to check for leftovers
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// Set finishing to true to mark the final pass
// Set i to n and scan the array again
finishing = advance = true;
i = n; // recheck before commit
}
}
// If the slot is empty there is nothing to move: mark it as migrated with fwd
else if ((f = tabAt(tab, i)) == null)
// The CAS may fail, so advance cannot simply be set to true
advance = casTabAt(tab, i, null, fwd);
// The slot holds a ForwardingNode, which means it has already been migrated, so move on
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// Lock the head node before migrating the bin
// The rest involves no concurrency-control details; it is pure data migration
// The idea is similar to HashMap's, with some differences
// This part of the source is easier to follow, so read through it
synchronized (f) {
// After acquiring the lock, re-check that the head of the bin is still f
// If not, loop around and try again
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// A hash value >= 0 means the bin is an ordinary linked list
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
// ConcurrentHashMap does not simply split the list in two
// It first finds the trailing run of nodes that all move to the same slot and reuses that run as-is
// For example, if nodes land in slot 1 or slot 5 after migration and the list is:
// 1 -> 5 -> 1 -> 5 -> 5 -> 5
// ConcurrentHashMap first takes the last three 5s as a unit; lastRun points to the third node from the end
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
// Decide which of the two lists the trailing run belongs to
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
// Nodes before lastRun are copied into new nodes rather than relinked in place
// Each copy is inserted at the head of its list
// There is no need to worry about a cycle forming in the list, because the bin is locked
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// The two lists are built; place them in the new array
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
// Put the marker node in the old slot to show that it has been migrated
setTabAt(tab, i, fwd);
advance = true;
}
// Tree bins follow the same idea as linked lists, but without lastRun: the nodes are split directly into two lists using tail insertion
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
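The lastRun trick in the linked-list branch above can be reproduced in isolation. The following is only a minimal sketch: the `Node` class, `chain`, `split`, and `keys` are hypothetical simplified stand-ins (no values, no locking, no table), written to mirror the two passes in the source rather than copied from the JDK.

```java
import java.util.ArrayList;
import java.util.List;

public class LastRunSplitDemo {
    /** Simplified, hypothetical stand-in for ConcurrentHashMap's internal Node. */
    public static final class Node {
        public final int hash;
        public final String key;
        public final Node next;
        public Node(int hash, String key, Node next) {
            this.hash = hash; this.key = key; this.next = next;
        }
    }

    /** Builds a list with the given hashes; keys are "a", "b", "c", ... in order. */
    public static Node chain(int... hashes) {
        Node head = null;
        for (int i = hashes.length - 1; i >= 0; i--)
            head = new Node(hashes[i], String.valueOf((char) ('a' + i)), head);
        return head;
    }

    /** Splits one bin of an old table of length n; returns {low list, high list}. */
    public static Node[] split(Node f, int n) {
        int runBit = f.hash & n;
        Node lastRun = f;
        // Pass 1: find the start of the trailing run whose nodes all go the same way.
        for (Node p = f.next; p != null; p = p.next) {
            int b = p.hash & n;
            if (b != runBit) { runBit = b; lastRun = p; }
        }
        Node ln = null, hn = null;
        if (runBit == 0) ln = lastRun; else hn = lastRun;
        // Pass 2: copy the nodes before lastRun, inserting each at the head of its list.
        for (Node p = f; p != lastRun; p = p.next) {
            if ((p.hash & n) == 0) ln = new Node(p.hash, p.key, ln);
            else hn = new Node(p.hash, p.key, hn);
        }
        return new Node[] { ln, hn };
    }

    public static List<String> keys(Node head) {
        List<String> out = new ArrayList<>();
        for (Node p = head; p != null; p = p.next) out.add(p.key);
        return out;
    }

    public static void main(String[] args) {
        // Old length n = 4: hash 1 stays in slot 1, hash 5 moves to slot 1 + 4 = 5.
        Node[] parts = split(chain(1, 5, 1, 5, 5, 5), 4);
        System.out.println("low  = " + keys(parts[0]));
        System.out.println("high = " + keys(parts[1]));
    }
}
```

For the list 1 -> 5 -> 1 -> 5 -> 5 -> 5 (keys a to f), the low list comes out as [c, a], copied and reversed by head insertion, and the high list as [b, d, e, f], where d, e, and f are the original nodes reused without copying. Only the nodes before lastRun are allocated again, which is the saving the trick is after.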
Expansion is a relatively heavy operation: it has to create a new array and move the existing nodes over one by one. In a highly concurrent environment, locking the entire table for the duration would block many threads. ConcurrentHashMap instead lets multiple threads cooperate on the migration, and reads and inserts can still proceed while it is under way, which greatly improves efficiency.
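The way threads share the work can be sketched outside the map. The example below is a simplified illustration, not the JDK code: it uses `AtomicInteger.compareAndSet` in place of `U.compareAndSwapInt`, and the class name `StrideClaimDemo`, the `run` method, and the `visits` counter array are hypothetical. Each worker claims a range of `stride` slots from the back of the array by moving a shared `transferIndex` downwards with CAS, just as the `while (advance)` loop does.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;

public class StrideClaimDemo {
    /** Lets `threads` workers claim ranges of `stride` slots; returns the visit count per slot. */
    public static int[] run(int n, int stride, int threads) {
        AtomicInteger transferIndex = new AtomicInteger(n);   // next range ends here (exclusive)
        AtomicIntegerArray visits = new AtomicIntegerArray(n);
        Runnable worker = () -> {
            while (true) {
                int nextIndex = transferIndex.get();
                if (nextIndex <= 0)
                    return;                                   // nothing left to claim
                int nextBound = nextIndex > stride ? nextIndex - stride : 0;
                // Only one thread can win this CAS, so each range has exactly one owner.
                if (transferIndex.compareAndSet(nextIndex, nextBound)) {
                    for (int i = nextIndex - 1; i >= nextBound; i--)
                        visits.incrementAndGet(i);            // stands in for migrating slot i
                }
                // On CAS failure another thread took the range; loop and try the next one.
            }
        };
        Thread[] pool = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            pool[t] = new Thread(worker);
            pool[t].start();
        }
        for (Thread t : pool) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        int[] result = new int[n];
        for (int i = 0; i < n; i++) result[i] = visits.get(i);
        return result;
    }

    public static void main(String[] args) {
        boolean ok = true;
        for (int v : run(64, 16, 4)) ok &= (v == 1);
        System.out.println("every slot processed exactly once: " + ok);
    }
}
```

Because a range is handed out only by a successful CAS, no slot is processed twice and none is skipped, whichever thread wins. The real transfer method additionally locks each bin's head node while moving it, since writers may still be touching that bin.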
There is one more method involved in expansion: helpTransfer. putVal calls it when it encounters a ForwardingNode. Compared with the source code above, this part is much simpler, so without further ado, here is the code:
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
// Check that the table exists, the node is a ForwardingNode, and the new array has been created
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
// sizeCtl < 0 means an expansion is in progress
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// Stop if the stamp does not match, the expansion has finished, the helper limit is reached, or no ranges are left to claim (transferIndex <= 0)
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// Try to CAS sizeCtl to sc + 1 to register as a helper, then help with the migration
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
// Return the new (expanded) array
return nextTab;
}
// If the table is not initialized or the node is not a ForwardingNode, return the current table
return table;
}
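Both transfer and helpTransfer rely on how sizeCtl is encoded during an expansion: the high 16 bits hold a resize stamp and the low 16 bits hold the number of working threads plus one. The sketch below assumes the JDK 8 constants (`RESIZE_STAMP_BITS = 16`) and the JDK 8 `resizeStamp` formula; `startResize`, `activeThreads`, and `isLastThread` are hypothetical helper names introduced only for illustration.

```java
public class ResizeStampDemo {
    static final int RESIZE_STAMP_BITS = 16;                       // as in JDK 8
    static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

    /** Stamp for a table of length n; its top bit (bit 15) is always set. */
    public static int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

    /** sizeCtl value written by the thread that starts the expansion (hypothetical helper). */
    public static int startResize(int n) {
        return (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2;
    }

    /** Number of threads currently migrating, decoded from the low 16 bits (hypothetical helper). */
    public static int activeThreads(int sc) {
        return (sc & 0xFFFF) - 1;
    }

    /** The check a finishing thread makes on the value it read before decrementing. */
    public static boolean isLastThread(int scBeforeDecrement, int n) {
        return (scBeforeDecrement - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT;
    }

    public static void main(String[] args) {
        int n = 16;
        int sc = startResize(n);
        System.out.println("stamp        = 0x" + Integer.toHexString(resizeStamp(n)));
        System.out.println("sizeCtl < 0  = " + (sc < 0));
        System.out.println("threads      = " + activeThreads(sc));
        sc = sc + 1;                                               // a helper joins
        System.out.println("after helper = " + activeThreads(sc));
        System.out.println("helper last? = " + isLastThread(sc, n));
        sc = sc - 1;                                               // the helper leaves
        System.out.println("first last?  = " + isLastThread(sc, n));
    }
}
```

Shifting the stamp left by 16 puts its always-set top bit into the sign position, which is why sizeCtl is negative for the whole expansion. The initial + 2 is also why the last thread recognises itself by `sc - 2` matching the shifted stamp.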
That concludes the source analysis of the expansion scheme. Although the idea behind expansion is simple, it takes a lot of control logic to keep it thread-safe, so there is a lot of source code. We have now analyzed the core methods of ConcurrentHashMap; others, such as remove and replace, follow similar ideas, and readers can work through their source on their own.
Finally
That's it for ConcurrentHashMap. From now on, your impression of it should be more than just "a thread-safe map": its excellent concurrency design, combining CAS, spinning, and synchronized, is the real essence of the class.
What is the use of reading the ConcurrentHashMap source code? Interviews, of course! Thinking in Java advises that if you are not a concurrency expert, you should just use locks and not bother with fancy tricks. The ConcurrentHashMap source shows that concurrency problems are far more complicated than we tend to think. Studying ConcurrentHashMap is not about learning to write the same code. Beyond interviews, I think the more important thing is to appreciate the craft behind it. Thanks to the author's ingenious design and rigorous code, we have a safe, high-performance ConcurrentHashMap to use in concurrent environments. If you can apply even a little of this thinking in practice, that is a great gain.
Now, do you have answers to the two questions from the beginning?
I hope this article has been helpful to you.
That is the whole article. Original writing takes effort, so if you found it helpful, please like, bookmark, comment, or share. My knowledge is limited, and corrections and ideas are welcome in the comments. If you would like to repost this article, please ask in the comments or by private message.