Key questions:

  1. When is capacity expansion triggered, and what is the expansion threshold?
  2. How is thread safety ensured during capacity expansion?
  3. How do other threads detect that an expansion is in progress and help with it?
  4. How is the work split when multiple threads expand together? Is finer task granularity always better?
  5. ConcurrentHashMap.get(key) takes no lock. How can other threads calling get(key) during an expansion still read the correct value without thread-safety issues?

The addCount(long x, int check) method is called after new data is put into the map to update the map's element count. When the count reaches the threshold, the expansion logic is triggered.

Source code that triggers expansion:

private final void addCount(long x, int check) {
        // Uses the LongAdder design to count the map's elements with less contention; see the analysis below
        CounterCell[] as; long b, s;
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            // s is the current element count. When sizeCtl is positive (no expansion in progress), it holds the threshold for the next expansion; once s reaches it, expansion is triggered.
            // sizeCtl is a 32-bit int. During expansion, the upper 16 bits hold the resize stamp and the lower 16 bits track the number of threads expanding concurrently. A thread that joins the expansion changes sizeCtl, as shown in the code below.
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                // n is the length of the current array. The same n always yields the same resize stamp; see the analysis below for the calculation
                int rs = resizeStamp(n);
                // If sc is negative, an expansion is in progress. The thread that triggers an expansion changes sizeCtl through CAS
                // sc is a 32-bit int: the lower 16 bits record the number of threads currently expanding, the upper 16 bits are the resize stamp
                if (sc < 0) {
                // (sc >>> RESIZE_STAMP_SHIFT) != rs means the expansion in progress is not the one for this table length
                // sc == rs + 1 || sc == rs + MAX_RESIZERS is a bug: rs is the unshifted stamp while sc is negative here, so neither comparison can ever be true. The checks should compare against the shifted stamp: sc == (rs << RESIZE_STAMP_SHIFT) + 1 || sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS
                // This was fixed in JDK 12; see https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8214427
                // (nt = nextTable) == null: nextTable is set while an expansion is in progress, so null means the expansion has already finished and there is nothing to help with
                // transferIndex <= 0: transferIndex is the boundary of the buckets not yet assigned for migration. If it is less than or equal to 0, every bucket has already been assigned and no help is needed
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                // A thread that joins the expansion increments sizeCtl by 1. The lower 16 bits of sizeCtl track the number of concurrent expansion threads
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                // If sc is positive, no expansion is in progress. CAS changes sizeCtl to (rs << RESIZE_STAMP_SHIFT) + 2
                // rs is the resize stamp generated above. Shifting it left by 16 bits places the stamp in the upper 16 bits and leaves the lower 16 bits free to record the number of concurrent expansion threads.
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }
  • Element count calculation logic:

If ConcurrentHashMap used a single size field to record the current element count, every thread would compete to modify that one variable during put. Contention would be fierce and performance poor. The idea borrowed from LongAdder is to spread that contention: the map keeps a base counter plus an array of counter cells, and when several threads update concurrently, a thread that fails to update the base counter updates one of the cells instead. When size() is called to obtain the current element count, the base counter and all the cells are summed. This reduces contention and improves throughput under concurrent updates. The counter cell is defined as follows:

static final class CounterCell {
        volatile long value;
        CounterCell(long x) { value = x; }
    }
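
The base-plus-cells idea described above can be sketched in a few lines. This is only an illustration under simplifying assumptions: StripedCounter, its fixed power-of-two cell count, and the demo method are hypothetical names, and the real fullAddCount logic (cell creation, array growth, probe rehashing) is left out.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

// Hypothetical illustration of the base-plus-cells idea; not the JDK implementation.
class StripedCounter {
    private final AtomicLong base = new AtomicLong();   // plays the role of baseCount
    private final AtomicLongArray cells;                // plays the role of counterCells

    // cellCount is assumed to be a power of two so that (random & mask) is a valid index
    StripedCounter(int cellCount) {
        cells = new AtomicLongArray(cellCount);
    }

    void add(long x) {
        long b = base.get();
        // Fast path: a single CAS on the base counter
        if (base.compareAndSet(b, b + x))
            return;
        // The CAS lost a race, so add to a randomly chosen cell instead
        int idx = ThreadLocalRandom.current().nextInt() & (cells.length() - 1);
        cells.addAndGet(idx, x);
    }

    // Like sumCount(): the base counter plus every cell
    long sum() {
        long s = base.get();
        for (int i = 0; i < cells.length(); i++)
            s += cells.get(i);
        return s;
    }

    // Runs several threads that each add 1 perThread times, then returns the total
    static long demo(int threads, int perThread) {
        StripedCounter c = new StripedCounter(8);
        Thread[] ts = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            ts[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++)
                    c.add(1);
            });
            ts[t].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return c.sum();
    }

    public static void main(String[] args) {
        System.out.println(demo(4, 100_000));
    }
}
```

The sum is exact here only because it is read after every writer has finished. While writers are still running, a sum taken this way is a moving estimate.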
  • Calculation logic of the resize stamp:
int rs = resizeStamp(n); 


static final int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

Integer.numberOfLeadingZeros(n) counts the consecutive zero bits at the high end of n's binary representation. For example, if n in binary is 0000 0000 0000 1000 1111 0101 0110 1101, the result is 12, because there are 12 leading zeros. RESIZE_STAMP_BITS is 16, so (1 << (RESIZE_STAMP_BITS - 1)) sets bit 15, and the final stamp in binary is 1000 0000 0000 1100.
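
To make the arithmetic concrete, the stamp calculation can be run on its own. The snippet below copies resizeStamp and the two constants from the JDK 8 source; the class name ResizeStampDemo and the main method are only scaffolding for this illustration.

```java
// Standalone copy of the resize stamp calculation, for experimentation only.
class ResizeStampDemo {
    static final int RESIZE_STAMP_BITS = 16;
    static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

    static int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

    public static void main(String[] args) {
        int n = 16;                       // a typical table length (power of two)
        int rs = resizeStamp(n);
        // Bit 15 is always set; the low bits hold the number of leading zeros of n
        System.out.println(Integer.toBinaryString(rs));
        // Shifting the stamp into the upper 16 bits sets the sign bit,
        // so the sizeCtl value written at the start of an expansion is negative
        int sc = (rs << RESIZE_STAMP_SHIFT) + 2;
        System.out.println(sc < 0);
    }
}
```

Because table lengths are powers of two, each length has a distinct number of leading zeros, which is why the same n always produces the same stamp and different lengths produce different stamps.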

Here are three key points:

  1. When no expansion is in progress, sizeCtl holds the expansion threshold and is a positive number. When the number of elements in the map reaches the threshold, the expansion logic is triggered.
  2. The thread that triggers the expansion changes sizeCtl through CAS: it shifts the resize stamp left by 16 bits and adds 2. Because the stamp always has bit 15 set, the shifted value has its sign bit set, so sizeCtl is now negative. Since the change is made with CAS, only one thread succeeds and starts the expansion.
  3. A negative sizeCtl tells other threads that an expansion is in progress and that they can help. A helper first uses CAS to add 1 to sizeCtl, so the lower 16 bits of sizeCtl track how many threads are expanding concurrently. This makes it possible to tell which thread finishes last; that thread does the finishing work, which includes pointing the map's table field at the new table and resetting sizeCtl to a positive value that holds the next expansion threshold.
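
The three points above can be traced with plain integer arithmetic. The following is a sketch only: SizeCtlDemo and its helpers (start, join, leave, isLast, stampOf, lowBits) are hypothetical names, and the real code applies each of these transitions with a CAS on the sizeCtl field rather than on local variables.

```java
// Hypothetical walk-through of the sizeCtl encoding during an expansion.
class SizeCtlDemo {
    static final int RESIZE_STAMP_BITS = 16;
    static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

    static int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

    // Value written by the thread that triggers the expansion
    static int start(int n) { return (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2; }

    // A helper thread joins: low bits + 1
    static int join(int sc) { return sc + 1; }

    // A thread finishes its share: low bits - 1
    static int leave(int sc) { return sc - 1; }

    // Same check as in transfer(): sc is the value read just before the decrement
    static boolean isLast(int scBeforeLeave, int n) {
        return (scBeforeLeave - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT;
    }

    static int stampOf(int sc) { return sc >>> RESIZE_STAMP_SHIFT; }
    static int lowBits(int sc) { return sc & 0xFFFF; }

    public static void main(String[] args) {
        int n = 16;
        int sc = start(n);                       // initiating thread
        System.out.println(sc < 0 && lowBits(sc) == 2);
        sc = join(sc);                           // one helper joins
        System.out.println(lowBits(sc));
        System.out.println(isLast(sc, n));       // helper leaves first: not the last thread
        sc = leave(sc);
        System.out.println(isLast(sc, n));       // initiator leaves: it is the last thread
    }
}
```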

Take a look at the source code:

// nextTab is the new, empty table with twice the capacity
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        // stride is the step length. The old table is partitioned by this step; during concurrent expansion each thread migrates only the buckets in the segments it has claimed
        int n = tab.length, stride;
        // NCPU is the number of cores in the current system. With a single core there is no point in expanding concurrently, because expansion is purely CPU-bound work
        // The minimum stride is 16, so each thread migrates at least 16 buckets per claim. A smaller value would let more threads take part and increase contention between them, specifically on the CAS operations on transferIndex and sizeCtl
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                // If the new table has not been created yet, create one with double the capacity
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            // Set transferIndex to the length of the old table. This is one past the last bucket index, which is why the code below subtracts 1
            transferIndex = n;
        }
        int nextn = nextTab.length;
        // Create the node that marks a bucket as already migrated
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        // Each thread claims one segment of stride buckets at a time. When it has finished that segment and unassigned buckets remain, it claims another segment
        // true means the current thread should move on to the next bucket, claiming a new segment from the old table if necessary
        boolean advance = true;
        // Marks whether the expansion has been completed
        boolean finishing = false; // to ensure sweep before committing nextTab
        // i is the index of the bucket the current thread is migrating, and bound is the lowest bucket index in the segment it has claimed
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            // This loop checks whether the current thread has finished the buckets in its segment. If so, it checks whether the old table still has unassigned buckets and, if it does, claims another segment
            while (advance) {
                int nextIndex, nextBound;
                // If the index of the next bucket (--i) is still inside the segment, no new segment is needed; continue with that bucket
                // No new segment is needed either if the expansion is already finishing
                if (--i >= bound || finishing)
                    advance = false;
                // Buckets at index transferIndex and above have already been assigned to threads. A thread claims its segment downward from transferIndex. If transferIndex is less than or equal to 0, every bucket has been assigned and nothing is left to claim
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                // Starting from nextIndex (read from transferIndex above), claim one stride of buckets toward the head of the array, and use CAS to set transferIndex to the new boundary
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    // After the CAS succeeds, set the lower bound of the current thread's segment (e.g. 32 for buckets 32 through 47)
                    bound = nextBound;
                    // After the CAS succeeds, set the index of the first bucket the current thread processes (e.g. 47 for buckets 32 through 47)
                    // transferIndex starts at tab.length, so subtract 1 to get a valid index
                    i = nextIndex - 1;
                    // The CAS succeeded, so the current thread has claimed its segment and does not need to claim again
                    advance = false;
                }
            }
            // i is the index of the bucket to be migrated, and n is the capacity of the old table
            // i < 0 means every bucket in the old table has been assigned and this thread has finished its share
            // The purpose of the i >= n || i + n >= nextn checks is less clear: normally i, as the index of a bucket in the old table, is always smaller than the old table's capacity
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                // Check whether the expansion has been completed. If it has, run the finishing logic
                if (finishing) {
                    // The expansion is done, so clear the nextTable reference
                    nextTable = null;
                    // Point table at the new table. table is volatile, so the assignment is visible to other threads
                    table = nextTab;
                    // Set the expansion threshold to 3/4 of the new capacity
                    // The new table has capacity 2n, so the threshold is 2n - 0.5n = 1.5n
                    sizeCtl = (n << 1) - (n >>> 1);
                    // The capacity expansion is complete
                    return;
                }
                // sizeCtl is set to a negative value when the expansion starts. Each thread that joins increments it by 1, and each thread that finishes its work decrements it by 1, which makes it possible to tell which thread is the last one
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // The CAS only records that the current thread is leaving. If other threads are still expanding, the check below fails and the thread returns. Otherwise the current thread is the last one: it sets finishing and advance to true, sweeps the table once more, and then runs the finishing logic in if (finishing) above
                // The thread that triggered the expansion added 2 to the shifted stamp, so (sc - 2) equals the shifted stamp only when the last thread leaves
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            // If the bucket is empty, CAS a ForwardingNode into that index of the old table. A thread that later writes to this bucket sees the ForwardingNode and helps with the expansion
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            // A ForwardingNode has hash MOVED (its constructor sets the hash to MOVED), which means a thread has already migrated this bucket
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {
            // Lock the bucket's head node to prevent concurrent modification of the same bucket
                synchronized (f) {
                // This is similar to double-checked locking: f was read before the lock was acquired, so the head of the bucket may have changed in the meantime
                // For example, another thread may have removed the head node or converted the list into a tree, so the bucket's head is no longer f
                // Without this check, the thread could migrate a stale view of the bucket and lose data
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        // fh >= 0 indicates a normal linked list
                        if (fh >= 0) {
                        // The table length is a power of two and a node's index is hash & (n - 1). After doubling, the new index differs only by the bit hash & n
                        // Nodes where that bit is 0 stay at index i in the new table; nodes where it is n move to index i + n because the capacity doubled
                        // So when the bucket at index i is rehashed, each node ends up either in bucket i or in bucket i + n of the new table
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            // This loop serves two purposes
                            // 1. It finds lastRun, the start of the longest tail of the list whose nodes all go to the same new bucket, so that tail can be moved as a whole
                            // 2. Because that tail of existing nodes is reused in the new bucket, no new nodes are created for it, which reduces GC pressure during migration
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                // Walk the nodes before lastRun one by one. A bit value of 0 means the node stays in the bucket at index i
                                // A new node is created each time instead of modifying the next pointer of the original node, which is why get() can work without a lock
                                // This creates GC pressure, which is what the lastRun loop above tries to reduce
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                // Otherwise the node goes to the bucket at index i + n
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            // Store the low list at index i of the new table
                            setTabAt(nextTab, i, ln);
                            // Store the high list at index i + n of the new table
                            setTabAt(nextTab, i + n, hn);
                            // Put the ForwardingNode at index i of the old table to mark the bucket as migrated
                            setTabAt(tab, i, fwd);
                            // Set advance to true so that the while (advance) loop above moves on to the next bucket and claims a new segment if needed
                            advance = true;
                        }
                        // The logic is similar to the linked-list case. The tree is split in two: one part goes to the bucket at index i and the other to the bucket at index i + n
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }
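
The linked-list split inside transfer is easier to follow in isolation. The sketch below reproduces the lastRun logic on a stripped-down node type; LastRunSplitDemo, its Node class (hash and next only), and the listOf and show helpers are hypothetical scaffolding, and locking and table writes are left out.

```java
// Simplified, single-threaded illustration of the lastRun split used for list bins.
class LastRunSplitDemo {
    static final class Node {
        final int hash;
        final Node next;
        Node(int hash, Node next) { this.hash = hash; this.next = next; }
    }

    // Splits the list starting at f for an old table of length n.
    // Returns {ln, hn}: the list for index i and the list for index i + n.
    static Node[] split(Node f, int n) {
        int runBit = f.hash & n;
        Node lastRun = f;
        // Find the start of the final run of nodes that share the same (hash & n) bit
        for (Node p = f.next; p != null; p = p.next) {
            int b = p.hash & n;
            if (b != runBit) {
                runBit = b;
                lastRun = p;
            }
        }
        Node ln, hn;
        if (runBit == 0) { ln = lastRun; hn = null; }   // the tail is reused as the low list
        else             { hn = lastRun; ln = null; }   // the tail is reused as the high list
        // Nodes before lastRun are copied; the original next pointers are never modified
        for (Node p = f; p != lastRun; p = p.next) {
            if ((p.hash & n) == 0)
                ln = new Node(p.hash, ln);
            else
                hn = new Node(p.hash, hn);
        }
        return new Node[] { ln, hn };
    }

    static Node listOf(int... hashes) {
        Node head = null;
        for (int i = hashes.length - 1; i >= 0; i--)
            head = new Node(hashes[i], head);
        return head;
    }

    static String show(Node p) {
        StringBuilder sb = new StringBuilder();
        for (; p != null; p = p.next)
            sb.append(p.hash).append(' ');
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        // All of these hashes map to index 1 in a table of length 16
        Node head = listOf(1, 17, 33, 65, 97);
        Node[] r = split(head, 16);
        System.out.println("low:  " + show(r[0]));
        System.out.println("high: " + show(r[1]));
    }
}
```

In this example the tail starting at hash 33 is reused as-is, so only the two nodes in front of it are copied. A reader that is still walking the old list sees it unchanged.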

With the source code in hand, the questions above can be answered:

  1. When is capacity expansion triggered, and what is the expansion threshold?

Each time data is put into the map, the map updates its element count. If the count reaches the expansion threshold, the expansion logic is triggered. The load factor is 0.75, so expansion is triggered when the element count reaches three quarters of the table capacity.
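
As a quick check of the three-quarters rule, the expression that transfer uses to set the next threshold can be evaluated directly. ThresholdDemo and thresholdAfterResize are hypothetical names; only the expression (n << 1) - (n >>> 1) comes from the source.

```java
// Evaluates the threshold expression used at the end of transfer().
class ThresholdDemo {
    // n is the old table length; the new table has length 2n
    static int thresholdAfterResize(int n) {
        return (n << 1) - (n >>> 1);   // 2n - 0.5n = 1.5n = 0.75 * (2n)
    }

    public static void main(String[] args) {
        for (int n = 16; n <= 64; n <<= 1)
            System.out.println("old length " + n + ", new length " + (n << 1)
                + ", next threshold " + thresholdAfterResize(n));
    }
}
```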

  2. How is thread safety ensured during capacity expansion?

The thread-safety scenarios fall into the following cases:

  • Concurrency between expansion threads

Work is assigned to expansion threads by cutting segments of buckets from the end of the array toward the head. The pointer that marks the boundary of the assigned area, transferIndex, is declared volatile, so updates are visible across threads, and it is only changed through CAS, which guarantees that no two threads are assigned the same bucket.
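
The segment-claiming scheme can be modelled with an AtomicInteger standing in for transferIndex. This is a sketch under assumptions: StrideClaimDemo, claim, and everyBucketClaimedOnce are hypothetical names, and the stride formula and the CAS loop are adapted from transfer rather than copied verbatim.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;

// Models how expansion threads claim disjoint bucket ranges from the end of the table.
class StrideClaimDemo {
    static final int MIN_TRANSFER_STRIDE = 16;

    // Same formula as in transfer(): at least 16 buckets per claim
    static int stride(int n, int ncpu) {
        int s = (ncpu > 1) ? (n >>> 3) / ncpu : n;
        return Math.max(s, MIN_TRANSFER_STRIDE);
    }

    // Claims the next range; returns {bound, i} (inclusive) or null when nothing is left
    static int[] claim(AtomicInteger transferIndex, int stride) {
        for (;;) {
            int nextIndex = transferIndex.get();
            if (nextIndex <= 0)
                return null;
            int nextBound = nextIndex > stride ? nextIndex - stride : 0;
            // Only one thread can move transferIndex from nextIndex to nextBound
            if (transferIndex.compareAndSet(nextIndex, nextBound))
                return new int[] { nextBound, nextIndex - 1 };
        }
    }

    // Lets several threads claim ranges and checks that each bucket was visited exactly once
    static boolean everyBucketClaimedOnce(int n, int threads) {
        AtomicInteger transferIndex = new AtomicInteger(n);
        AtomicIntegerArray hits = new AtomicIntegerArray(n);
        int stride = stride(n, threads);
        Thread[] ts = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            ts[t] = new Thread(() -> {
                int[] r;
                while ((r = claim(transferIndex, stride)) != null)
                    for (int i = r[1]; i >= r[0]; i--)
                        hits.incrementAndGet(i);
            });
            ts[t].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        for (int i = 0; i < n; i++)
            if (hits.get(i) != 1)
                return false;
        return true;
    }

    public static void main(String[] args) {
        System.out.println(stride(1024, 4));
        System.out.println(everyBucketClaimedOnce(1024, 4));
    }
}
```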

  • Concurrency between expansion threads and writer threads

There are two scenarios.

First, only one thread may start an expansion. The thread that triggers it uses CAS to change sizeCtl from a positive value to the negative value (stamp << 16) + 2, so only one thread's CAS succeeds and the other writer threads do not start a second expansion.

Second, writer threads must not write into a bucket while it is being migrated. The expansion thread traverses the bucket's linked list or red-black tree to rehash it; if new data were inserted into that list or tree during the traversal, the expansion thread might not see it and the data could be missing from the new table. This has to be read together with the put(k,v) code. For an empty bucket, both put and the expansion thread write into the bucket with a CAS (put installs its new node, the expansion thread installs a ForwardingNode), so only one of them succeeds. The one that fails re-reads the bucket and retries its put or expansion logic against the new contents. For a non-empty bucket, both put and the expansion thread take the synchronized lock on the bucket's head node, which rules out concurrent writes.

  • Concurrency between expansion threads and reader threads

This is covered in the answer to question 5.
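
The empty-bucket case can be illustrated with an AtomicReferenceArray in place of the table. This is a deliberately small model: EmptyBucketCasDemo, FWD, tryPut, and tryForward are hypothetical names, and the retry that follows a failed CAS in the real put and transfer code is only described in the comments.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// Models the race for an empty bucket between a writer and an expansion thread.
class EmptyBucketCasDemo {
    static final Object FWD = new Object();   // stands in for a ForwardingNode

    // Writer: install a new node only if the bucket is still empty.
    // On failure the real put re-reads the bucket and retries (or helps the expansion).
    static boolean tryPut(AtomicReferenceArray<Object> tab, int i, Object node) {
        return tab.compareAndSet(i, null, node);
    }

    // Expansion thread: mark an empty bucket as migrated.
    // On failure the real transfer re-reads the bucket, locks its head, and migrates it.
    static boolean tryForward(AtomicReferenceArray<Object> tab, int i) {
        return tab.compareAndSet(i, null, FWD);
    }

    public static void main(String[] args) {
        AtomicReferenceArray<Object> tab = new AtomicReferenceArray<>(4);
        // Bucket 0: the expansion thread wins, so the writer's CAS fails
        System.out.println(tryForward(tab, 0) + " " + tryPut(tab, 0, "a"));
        // Bucket 1: the writer wins, so the expansion thread's CAS fails
        System.out.println(tryPut(tab, 1, "b") + " " + tryForward(tab, 1));
    }
}
```

Whichever side loses the CAS learns that the bucket is no longer empty, which is what keeps a write from landing in a bucket that has already been marked as migrated.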

  3. How do other threads detect that an expansion is in progress and help with it?

After a bucket has been migrated, a ForwardingNode is placed at the corresponding index of the old table. When another thread tries to modify data in that bucket and finds a node of this type, it joins the expansion.

  4. How is the work split when multiple threads expand together? Is finer task granularity always better?

Expansion is purely CPU-bound work, so the split is based on the number of cores. With a single core, one thread handles the whole expansion, since multi-threaded expansion would only add thread-switching overhead. The source also sets the minimum number of buckets per claim to 16: if the stride is too small, more threads take part and contend more heavily on the CAS operations on transferIndex and sizeCtl.

  5. ConcurrentHashMap.get(key) takes no lock. How can other threads calling get(key) during an expansion still read the correct value without thread-safety issues?

The thread-safety concern is this: after get(key) has computed the array index, the table may be expanded and the data rehashed, so a lookup at the old index might miss the value. This is solved by separating reads from writes, which shows in three stages (while a bucket is being migrated, after the bucket has been migrated, and after the whole expansion has finished):

  • While a bucket's data is being migrated, the existing nodes are neither moved nor have their next pointers modified. Instead, new node objects are created and placed in the new table, so a thread traversing the linked list never loses data because a next reference changed.
  • After a bucket has been migrated, a ForwardingNode is placed in that bucket of the old table, and its find(k) method retrieves the data from the new table.
  • After the whole expansion is complete, the new table reference is assigned to the volatile table field, so the update is visible to other threads.

This ensures that the correct value is read in all three stages.
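
The second stage, reading through a ForwardingNode, can be sketched with a simplified node hierarchy. Everything here is a hypothetical model: ForwardingReadDemo, its String-keyed Node, and the get helper that takes the hash as a parameter. The real code reads table slots with volatile semantics (tabAt) and its ForwardingNode.find also handles nested forwarding, both of which are omitted.

```java
// Simplified model of how a lock-free read is redirected to the new table.
class ForwardingReadDemo {
    static final int MOVED = -1;   // hash value that marks a forwarding node

    static class Node {
        final int hash;
        final String key;
        final String val;
        final Node next;
        Node(int hash, String key, String val, Node next) {
            this.hash = hash; this.key = key; this.val = val; this.next = next;
        }
        // Ordinary bucket: walk the list
        Node find(int h, String k) {
            for (Node e = this; e != null; e = e.next)
                if (e.hash == h && k.equals(e.key))
                    return e;
            return null;
        }
    }

    static final class ForwardingNode extends Node {
        final Node[] nextTable;
        ForwardingNode(Node[] nextTable) {
            super(MOVED, null, null, null);
            this.nextTable = nextTable;
        }
        // Migrated bucket: look the key up in the new table instead
        @Override
        Node find(int h, String k) {
            Node head = nextTable[h & (nextTable.length - 1)];
            return head == null ? null : head.find(h, k);
        }
    }

    // h is assumed to be the already-spread, non-negative hash of key
    static String get(Node[] tab, int h, String key) {
        Node head = tab[h & (tab.length - 1)];
        if (head == null)
            return null;
        Node e = head.find(h, key);
        return e == null ? null : e.val;
    }

    public static void main(String[] args) {
        Node[] newTab = new Node[4];
        Node[] oldTab = new Node[2];
        newTab[3] = new Node(3, "a", "1", null);       // bucket 1 of the old table was migrated
        oldTab[1] = new ForwardingNode(newTab);
        oldTab[0] = new Node(2, "b", "2", null);       // bucket 0 has not been migrated yet
        System.out.println(get(oldTab, 3, "a"));       // served from the new table
        System.out.println(get(oldTab, 2, "b"));       // served from the old table
    }
}
```

A reader that still holds the old table reference gets the right answer in both cases: from the untouched old bucket, or through the forwarding node once the bucket has been migrated.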