preface
In the previous article ConcurrentHashMap analysis (I) of the overall structure, we began with the overall structure of ConcurrentHashMap to gradually understand its data structure and the conversion relationship of each node. This article will cover another focus of ConcurrenHashMap: how to scale up in a high-concurrency environment, which will help us understand the idea of high-concurrency programming.
Due to my limited level, there may be flaws and mistakes in the analysis process, I hope you can point out, learn together, progress together.
Train of thought
In the previous look at ConcurrenHashMap, we found a field, nextTable, that acts as a temporary storage array for expansion:
/** * The next table to use; non-null only while resizing. */
private transient volatile Node<K,V>[] nextTable;
Copy the code
Since ConcurrentHashMap is scaled up in a manner similar to progressive data migration, multithreading is much more flexible for both old and new arrays. Let’s take a look at how it expands.
In the previous analysis of putVal method, there was a process of judging the linked list threshold. If the length of the linked list exceeded a certain threshold, the operation of converting to a red-black tree would be triggered. The specific code is as follows:
if(binCount ! =0) {
if (binCount >= TREEIFY_THRESHOLD)
// Lists are converted to red-black trees
treeifyBin(tab, i);
if(oldVal ! =null)
return oldVal;
break;
}
Copy the code
If the size of the hash array is smaller than MIN_TREEIFY_CAPACITY(64), the array will be doubled. The code is as follows:
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
Copy the code
Then we analyze the tryPersize method:
private final void tryPresize(int size) {
// Increase the array size to a power of 2
int c = (size >= (MAXIMUM_CAPACITY >>> 1))? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>>1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
// If the original array is not initialized
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
// Start initializing the array
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// Check again if the array needs to be initialized
if (table == tab) {
// Initialize the array
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])newNode<? ,? >[n]; table = nt; sc = n - (n >>>2); }}finally{ sizeCtl = sc; }}}// If the array has been expanded or the capacity is larger than MAXIMUM_CAPACITY, return it directly
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
// This is the main point, and you can see that there are two cases here
else if (tab == table) {
int rs = resizeStamp(n);
[sizeCtl=-(1+nThreads)]
if (sc < 0) {
Node<K,V>[] nt;
if((sc >>> RESIZE_STAMP_SHIFT) ! = rs || sc == rs +1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// If no other thread is expanding, the current thread will expand by itself
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null); }}}Copy the code
In the third case, the core method transfer method is to be executed regardless of whether other threads are expanding. We can see that whether the current thread is the first expanding thread is determined by whether the second parameter of transfer is null. Next, we will analyze the core method transfer.
The principle of
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// Calculate the step size according to the CPU, that is, how many buckets per thread to copy from the original array
if ((stride = (NCPU > 1)? (n >>>3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE;
// First expansion
if (nextTab == null) {
try {
@SuppressWarnings("unchecked")
// Create a new array
Node<K,V>[] nt = (Node<K,V>[])newNode<? ,? >[n <<1];
nextTab = nt;
} catch (Throwable ex) {
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
// Create the ForwardingNode node. Remember that it is used as the first node? Let's see what it does in the back, right
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// Set to true when a bucket migration is complete
boolean advance = true;
// Set this parameter to true after data migration is complete
boolean finishing = false;
// I represents the bucket index and bound represents the boundary
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
// Advance is false when the bucket migration is not complete or the entire migration is not complete
if (--i >= bound || finishing)
advance = false;
// transferIndex<=0 indicates that no hash bucket needs to be migrated, sets I to -1, and the thread is ready to exit
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// After the bucket is migrated, update transferIndex, and get the next batch of buckets to be migrated
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false; }}Exit the transfer method
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
If finishing=true, data migration is complete and the last thread completes the finishing work
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
SizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2) * SizeCtl = sizeCtl+1 * Each thread exiting the Transfer method will be set to sizeCtl = sizeCtl-1 * before exit, so when the last thread exits: * There must be sc == (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2), i.e. (SC-2) == resizeStamp(n) << RESIZE_STAMP_SHIFT */
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// If it is not equal, it is not the last thread
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// The last thread checks again to see if all the data has been migrated successfully
finishing = advance = true; i = n; }}// CAS sets the new array to null if there is no data in the I index of the original array
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// If the hash of the array's I index is MOVED, a thread has already migrated it
else if ((fh = f.hash) == MOVED)
advance = true;
// Important: Migrate nodes
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// Here is the list structure migration
if (fh >= 0) {
// The code will be listed separately later
}
// Here is the migration of the TreeBin node
else if (f instanceof TreeBin) {
// The code will be listed separately later
}
}
}
}
}
}
Copy the code
In the previous code, we learned that ConcurrentHashMap by default divides all slots into buckets, performs a simultaneous data migration for each bucket, and it performs data migration based on (SC-2! = resizeStamp(n) << RESIZE_STAMP_SHIFT to determine whether it is the last thread in the migration process. Other threads exit the method if their work is complete. The last thread needs to assign data from nextTable to table and restore the values of nextTable and sizeCtl to their original values. Next, we will analyze the different migration modes of linked list nodes and red-black tree nodes.
Here is the code for the linked list node:
int runBit = fh & n; // Hash the first node of the slot in the old bucket
Node<K,V> lastRun = f;
for(Node<K,V> p = f.next; p ! =null; p = p.next) {
// b is either 0 or n
int b = p.hash & n;
if(b ! = runBit) { runBit = b;// lastRun records the last node where the runBit value changedlastRun = p; }}// Start building two linked lists
/ / stern interpolation
if (runBit == 0) {
ln = lastRun;
hn = null;
} else {
hn = lastRun;
ln = null;
}
// Divide the linked list into 2 sub-linked lists ln and hn
for(Node<K,V> p = f; p ! = lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln); // the ln list stores the index I position of the new bucket
setTabAt(nextTab, i + n, hn); // the hn list stores the index I +n position of the new bucket
setTabAt(tab, i, fwd); // Set the ForwardingNode placeholder
advance = true; // Indicates that the node of the current old bucket has been migrated
Copy the code
We know that the capacity of ConCurrentHashMap must be a power of 2. So fh & n is either 0 or n. Therefore, the linked list will be divided into two parts, the part with the result of 0 and the part with the result of N. In this way, the original length of the linked list can be divided into half after expansion.
As you can see from the above code, specifying a power of two not only makes hash as evenly distributed as possible, but also helps with node migration. Next, we analyze the migration process of red-black trees:
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
// Create two linked lists
// Node provides the next field and TreeNode provides the prev field to form a bidirectional list
for(Node<K,V> e = t.first; e ! =null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null.null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
elsehiTail.next = p; hiTail = p; ++hc; }}// Determine whether the two linked lists need to be converted to red black treesln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc ! =0)?newTreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc ! =0)?new TreeBin<K,V>(hi) : t;
// Set the slot and advance status
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
Copy the code
This is the expansion of ConcurrentHashMap. ConcurrentHashMap solves the index recalculation problem perfectly by using the power of two digits and bits. It splits the array into buckets and migrates the data in a concurrent way.