Recommended earlier articles:
- ConcurrentHashMap has 10 details that improve performance. Do you know them all?
- For HashMap interview questions, read this article
- Seven ways to do something during SpringBoot initialization
- Watch out for these three pitfalls in Java serialization
- Seven potential memory-leak risks in Java. How many do you know?
- A look at the new features in JDK 16
- What? Parallel streams are slower?
The ConcurrentHashMap source code in Java 7 is a textbook for multithreaded programming. In the Java 7 implementation the authors are very cautious about pessimistic locks, replacing most of them with spinlocks and volatile to obtain the same semantics; even where a lock is ultimately unavoidable, they use various techniques to shrink its critical section. As discussed in the previous article, a spinlock is the better choice when the critical section is small, because it avoids the thread context switches caused by blocking, but it is still essentially a lock: during the spin-wait only one thread can enter the critical section, and the other threads simply burn CPU time slices spinning. The Java 8 implementation of ConcurrentHashMap uses some clever designs and tricks to get around the limitations of spin locking and deliver even higher concurrency. If the Java 7 source code teaches us how to turn a pessimistic lock into a spinlock, in Java 8 we can even see how to turn a spinlock into lock-free code.
Reading the book thin
(Diagram source: www.zhenchao.org/2019/01/31/…)
Before starting this article, you should have a diagram like this in mind; if you are familiar with HashMap, you probably already do. In terms of overall data-structure design, the Java 8 ConcurrentHashMap and HashMap are essentially the same.
Java 7 ConcurrentHashMap uses a lot of programming tricks to improve performance, but the Segment-based design still leaves plenty of room for improvement:
1. During capacity expansion, write operations from non-expansion threads to the Segment being expanded are all blocked.
2. A ConcurrentHashMap read requires two hashes (one to locate the Segment, one to locate the bucket inside it), an extra cost that hurts read-heavy, write-light workloads.
3. size() first attempts a lock-free computation, but if another thread writes during the process, the thread calling size() locks the entire ConcurrentHashMap. This is the only global lock in ConcurrentHashMap, and it has performance implications.
4. In extreme cases (for example, a client supplies a poorly performing hash function), the complexity of get() degrades to O(n).
For issues 1 and 2, the Java 8 design drops Segment entirely and reduces the granularity of pessimistic locking to the bucket level, so get no longer needs to hash twice. The new size() design is one of the biggest highlights of the Java 8 version and is explained in detail later in this article. As for red-black trees (which address issue 4), this article still says little about them. In the following sections I dig into the details to read the book thick again. The modules covered include initialization, the put method, the transfer method used for expansion, and the size() method; other modules, such as the hash function, changed little, so I will not go into them.
Prerequisites
ForwardingNode
static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    ForwardingNode(Node<K,V>[] tab) {
        // MOVED = -1, so a ForwardingNode's hash is always -1
        super(MOVED, null, null, null);
        this.nextTable = tab;
    }
}
In addition to the normal Node and TreeNode, ConcurrentHashMap introduces a new node type, ForwardingNode; only its constructor is shown here. A ForwardingNode serves two purposes:
- It marks that a bucket has already been copied to the new bucket array during a dynamic expansion.
- If a get call arrives during a dynamic expansion, the ForwardingNode forwards the request to the new bucket array instead of blocking the get; the expanded bucket array is saved in nextTable when the ForwardingNode is constructed (see the toy example after this list).
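To make the forwarding idea concrete, here is a self-contained toy sketch. The class and method names are invented for illustration and this is not the JDK implementation: a migrated bucket holds a marker pointing at the new table, and a read that hits the marker simply retries against that table instead of blocking. The real ForwardingNode achieves the same effect by overriding Node.find to continue the lookup in nextTable.

public class ForwardingDemo {
    static class Node {
        final String key, val;
        Node(String k, String v) { key = k; val = v; }
    }
    static class Forward extends Node {
        final Node[] nextTable;
        Forward(Node[] nt) { super(null, null); nextTable = nt; }
    }

    static String get(Node[] table, String key, int hash) {
        Node e = table[hash & (table.length - 1)];
        if (e instanceof Forward)                            // bucket already migrated:
            return get(((Forward) e).nextTable, key, hash);  // follow the marker to the new table
        return (e != null && key.equals(e.key)) ? e.val : null; // no chaining in this toy
    }

    public static void main(String[] args) {
        Node[] oldTab = new Node[4], newTab = new Node[8];
        int hash = 6;
        newTab[hash & 7] = new Node("k", "v");   // the element has already been moved
        oldTab[hash & 3] = new Forward(newTab);  // the old bucket is marked as migrated
        System.out.println(get(oldTab, "k", hash)); // prints: v
    }
}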
UNSAFE.compareAndSwap***
In Java 8, ConcurrentHashMap implements CAS through the UNSAFE.compareAndSwap* family of methods. Taking the int variant as an example, it is declared as follows:
/**
* Atomically update Java variable to <tt>x</tt> if it is currently
* holding <tt>expected</tt>.
* @return <tt>true</tt> if successful
*/
public final native boolean compareAndSwapInt(Object o, long offset,
int expected,
int x);
The corresponding semantics are:
If the value at offset offset of object o currently equals expected, set it to x and return true to indicate that the update succeeded; otherwise return false to indicate that the CAS failed.
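ConcurrentHashMap calls this UNSAFE method directly, but the same compare-and-swap semantics are available through the public atomic classes. The minimal sketch below uses AtomicLong to show the typical "read, compute, CAS, retry on failure" loop; the class and names are chosen only for this example.

import java.util.concurrent.atomic.AtomicLong;

public class CasDemo {
    private static final AtomicLong counter = new AtomicLong();

    static void addOne() {
        for (;;) {
            long expected = counter.get();         // observe the current value
            long next = expected + 1;
            if (counter.compareAndSet(expected, next))
                return;                            // CAS succeeded, we are done
            // otherwise another thread won the race; loop and retry
        }
    }

    public static void main(String[] args) {
        addOne();
        System.out.println(counter.get());         // prints 1
    }
}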
Initialization
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) // check the parameters
        throw new IllegalArgumentException();
    if (initialCapacity < concurrencyLevel)
        initialCapacity = concurrencyLevel;
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size); // tableSizeFor rounds up to the next power of two
    this.sizeCtl = cap;
}
Even this, the most complex of the constructors, is relatively simple. Only two points are worth noting:
- concurrencyLevel: in Java 7 this was the length of the Segment array; since Java 8 has abandoned Segment, concurrencyLevel is now only a reserved field with no practical meaning.
- sizeCtl: a value of -1 means the table is being initialized; any other negative value means an expansion is in progress, in which case the lower 16 bits of sizeCtl equal the number of expansion threads plus one, and the higher 16 bits (excluding the sign bit) carry information about the size of the bucket array.
The put method
public V put(K key, V value) {
return putVal(key, value, false);
}
The put method forwards the call to the putVal method:
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // [A] lazy initialization of the bucket array
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // [B] the target bucket is empty
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                break; // no lock when adding to empty bin
        }
        // [C] the first element of the bucket is a ForwardingNode, so try to join the expansion
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        // [D] otherwise, walk the list or tree in the bucket and insert
        else {
            // folded for now
        }
    }
    // [F] reaching this point means the put succeeded, so increase the map's element count by one
    addCount(1L, binCount);
    return null;
}
The overall structure makes the flow fairly clear; I have marked the most important steps with bracketed letters. The put method still touches on quite a few knowledge points:
Initialization of the bucket array
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)
            // another thread is already initializing; this thread starts to spin
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            // CAS guarantees that only one thread can reach this branch
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    // sc = n - n/4 = 0.75n
                    sc = n - (n >>> 2);
                }
            } finally {
                // restore sizeCtl to a positive value
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}
If multiple threads call initTable concurrently, CAS ensures that only one of them enters the actual initialization branch while the rest spin-wait. Three things are worth noting in this code:
- As mentioned earlier, the thread that starts initializing the bucket array uses CAS to set sizeCtl to -1, and the other threads take this as the signal to spin-wait.
- Once the bucket array has been initialized, sizeCtl returns to a positive value equal to 0.75 times the length of the bucket array, with the same meaning as THRESHOLD in HashMap: the threshold that triggers an expansion.
- The write to sizeCtl in the finally block does not use CAS, because CAS has already guaranteed that only one thread can execute to this point (see the self-contained sketch of this pattern below).
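Here is a small self-contained sketch of the same "CAS as an initialization ticket" pattern, with invented names (it is not the JDK code): whichever thread wins the CAS performs the one-time initialization, the losers spin, and the flag is restored with a plain write because only the winner can reach that statement.

import java.util.concurrent.atomic.AtomicInteger;

class LazyTable {
    private volatile Object[] table;
    // 0 = untouched, -1 = some thread is initializing, >0 = initialized (holds the threshold)
    private final AtomicInteger ctl = new AtomicInteger(0);

    Object[] initTable(int capacity) {
        Object[] tab;
        while ((tab = table) == null) {
            if (ctl.get() < 0)
                Thread.yield();                           // lost the race; just spin
            else if (ctl.compareAndSet(0, -1)) {          // only one thread wins this CAS
                try {
                    if ((tab = table) == null)
                        table = tab = new Object[capacity];
                } finally {
                    ctl.set(capacity - (capacity >>> 2)); // plain write: no other thread can be here
                }
                break;
            }
        }
        return tab;
    }
}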
Add the first element of the bucket array
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
The second branch of the put method uses tabAt to determine whether the bucket is empty and, if so, writes to it via CAS. tabAt uses the UNSAFE interface to read the latest value of the bucket's slot, and casTabAt uses CAS to make sure the write itself has no concurrency issues.
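Outside the JDK's internals you cannot call UNSAFE directly, but the public AtomicReferenceArray class offers the same pair of operations: a volatile read of an array slot and a CAS on an array slot. The sketch below is illustrative only and mirrors the empty-bucket fast path of putVal.

import java.util.concurrent.atomic.AtomicReferenceArray;

public class CasBucketDemo {
    public static void main(String[] args) {
        AtomicReferenceArray<String> table = new AtomicReferenceArray<>(16);
        int i = 5;
        if (table.get(i) == null) {                  // like tabAt: volatile read of the slot
            boolean inserted = table.compareAndSet(i, null, "first value"); // like casTabAt
            System.out.println("inserted = " + inserted);
        }
        System.out.println(table.get(i));            // first value
    }
}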
Determining whether to add a thread to help with the expansion
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        int rs = resizeStamp(tab.length);
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            // RESIZE_STAMP_SHIFT = 16
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            // increase sizeCtl by 1, meaning the number of threads participating in the expansion is +1
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}
Here the sizeCtl flag is put to detailed use. The temporary variable rs is the return value of the resizeStamp method:
static final int resizeStamp(int n) {
// RESIZE_STAMP_BITS = 16
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
Because the parameter n is an int, Integer.numberOfLeadingZeros(n) returns a value between 0 and 32. In binary:
- The maximum value of Integer.numberOfLeadingZeros(n) is: 00000000 00000000 00000000 00100000
- The minimum value of Integer.numberOfLeadingZeros(n) is: 00000000 00000000 00000000 00000000
Therefore the return value of resizeStamp lies between 00000000 00000000 10000000 00000000 and 00000000 00000000 10000000 00100000. From this range it is clear that the high 16 bits of resizeStamp's return value are all 0 and carry no information, which is why ConcurrentHashMap shifts this return value left by 16 bits before folding it into sizeCtl, and why the high 16 bits of sizeCtl can carry information about the size of the bucket array. With this analysis, the long if condition in the code starts to make sense:
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
    sc == rs + MAX_RESIZERS || transferIndex <= 0)
    break;
- (sc >>> RESIZE_STAMP_SHIFT) != rs: ensures that all threads are expanding based on the same old bucket array.
- transferIndex <= 0: all expansion sub-tasks have already been claimed, so there is nothing left for a new thread to help with.
As for the two conditions sc == rs + 1 || sc == rs + MAX_RESIZERS, careful readers will find them hard to understand. This is indeed a bug in the JDK, fixed in JDK 12; for details see the Oracle bug report: bugs.java.com/bugdatabase… The correct conditions should be sc == (rs << RESIZE_STAMP_SHIFT) + 1 || sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS, because comparing rs with sc directly is meaningless; a shift is required. What the corrected conditions mean is:
- sc == (rs << RESIZE_STAMP_SHIFT) + 1: the number of expansion threads has dropped to 0, meaning the expansion has finished, so there is no need for new threads to join.
- sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS: the number of threads participating in the expansion has reached the maximum, so no more threads should join.
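A small worked example helps here. Assuming, as described above, that the first expansion thread sets sizeCtl to (rs << 16) + 2, that is, the stamp in the high 16 bits and "number of expansion threads plus one" in the low 16 bits, the runnable snippet below (constants inlined by hand for the example) shows the encoding for a table of length 16: numberOfLeadingZeros(16) is 27, so resizeStamp(16) = 27 | (1 << 15) = 0x801B.

public class ResizeStampDemo {
    static int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << 15); // RESIZE_STAMP_BITS - 1 = 15
    }

    public static void main(String[] args) {
        int n = 16;
        int rs = resizeStamp(n);
        int sizeCtl = (rs << 16) + 2;                                 // first expansion thread
        System.out.printf("rs      = 0x%08X%n", rs);                  // 0x0000801B
        System.out.printf("sizeCtl = 0x%08X (%d)%n", sizeCtl, sizeCtl); // 0x801B0002, a negative int
        System.out.println("threads + 1 = " + (sizeCtl & 0xFFFF));    // 2
    }
}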
The actual copying logic lives in transfer, which we will look at shortly. One small detail to note in advance: if nextTable has already been initialized, helpTransfer returns a reference to it, so the caller can go on to operate directly on the new bucket array.
Inserting the new value
If the bucket array has been initialized, the bucket is not currently being migrated (no ForwardingNode), and the target bucket already contains elements, the process looks much like a plain HashMap insert, except that the current bucket is locked first.
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0) { /* folded */ }
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { /* folded */ }
        else if ((fh = f.hash) == MOVED) { /* folded */ }
        else {
            V oldVal = null;
            synchronized (f) {
                // pay attention to this small judgment condition
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) { // fh >= 0 means a linked list; otherwise a tree or ForwardingNode
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
                                oldVal = e.val; // the key already exists in the list: update it directly
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                // the key is not in the list: append a new node at the tail
                                pred.next = new Node<K,V>(hash, key, value, null);
                                break;
                            }
                        }
                    }
                    // the red-black tree operation is skipped here
                }
            }
        }
    }
    // if the put succeeded, the number of map elements +1
    addCount(1L, binCount);
    return null;
}
Note the small judgment condition in this code (marked with a comment in the source): tabAt(tab, i) == f. Its purpose is to handle the race between a thread calling put and an expansion thread. synchronized is a blocking lock; if the thread calling put happens to contend for the same bucket with an expansion thread and loses, then by the time it re-acquires the lock the first element of the bucket may have become a ForwardingNode, in which case tabAt(tab, i) != f and the loop retries.
Multithreaded dynamic capacity expansion
My apologies to readers viewing this article on a phone: although I have tried to break the source code apart, this long stretch has to be shown in one piece for coherence.
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // initialize the new bucket array
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    int nextn = nextTab.length;
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // check whether this is the last thread to expand
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED) // only the last expansion thread (on its recheck) hits this branch
            advance = true; // already processed
        else { // the copying itself is similar to HashMap and will not be described here
            synchronized (f) {
                // folded
            }
        }
    }
}
Before diving into the details of the source code, let’s look at some of the features of the ConcurrentHashMap expansion in Java 8:
- The new bucket array nextTable is twice the length of the old one, just as in HashMap.
- The threads participating in the expansion migrate the elements of table into the new bucket array nextTable in segments, each thread handling a slice at a time.
- The elements of one old bucket are distributed between two buckets of the new array whose indices differ by n (the length of the old bucket array), which is also consistent with HashMap (see the small runnable check after this list).
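The small runnable check below (illustrative values only) demonstrates the "stay at index i or move to index i + n" property for a handful of hashes that all land in bucket 5 of an old table of length 16: whether an element moves depends solely on the bit (hash & n).

public class SplitDemo {
    public static void main(String[] args) {
        int n = 16;                       // old table length
        int[] hashes = {5, 21, 37, 53};   // all land in bucket 5 of the old table
        for (int h : hashes) {
            int oldIndex = h & (n - 1);
            int newIndex = h & (2 * n - 1);
            System.out.printf("hash=%2d old=%d new=%2d moved=%b%n",
                    h, oldIndex, newIndex, (h & n) != 0);
        }
        // output: buckets either stay at 5 or move to 5 + 16 = 21
    }
}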
How do threads work together
Let's start with the key variable transferIndex, which is declared volatile so that every thread reads its latest value.
private transient volatile int transferIndex;
This value is initialized by the first thread to join the expansion, because only that thread satisfies the condition nextTab == null:
if (nextTab == null) { // initiating
    try {
        @SuppressWarnings("unchecked")
        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
        nextTab = nt;
    } catch (Throwable ex) { // try to cope with OOME
        sizeCtl = Integer.MAX_VALUE;
        return;
    }
    nextTable = nextTab;
    transferIndex = n;
}
With the transferIndex property in mind, this loop is easy to understand:
while (advance) {
    int nextIndex, nextBound;
    // while bound <= i <= transferIndex, keep decrementing i, leave this loop and keep copying
    if (--i >= bound || finishing)
        advance = false;
    // all expansion sub-tasks have already been claimed
    else if ((nextIndex = transferIndex) <= 0) {
        i = -1;
        advance = false;
    }
    // otherwise claim a new copy task and update transferIndex via CAS
    else if (U.compareAndSwapInt
             (this, TRANSFERINDEX, nextIndex,
              nextBound = (nextIndex > stride ?
                           nextIndex - stride : 0))) {
        bound = nextBound;
        i = nextIndex - 1;
        advance = false;
    }
}
transferIndex acts like a cursor: each time a thread claims a copy task it updates the value to transferIndex - stride via CAS, and CAS guarantees that transferIndex decreases toward 0 one stride at a time without two threads ever claiming the same range.
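As a self-contained sketch of this cursor mechanism (names invented; not the JDK code), the snippet below uses an AtomicInteger in place of the UNSAFE-based CAS on transferIndex and shows how disjoint ranges are handed out one stride at a time.

import java.util.concurrent.atomic.AtomicInteger;

public class TransferIndexDemo {
    static final AtomicInteger transferIndex = new AtomicInteger(64); // old table length n = 64
    static final int STRIDE = 16;

    // returns {bound, i} for the claimed range, or null when nothing is left
    static int[] claim() {
        for (;;) {
            int nextIndex = transferIndex.get();
            if (nextIndex <= 0)
                return null;                                // all work already claimed
            int nextBound = (nextIndex > STRIDE) ? nextIndex - STRIDE : 0;
            if (transferIndex.compareAndSet(nextIndex, nextBound))
                return new int[]{nextBound, nextIndex - 1}; // copy buckets bound..i
        }
    }

    public static void main(String[] args) {
        int[] r;
        while ((r = claim()) != null)
            System.out.println("claimed buckets " + r[0] + ".." + r[1]);
        // prints 48..63, 32..47, 16..31, 0..15
    }
}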
Does the last expansion thread need a second confirmation?
For each expansion thread, the loop variable i is the index (in the bucket array) of the bucket currently being copied; its upper and lower bounds are determined by the cursor transferIndex and the stride. When i drops below zero, the current thread has finished its share of the expansion work and the flow enters this branch:
// the conditions i >= n || i + n >= nextn cannot normally be hit here
if (i < 0 || i >= n || i + n >= nextn) {
    int sc;
    if (finishing) { // [A] finish the whole expansion
        nextTable = null;
        table = nextTab;
        sizeCtl = (n << 1) - (n >>> 1);
        return;
    }
    // [B] check whether this is the last expansion thread; if so, rescan the bucket array as a second check
    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
        // (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT indicates the last expansion thread
        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
            return;
        // re-scan the bucket array for a second check
        finishing = advance = true;
        i = n; // recheck before commit
    }
}
Since finishing is initialized to false, a thread entering this if for the first time executes the branch marked [B]. Recall that when the expansion started, the low 16 bits of sizeCtl were initialized to the number of participating threads plus one; therefore, if after the CAS decrement the condition (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT holds, the current thread is the last expansion thread. In that case i is reset to n so the bucket array is rescanned, and finishing is set to true so that, after the rescan, the flow enters the branch marked [A] and ends the expansion.
There is a problem here. According to the earlier analysis, the expansion threads already coordinate so that every segment of the bucket array is copied exactly once, with nothing missed and nothing duplicated. Why, then, is a second confirmation needed at all? A developer asked Doug Lea this very question on the concurrency-interest mailing list (address: cs.oswego.edu/pipermail/c…):
Yes, this is a valid point; thanks. The post-scan was needed in a previous version, and could be removed. It does not trigger often enough to matter though, so is for now another minor tweak that might be included next time CHM is updated.
Although Doug hedges with phrases like "could be" and "does not trigger often enough", he confirms that the last expansion thread's second check is not necessary. The copying process itself is similar to HashMap, and interested readers can look back at the earlier articles.
The size() method
The addCount() method
// Record a member variable for the total number of map elements
private transient volatile long baseCount;
At the end of the put method there is a call to addCount: since putVal has just added an element successfully, the total element count of the ConcurrentHashMap must be maintained. ConcurrentHashMap keeps a variable, baseCount, that records the number of elements in the map. As the figure below shows, if n threads update baseCount via CAS at the same time, only one of them succeeds and all the others keep spinning, which inevitably becomes a performance bottleneck.
To avoid having a large number of threads spin-wait on baseCount, ConcurrentHashMap introduces a secondary queue, shown below. Threads that would otherwise contend on baseCount are spread across this queue, and size() only has to add baseCount and the values in the queue, which makes the size() call lock-free.
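This is exactly the design of java.util.concurrent.atomic.LongAdder (the fullAddCount source below even says "See LongAdder version for explanation"). If all you need is a contention-friendly counter, you can use LongAdder directly; the minimal example below shows the base-plus-cells summation from the user's point of view.

import java.util.concurrent.atomic.LongAdder;

public class LongAdderDemo {
    public static void main(String[] args) throws InterruptedException {
        LongAdder adder = new LongAdder();
        Runnable task = () -> { for (int i = 0; i < 100_000; i++) adder.increment(); };
        Thread t1 = new Thread(task), t2 = new Thread(task);
        t1.start(); t2.start();
        t1.join(); t2.join();
        System.out.println(adder.sum()); // 200000: base value plus all cells
    }
}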
The secondary queue is an array of type CounterCell:
@sun.misc.Contended
static final class CounterCell {
    volatile long value;
    CounterCell(long x) { value = x; }
}
It simply wraps a long value. The question is: how does a thread know which slot of the secondary queue belongs to it? The answer is the following method:
static final int getProbe() {
    return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
getProbe returns what is effectively a unique identifier for the current thread, and the value does not change, so taking it modulo the length of the secondary queue gives the thread's slot index. If it returns 0, ThreadLocalRandom.localInit() must be called first to initialize it. Two details in the addCount method are worth noting:
private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    // note that the judgment condition here is tricky
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            // the variable uncontended records whether the CAS operation succeeded
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }
    if (check >= 0) {
        // check whether the table needs to be expanded (folded)
    }
}
Detail one:
First, notice the new if judgment condition in the method:
if ((as = counterCells) != null ||
    !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
}
If (as = counterCells) != null, the CAS on baseCount is not even attempted. The author had two considerations here:
- If (as = counterCells) != null, the secondary queue has already been initialized. Rather than having all threads spin on the single baseCount variable, letting a thread CAS a value inside the queue has a much higher chance of success, because the maximum length of the secondary queue is a power of two larger than the current number of processors, which allows greater concurrency.
- If the secondary queue has not been initialized, it is only created when necessary. How is "necessary" determined? By whether the CAS on baseCount fails: a failure means contention in the system is currently high and the queue is needed; otherwise baseCount is simply updated directly.
Detail two:
Only when the secondary queue already exists and the slot selected by ThreadLocalRandom.getProbe() is not null does addCount attempt the CAS on that cell; that is just a normal defensive check. The variable uncontended records whether this CAS succeeded. If it failed, fullAddCount will call ThreadLocalRandom.advanceProbe to move the current thread to a different position in the secondary queue, so that all threads do not spin-wait on the same slot.
The fullAddCount() method
// See LongAdder version for explanation
// wasUncontended records whether the caller's CAS succeeded; if it failed,
// switch to another element of the secondary queue before retrying the CAS
private final void fullAddCount(long x, boolean wasUncontended) {
    int h;
    if ((h = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();      // force initialization
        h = ThreadLocalRandom.getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        CounterCell[] as; CounterCell a; int n; long v;
        // [A] the secondary queue has already been created: operate on it directly
        if ((as = counterCells) != null && (n = as.length) > 0) {
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {            // Try to attach new Cell
                    CounterCell r = new CounterCell(x); // Optimistic create
                    if (cellsBusy == 0 &&
                        U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                        boolean created = false;
                        try {               // Recheck under lock
                            CounterCell[] rs; int m, j;
                            if ((rs = counterCells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // the caller's CAS failed: run this round empty, change the slot next round
                wasUncontended = true;      // Continue after rehash
            else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                break;
            else if (counterCells != as || n >= NCPU) // the queue is stale or already as long as the CPU count: run this round empty, change the slot next round
                collide = false;            // At max size or stale
            else if (!collide)              // the last operation failed (CAS failed or creating a CounterCell failed): run this round empty, change the slot next round
                collide = true;
            else if (cellsBusy == 0 &&      // two consecutive operations on the secondary queue failed: expand it
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                try {
                    if (counterCells == as) {// Expand table unless stale
                        CounterCell[] rs = new CounterCell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        counterCells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            // reached after a failed operation or a failed caller CAS: switch the secondary-queue slot to operate on
            h = ThreadLocalRandom.advanceProbe(h);
        }
        // [B] the secondary queue has not been created yet: create it under the lock
        else if (cellsBusy == 0 && counterCells == as &&
                 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
            boolean init = false;
            try {                           // Initialize table
                if (counterCells == as) {
                    CounterCell[] rs = new CounterCell[2];
                    rs[h & 1] = new CounterCell(x);
                    counterCells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        // [C] creating the secondary queue failed: try baseCount instead
        else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
            break;                          // Fall back on using base
    }
}
Since counterCells is an ordinary array and therefore not thread-safe, writes to it, including initialization, expansion, and writing an element, are protected by a spin lock on the global variable cellsBusy. Let's start with the three outermost branches:
- [B] If the secondary queue has not been created yet, it is created under the lock.
- [C] If creating the secondary queue fails because the lock could not be acquired, a CAS write to baseCount is attempted instead, in case that works.
- [A] If the secondary queue already exists, the corresponding element of the queue is operated on directly.
The branch marked [A] contains the most code. Its main idea: if operating on one element of the queue (via CAS or the lock) fails, first call ThreadLocalRandom.advanceProbe(h) to switch to another element and keep going, recording in the temporary variable collide whether the operation succeeded. If the next operation fails again, contention is considered high and the queue needs to be expanded, unless its length already exceeds the number of CPUs, because the number of threads that can run at the same time cannot exceed the number of CPUs in the machine.
There are four details that still need to be noted in this process:
Detail one:
counterCells is just an ordinary array and is therefore not thread-safe, so write operations on it must be performed under the lock to remain safe under concurrency.
Detail two:
When taking the lock, the author performs a double check. Some articles describe this as "double-check like the singleton pattern", which is not accurate; the real reason was discussed in the previous article. The first cellsBusy == 0 check is the precondition for proceeding at all: if cellsBusy == 1 the lock attempt fails immediately and the thread calls h = ThreadLocalRandom.advanceProbe(h) to update h and try again. Only when that first check passes does the thread execute CounterCell r = new CounterCell(x) before actually acquiring the lock, which keeps the spin lock's critical section small and improves concurrency.
Detail three:
cellsBusy is checked against 0 before the locking CAS is attempted. If cellsBusy is already 1, the lock attempt has effectively failed; since reading a volatile is cheaper than the UNSAFE CAS call, there is no need to pay for the CAS when the plain read already shows the lock is taken.
Detail four:
Changing cellsBusy from 0 to 1 uses CAS, but setting it back from 1 to 0 is a plain assignment. This works because CAS guarantees that only one thread (the lock holder) can reach that statement, so a simple write is enough to release the lock.
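The cellsBusy handling is a reusable pattern: acquire a spin lock with a CAS from 0 to 1, skip the CAS entirely when a cheap volatile read already shows the lock is taken, and release with a plain write since only the holder can reach the release. Here is a minimal self-contained sketch with invented names (not the JDK code):

import java.util.concurrent.atomic.AtomicInteger;

class TinySpinLock {
    private final AtomicInteger busy = new AtomicInteger(0);

    boolean tryLock() {
        // cheap read first: if it is already 1 there is no point paying for a CAS
        return busy.get() == 0 && busy.compareAndSet(0, 1);
    }

    void unlock() {
        busy.set(0); // only the lock holder reaches this, so a plain write is enough
    }
}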
The sumCount() method
The two methods above spread the element count of ConcurrentHashMap across baseCount and the secondary queue; a size() call simply adds them all up:
public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}
Afterword
In terms of code, Java 8's improvements to ConcurrentHashMap all land in the right places. In theory its performance should be noticeably better than the Java 7 version, but I have not found a benchmark comparing the two. If any reader has done such a verification or has seen a relevant benchmark, feel free to get in touch.