Preface

In the previous article, "ConcurrentHashMap analysis (I): overall structure", we started from the overall structure of ConcurrentHashMap and worked through its data structures and how its node types convert into one another. This article covers another key part of ConcurrentHashMap: how it resizes in a highly concurrent environment, which also helps in understanding the ideas behind highly concurrent programming.

My own understanding is limited, so the analysis may contain flaws or mistakes. Please point them out so that we can learn and improve together.

Train of thought

In the previous look at ConcurrentHashMap, we came across a field, nextTable, which holds the new array while a resize is in progress:

/**
 * The next table to use; non-null only while resizing.
 */
private transient volatile Node<K,V>[] nextTable;

ConcurrentHashMap resizes by migrating data progressively from the old array to the new one, so multiple threads can keep working with both arrays while the migration is in progress. Let's take a look at how the resize works.

In the earlier analysis of the putVal method, we saw a check against the linked-list threshold: if the length of a bin's linked list reaches the threshold, treeifyBin is called to convert the list to a red-black tree. The relevant code is as follows:

if (binCount != 0) {
    if (binCount >= TREEIFY_THRESHOLD)
        // Convert the linked list to a red-black tree
        treeifyBin(tab, i);
    if (oldVal != null)
        return oldVal;
    break;
}

However, treeifyBin does not always build a tree. If the length of the hash array is smaller than MIN_TREEIFY_CAPACITY (64), it doubles the array instead. The code is as follows:

if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
    tryPresize(n << 1);

Next, we analyze the tryPresize method:

private final void tryPresize(int size) {
    // Round the target capacity (1.5 * size + 1) up to a power of two, capped at MAXIMUM_CAPACITY
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
        tableSizeFor(size + (size >>> 1) + 1);
    int sc;
    while ((sc = sizeCtl) >= 0) {
        Node<K,V>[] tab = table; int n;
        // If the original array is not initialized
        if (tab == null || (n = tab.length) == 0) {
            n = (sc > c) ? sc : c;
            // Claim the right to initialize the array by setting sizeCtl to -1
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    // Re-check that the table has not been replaced in the meantime
                    if (table == tab) {
                        // Initialize the array
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        // Next resize threshold: 0.75 * n
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
            }
        }
        // If the target capacity is already covered or the array has reached MAXIMUM_CAPACITY, stop
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;
        // This is the main part, and you can see that there are two cases here
        else if (tab == table) {
            int rs = resizeStamp(n);
            // sc < 0 would mean another thread is already resizing
            // (the field's Javadoc describes this state as sizeCtl = -(1 + nThreads)):
            // join the migration as a helper
            if (sc < 0) {
                Node<K,V>[] nt;
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            // If no other thread is resizing, the current thread starts the resize itself
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
        }
    }
}

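In the third branch, the core method transfer is called whether or not other threads are already resizing. The second argument tells transfer which situation it is in: the thread that starts the resize passes null, while a thread that joins a resize already in progress passes the existing nextTable. (As written, the loop only runs while sizeCtl >= 0, so inside tryPresize itself the sc < 0 branch is never taken; it shows how a helper thread would join.) Next, we will analyze the core method transfer.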
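The sizeCtl arithmetic in the two CAS calls above is easier to follow with concrete numbers. The following is a minimal sketch, not the JDK code itself: the class name ResizeStampSketch and the helpers initialSizeCtl and activeResizers are made up for illustration, and the constant RESIZE_STAMP_BITS = 16 is assumed from the JDK 8 source, since the article does not show it.

```java
public class ResizeStampSketch {
    // Assumed from the JDK 8 source; not shown in the article.
    static final int RESIZE_STAMP_BITS = 16;
    static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

    // Same formula as resizeStamp in JDK 8: the leading-zero count of n
    // identifies the table length, and bit 15 is always set.
    static int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

    // Hypothetical helper: the sizeCtl value installed by the thread that
    // starts resizing a table of length n.
    static int initialSizeCtl(int n) {
        return (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2;
    }

    // Hypothetical helper: the low 16 bits hold (number of resizing threads + 1).
    static int activeResizers(int sizeCtl) {
        return (sizeCtl & ((1 << RESIZE_STAMP_SHIFT) - 1)) - 1;
    }

    public static void main(String[] args) {
        int n = 16;
        int sc = initialSizeCtl(n);
        System.out.println("stamp for n = 16: " + resizeStamp(n));
        System.out.println("sizeCtl is negative: " + (sc < 0));
        System.out.println("high bits equal the stamp: "
                + ((sc >>> RESIZE_STAMP_SHIFT) == resizeStamp(n)));
        System.out.println("resizers after start: " + activeResizers(sc));
        System.out.println("resizers after one helper joins: " + activeResizers(sc + 1));
    }
}
```

Because bit 15 of the stamp is always set, shifting it left by RESIZE_STAMP_SHIFT sets the sign bit, which is why sizeCtl is negative for the whole duration of a resize.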

Principle

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // Compute the stride from the CPU count: how many slots of the old array a thread claims at a time
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE;
    // nextTab == null: this thread is the one starting the resize
    if (nextTab == null) {
        try {
            // Create the new array, twice the length of the old one
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    int nextn = nextTab.length;
    // Create the ForwardingNode that is placed at the head of every migrated slot and points to nextTab
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    // true when the current slot is done and the thread should move on to the next one
    boolean advance = true;
    // Set to true once the whole migration is complete
    boolean finishing = false;
    // i is the index of the slot being processed; bound is the lower end of the range this thread has claimed
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            // Still inside the claimed range (or finishing): stop advancing and process slot i
            if (--i >= bound || finishing)
                advance = false;
            // transferIndex <= 0 means no slots are left to claim; set i to -1 so the thread prepares to exit
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            // The claimed range is finished: CAS transferIndex to claim the next range [nextBound, nextIndex)
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        // No slot left for this thread: try to exit the transfer method
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            // If finishing == true, data migration is complete and the last thread does the final work
            if (finishing) {
                nextTable = null;
                table = nextTab;
                // New threshold: 0.75 * the new capacity
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            /*
             * The first resizing thread sets
             *   sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2
             * Each helper that joins sets sizeCtl = sizeCtl + 1.
             * Each thread leaving transfer sets sizeCtl = sizeCtl - 1 before it exits,
             * so when the last thread exits there must be
             *   sc == (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2,
             * i.e. (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT
             */
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // If they are not equal, this is not the last thread
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                // The last thread sweeps the whole array again to check that everything has been migrated
                finishing = advance = true;
                i = n;
            }
        }
        // If slot i of the old array is empty, CAS a ForwardingNode into it
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        // If the hash of the node at slot i is MOVED, the slot has already been migrated
        else if ((fh = f.hash) == MOVED)
            advance = true;
        // Important: migrate the nodes
        else {
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    // Here is the linked-list migration
                    if (fh >= 0) {
                        // The code will be listed separately later
                    }
                    // Here is the migration of the TreeBin node
                    else if (f instanceof TreeBin) {
                        // The code will be listed separately later
                    }
                }
            }
        }
    }
}

From the code above, we can see that ConcurrentHashMap splits the slots of the old array into ranges of stride slots each (never fewer than MIN_TRANSFER_STRIDE). Each thread claims a range with a CAS on transferIndex and migrates the slots in it, working from the high end of the array toward index 0, so several threads can migrate different ranges at the same time. When a thread finds no range left to claim, it decrements sizeCtl and uses the check (sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT to determine whether it is the last thread in the migration. Threads that are not the last simply return once their work is done. The last thread sweeps the array once more, then publishes nextTab as table, resets nextTable to null, and sets sizeCtl to the new resize threshold, which is 0.75 times the new capacity. Next, we will analyze how linked-list bins and red-black tree bins are migrated.
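The range-claiming loop at the top of transfer can also be tried out in isolation. The following is a minimal sketch under stated assumptions, not the JDK implementation: an AtomicInteger stands in for the transferIndex field, an AtomicIntegerArray counts how often each slot is processed instead of moving real nodes, and the class name StrideClaimSketch, its helper methods, and the ncpu parameter (used in place of NCPU) are invented for illustration. MIN_TRANSFER_STRIDE = 16 is assumed from the JDK 8 source.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;

public class StrideClaimSketch {
    static final int MIN_TRANSFER_STRIDE = 16; // assumed from the JDK 8 source

    // Same stride formula as transfer; ncpu is a parameter so the result is reproducible.
    static int stride(int n, int ncpu) {
        int s = (ncpu > 1) ? (n >>> 3) / ncpu : n;
        return Math.max(s, MIN_TRANSFER_STRIDE);
    }

    // Starts `threads` workers that claim ranges from the high end of an
    // n-slot array and returns how many times each slot was processed.
    static AtomicIntegerArray migrate(int n, int ncpu, int threads) {
        final int stride = stride(n, ncpu);
        final AtomicInteger transferIndex = new AtomicInteger(n);
        final AtomicIntegerArray visits = new AtomicIntegerArray(n);
        Runnable worker = () -> {
            while (true) {
                int nextIndex = transferIndex.get();
                if (nextIndex <= 0)
                    return; // nothing left to claim
                int nextBound = (nextIndex > stride) ? nextIndex - stride : 0;
                // Only one thread wins the CAS, so it owns [nextBound, nextIndex)
                if (transferIndex.compareAndSet(nextIndex, nextBound)) {
                    for (int i = nextIndex - 1; i >= nextBound; i--)
                        visits.incrementAndGet(i); // stands in for migrating slot i
                }
            }
        };
        Thread[] pool = new Thread[threads];
        for (int k = 0; k < threads; k++) {
            pool[k] = new Thread(worker);
            pool[k].start();
        }
        for (Thread th : pool) {
            try {
                th.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return visits;
    }

    static boolean eachSlotOnce(AtomicIntegerArray visits) {
        for (int i = 0; i < visits.length(); i++)
            if (visits.get(i) != 1)
                return false;
        return true;
    }

    public static void main(String[] args) {
        System.out.println("stride(1024, 4) = " + stride(1024, 4));
        System.out.println("each slot processed once: "
                + eachSlotOnce(migrate(1024, 4, 4)));
    }
}
```

The point of the sketch is that the CAS on the shared index is the only coordination the workers need: a failed CAS just means another thread took that range, and the loser retries with the updated index.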

Here is the code for migrating a linked-list bin:

int runBit = fh & n; // Bit n of the head node's hash: 0 means the node stays at index i, n means it moves to i + n
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
    // b is either 0 or n
    int b = p.hash & n;
    if (b != runBit) {
        runBit = b;
        // lastRun records the last node at which the runBit value changed
        lastRun = p;
    }
}
// Start building two linked lists.
// Every node from lastRun to the end goes to the same list, so that run is reused as the tail without copying
if (runBit == 0) {
    ln = lastRun;
    hn = null;
} else {
    hn = lastRun;
    ln = null;
}
// Divide the nodes before lastRun into the two sub-lists ln and hn, inserting each copy at the head
for (Node<K,V> p = f; p != lastRun; p = p.next) {
    int ph = p.hash; K pk = p.key; V pv = p.val;
    if ((ph & n) == 0)
        ln = new Node<K,V>(ph, pk, pv, ln);
    else
        hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln); // the ln list is stored at index i of the new array
setTabAt(nextTab, i + n, hn); // the hn list is stored at index i + n of the new array
setTabAt(tab, i, fwd); // Put the ForwardingNode placeholder in the old slot
advance = true; // The current slot of the old array has been migrated

We know that the capacity of ConcurrentHashMap is always a power of two, so for any hash, hash & n is either 0 or n. The linked list in slot i is therefore divided into two parts: nodes for which the result is 0 stay at index i of the new array, and nodes for which the result is n move to index i + n. No index has to be recomputed from scratch, and if the hashes are well distributed, each of the two lists is about half the length of the original.
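To see the split on concrete hashes, here is a minimal sketch of the same lastRun technique on a stripped-down node type. It is an illustration under assumptions rather than the JDK code: the Node class here only carries a hash and a next pointer, and the names SplitSketch, listOf, split and hashes are invented for this example.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSketch {
    // Minimal stand-in for ConcurrentHashMap.Node: only a hash and a next pointer.
    static final class Node {
        final int hash;
        final Node next;
        Node(int hash, Node next) { this.hash = hash; this.next = next; }
    }

    // Builds a linked list whose nodes appear in the given order.
    static Node listOf(int... hs) {
        Node head = null;
        for (int k = hs.length - 1; k >= 0; k--)
            head = new Node(hs[k], head);
        return head;
    }

    // Splits the bin starting at f (old table length n) into {ln, hn},
    // following the linked-list branch of transfer.
    static Node[] split(Node f, int n) {
        int runBit = f.hash & n;
        Node lastRun = f;
        for (Node p = f.next; p != null; p = p.next) {
            int b = p.hash & n;
            if (b != runBit) {
                runBit = b;
                lastRun = p;
            }
        }
        // The trailing run is reused as-is by whichever list it belongs to.
        Node ln = (runBit == 0) ? lastRun : null;
        Node hn = (runBit == 0) ? null : lastRun;
        // Nodes before lastRun are copied and inserted at the head of their list.
        for (Node p = f; p != lastRun; p = p.next) {
            if ((p.hash & n) == 0)
                ln = new Node(p.hash, ln);
            else
                hn = new Node(p.hash, hn);
        }
        return new Node[] { ln, hn };
    }

    static List<Integer> hashes(Node head) {
        List<Integer> out = new ArrayList<>();
        for (Node p = head; p != null; p = p.next)
            out.add(p.hash);
        return out;
    }

    public static void main(String[] args) {
        int n = 16;
        // All five hashes map to slot 5 of a 16-slot table (hash & 15 == 5).
        Node f = listOf(5, 21, 37, 69, 101);
        Node[] parts = split(f, n);
        System.out.println("stay at index 5:  " + hashes(parts[0]));
        System.out.println("move to index 21: " + hashes(parts[1]));
    }
}
```

With these inputs only 21 has bit 16 set, so it is the one node that moves to index 5 + 16 = 21. The trailing run 37, 69, 101 shares the same bit, so those three node objects are reused rather than copied, and only 5 and 21 are re-created.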

As you can see from the code above, keeping the capacity a power of two not only lets the slot index be computed with a simple bit mask, it also makes node migration cheap. Next, we analyze the migration process of red-black trees:

TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
// Create two linked lists
// Node provides the next field and TreeNode provides the prev field to form a bidirectional list
for (Node<K,V> e = t.first; e != null; e = e.next) {
    int h = e.hash;
    TreeNode<K,V> p = new TreeNode<K,V>
        (h, e.key, e.val, null, null);
    if ((h & n) == 0) {
        if ((p.prev = loTail) == null)
            lo = p;
        else
            loTail.next = p;
        loTail = p;
        ++lc;
    }
    else {
        if ((p.prev = hiTail) == null)
            hi = p;
        else
            hiTail.next = p;
        hiTail = p;
        ++hc;
    }
}
// A list with at most UNTREEIFY_THRESHOLD nodes goes back to a plain linked list;
// otherwise build a new TreeBin, or reuse t when the other list is empty
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
    (hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
    (lc != 0) ? new TreeBin<K,V>(hi) : t;

// Set the slots and the advance flag
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;

That is how ConcurrentHashMap resizes. By keeping the capacity a power of two and using bit operations, it avoids recomputing indexes during a resize: each node either stays at index i or moves to index i + n. It also splits the old array into ranges of slots and lets multiple threads migrate them concurrently.