Reading the ConcurrentHashMap source code

A rough overview

  • ConcurrentHashMap is a class in the java.util.concurrent package that can be described as a thread-safe HashMap.

  • As we all know, HashMap has good access performance but is not safe in a concurrent environment. Hashtable is thread-safe, but because its access methods are synchronized, its performance degrades noticeably. Although synchronized has been heavily optimized since JDK 1.6, it is still not optimal here.

  • ConcurrentHashMap locks individual buckets of the array instead of the whole map. (In JDK 1.7 the locking unit was a Segment. In the 1.8 code the Segment class is kept, in a very brief form, for compatibility only.)

  • This only represents my personal understanding. If you find an error, please leave a comment. Thank you!
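To make the thread-safety claim above concrete, here is a minimal sketch in which several threads increment the same key. The class name ConcurrentCountDemo and the helper countWith are made up for this illustration; merge is the real Map method, and it is atomic on ConcurrentHashMap.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ConcurrentCountDemo {
    // Hypothetical helper: starts `threads` threads that each increment
    // the key "hits" `perThread` times, then returns the final count.
    static int countWith(Map<String, Integer> map, int threads, int perThread) {
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    // On ConcurrentHashMap, merge is performed atomically
                    map.merge("hits", 1, Integer::sum);
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return map.get("hits");
    }

    public static void main(String[] args) {
        int total = countWith(new ConcurrentHashMap<>(), 4, 10_000);
        System.out.println(total); // prints 40000
    }
}
```

Passing a plain HashMap to the same helper is not safe: updates can be lost and the internal structure can be corrupted, so the result is unpredictable.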


Member variables

1. transient volatile Node<K,V>[] table;

The Node array that actually stores the data. The volatile modifier ensures visibility of the array reference between threads.

2. private transient volatile Node<K,V>[] nextTable;

The next array to use. It is non-null only while a resize is in progress, and data is gradually migrated into it during the resize.

The array is not accessible from outside the class, and it is set back to null once the resize is over.

3. private transient volatile long baseCount;

The base element count, updated directly when there is no contention.

4. private transient volatile int transferIndex;

Resize index: the position in the table array from which the next range is handed out to a resizing thread. It coordinates the safe division of migration work among multiple threads.

5. private transient volatile int sizeCtl;

sizeCtl is as important here as state is in AQS: it is a variable that multiple threads contend on, used to maintain several states and to store several pieces of information.

  • sizeCtl > 0 covers two cases:

    • Before initialization, sizeCtl holds the initial capacity.
    • After initialization, it holds the resize threshold, which is 0.75 x the length of the current array.
  • sizeCtl = -1: the table is being initialized.

  • sizeCtl < -1: a resize is in progress. sizeCtl stores the resize stamp (high 16 bits) and the resizer count (low 16 bits). The thread that starts the resize sets the count to 2, so the count is the number of active resizing threads plus 1.

    • In addCount and helpTransfer, a thread that joins the resize replaces sizeCtl with sizeCtl + 1 via CAS.

    • When a thread has finished its share and no range is left to claim, it leaves the resize and replaces sizeCtl with sizeCtl - 1 via CAS.
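The packing of sizeCtl during a resize can be checked with a few lines of arithmetic. The sketch below copies the resizeStamp formula and the RESIZE_STAMP_BITS / RESIZE_STAMP_SHIFT constants from the JDK 8 source; the class SizeCtlDemo and the helpers initialResizeCtl, stampOf and resizersOf are hypothetical names for this illustration only.

```java
class SizeCtlDemo {
    // Constants as defined in the JDK 8 ConcurrentHashMap source
    static final int RESIZE_STAMP_BITS = 16;
    static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

    // Same formula as ConcurrentHashMap.resizeStamp(int)
    static int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

    // Hypothetical helper: the value the initiating thread writes to sizeCtl
    static int initialResizeCtl(int n) {
        return (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2;
    }

    // Hypothetical helper: the stamp stored in the high 16 bits
    static int stampOf(int sizeCtl) {
        return sizeCtl >>> RESIZE_STAMP_SHIFT;
    }

    // Hypothetical helper: the low 16 bits hold (active resizers + 1)
    static int resizersOf(int sizeCtl) {
        return (sizeCtl & 0xFFFF) - 1;
    }

    public static void main(String[] args) {
        int sc = initialResizeCtl(16);
        System.out.println(Integer.toHexString(sc)); // prints 801b0002
        System.out.println(sc < 0);                  // prints true
        System.out.println(resizersOf(sc));          // prints 1
        System.out.println(resizersOf(sc + 1));      // prints 2 (one helper joined)
    }
}
```

Because bit 15 of the stamp is always set, shifting it into the high half makes sizeCtl negative, which is how the other code paths recognize a resize in progress.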

Node
  • The node class that makes up the linked list elements and holds K/V key-value pairs
 static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;
        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }
        public final K getKey()       { return key; }
        public final V getValue()     { return val; }
        public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
        public final String toString(){ return key + "=" + val; }
        public final V setValue(V value) {
            throw new UnsupportedOperationException();
        }
        public final boolean equals(Object o) {
            Object k, v, u; Map.Entry<?,?> e;
            return ((o instanceof Map.Entry) &&
                    (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                    (v = e.getValue()) != null &&
                    (k == key || k.equals(key)) &&
                    (v == (u = val) || v.equals(u)));
        }
        /**
         * Virtualized support for map.get(); overridden in subclasses.
         */
        Node<K,V> find(int h, Object k) {
            Node<K,V> e = this;
            if (k != null) {
                do {
                    // Walk the list until a node with a matching hash and key is found
                    K ek;
                    if (e.hash == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        return e;
                } while ((e = e.next) != null);
            }
            return null;
        }
    }
  • Differences between the Node in ConcurrentHashMap and the Node in HashMap:
    • val and next are declared volatile to guarantee visibility between threads.
    • The hashCode method is slightly different: because ConcurrentHashMap does not allow a null key or value, it uses key.hashCode() ^ val.hashCode() directly and skips the null checks.
  • The find() method helps look up an element starting from a given node in certain situations. It is usually called on the head node of a bucket to search the elements in that bucket.
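The null restriction mentioned above is easy to observe from the outside. A minimal sketch, where the class NullHandlingDemo and the helper rejects are names invented for this example:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class NullHandlingDemo {
    // Hypothetical helper: true if put(key, value) fails with a NullPointerException
    static boolean rejects(Map<String, String> map, String key, String value) {
        try {
            map.put(key, value);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(rejects(new HashMap<>(), null, "v"));           // prints false
        System.out.println(rejects(new ConcurrentHashMap<>(), null, "v")); // prints true
        System.out.println(rejects(new ConcurrentHashMap<>(), "k", null)); // prints true
    }
}
```

Since a stored key or value can never be null, Node.hashCode() can call key.hashCode() and val.hashCode() without guarding against null.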
ForwardingNode
  • A forwarding node.

  • This class only exists during the resize phase. It is placed at the head of a bucket as a marker node and points to nextTable.

  • As you can see from the constructor, a ForwardingNode has a hash of MOVED (-1) and a null key, value and next, which makes it purely a helper class.

static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;

        // The constructor uses MOVED (-1) as the hash; key, value and next are null
        ForwardingNode(Node<K,V>[] tab) {
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }
        // Helps with element lookup during a resize
        Node<K,V> find(int h, Object k) {
            // loop to avoid arbitrarily deep recursion on forwarding nodes
            outer: for (Node<K,V>[] tab = nextTable;;) {
                Node<K,V> e; int n;
                if (k == null || tab == null || (n = tab.length) == 0 ||
                    (e = tabAt(tab, (n - 1) & h)) == null)
                    return null;
                for (;;) {
                    int eh; K ek;
                    if ((eh = e.hash) == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        return e;
                    if (eh < 0) {
                        if (e instanceof ForwardingNode) {
                            tab = ((ForwardingNode<K,V>)e).nextTable;
                            continue outer;
                        }
                        else
                            return e.find(h, k);
                    }
                    if ((e = e.next) == null)
                        return null;
                }
            }
        }
    }
  • During a resize, the find() method helps the get method look up elements of a bucket in nextTable.

Element addition method

 	public V put(K key, V value) {
        return putVal(key, value, false);
    }
  • As usual, the publicly exposed method simply delegates to the method that implements the logic.
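The third argument that put passes to putVal is the onlyIfAbsent flag. In the JDK 8 source, putIfAbsent calls the same method with true, so the observable difference between the two flag values can be sketched through the public API. The class name PutDemo and the method demo are made up for this illustration.

```java
import java.util.concurrent.ConcurrentHashMap;

class PutDemo {
    // Hypothetical helper: records the return value of each call and the final mapping
    static String demo() {
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
        String r1 = map.put("k", "v1");         // no previous mapping, returns null
        String r2 = map.put("k", "v2");         // overwrites, returns the old value "v1"
        String r3 = map.putIfAbsent("k", "v3"); // does not overwrite, returns "v2"
        return r1 + "," + r2 + "," + r3 + "," + map.get("k");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints null,v1,v2,v2
    }
}
```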
putVal: the method that implements insertion
    /**
     * Method arguments:
     * 1. key, value: the key/value pair to store
     * 2. onlyIfAbsent: if true, the value is written only when the key has no existing mapping
     */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        // CHM does not support NULL.
        if (key == null || value == null) throw new NullPointerException();
        // Get the Hash of the key,spread can be called a perturbation function
        int hash = spread(key.hashCode());
        int binCount = 0;
        // Infinite loop
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            // Initializes the Table when TAB is empty
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            // Use '(n-1)&hash' to determine the subscript position of the element and obtain the corresponding node
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                // If the corresponding node is empty, the current information is directly used as the head node of the bucket
                if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            // If the Hash of the bucket's head is MOVED, the node is' ForwardingNode '
            // The array is being expanded
            else if ((fh = f.hash) == MOVED)
                // Help expand capacity
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                // Locking guarantees atomicity; volatile only guarantees visibility
                // f is the node element obtained by key, which is the lock object
                synchronized (f) {
                    // Re-read the head node with tabAt(tab, i) after acquiring the lock
                    // to check that it has not been modified in the meantime
                    if (tabAt(tab, i) == f) {
                        // Compare with else
                        // fh >= 0 indicates that the current node is a linked list node.
                        if (fh >= 0) {
                            // Count the number of elements in the list
                            binCount = 1;
                            // Loop through the bucket
                            // Two cases of breaking out of the loop:
                            // 1. Find the same value. BinCount indicates the number of nodes traversed
                            // 2. At the end of the loop,binCount indicates the number of nodes in the bucket
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                // The source relies heavily on short-circuit evaluation, which also shows the order of the checks:
                                // 1. If the hashes differ, skip the remaining checks
                                // 2. If the hashes are equal and the key is the same reference, enter the if directly
                                // 3. Otherwise fall back to comparing with key.equals(ek)
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    // If onlyIfAbsent is true, the existing value is not overwritten
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    // The specified element has been found
                                    break;
                                }
                                // The bucket is locked by the synchronized block, so no other thread can modify it
                                // and there is no need to re-read e
                                Node<K,V> pred = e;
                                // 1. If e.next is null, append the new node after pred and exit the loop
                                // 2. Otherwise continue traversing
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        // As in HashMap, tree bins are handled separately
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                // binCount != 0 means the synchronized block above was entered and the bucket was processed
                if (binCount != 0) {
                    // If binCount has reached TREEIFY_THRESHOLD, convert the list to a red-black tree
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    // If oldVal is not null, an existing mapping was found: return the old value
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        // Increment the element count and check whether a resize is needed
        addCount(1L, binCount);
        return null;
    }
The complete insertion process
  1. ConcurrentHashMap does not support null keys or values.

  2. The hash is perturbed with spread, then the method loops over the tab array, initializing it if it is empty.

  3. Obtain the index of the bucket with the formula (n - 1) & hash. If the bucket is empty, install the key and value as the head node of the bucket with CAS.

  4. Check the hash of the head node of the bucket. If hash == -1 (MOVED), the array is being resized and the thread helps with the resize.

  5. If the hash of the head node of the bucket is greater than or equal to 0, the bucket is a linked list. This is followed by the normal list traversal, then an append or an overwrite.

  6. If the head node of a bucket is of type TreeBin, the bucket is a red-black tree and the insertion is delegated to putTreeVal.

  7. After leaving the synchronized block, check whether the binCount counted during traversal has reached TREEIFY_THRESHOLD, in which case the list is converted to a red-black tree.

  8. Check whether oldVal is null. This step is also very important: if oldVal is not null, the operation found an existing mapping rather than inserting a new one, so oldVal is returned directly and the element count is left unchanged.

  9. If oldVal is null, call addCount to increment the element count and check whether a resize is needed.
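Steps 2 and 3 can be tried in isolation. The sketch below reproduces the spread formula and the HASH_BITS constant from the JDK 8 source; the class SpreadDemo and the helper indexFor are names invented for this example.

```java
class SpreadDemo {
    // Usable bits of a normal node hash, as in the JDK 8 source
    static final int HASH_BITS = 0x7fffffff;

    // Same formula as ConcurrentHashMap.spread(int): XOR the high 16 bits
    // into the low 16 bits and clear the sign bit
    static int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

    // Hypothetical helper: bucket index for a table of length n (a power of two)
    static int indexFor(int hash, int n) {
        return (n - 1) & hash;
    }

    public static void main(String[] args) {
        int a = 0x10000, b = 0x20000; // hash codes that differ only in the high bits
        // Without perturbation both land in bucket 0 of a 16-slot table
        System.out.println(indexFor(a, 16) + " " + indexFor(b, 16));                 // prints 0 0
        // With perturbation they are separated
        System.out.println(indexFor(spread(a), 16) + " " + indexFor(spread(b), 16)); // prints 1 2
        // The result is never negative, so negative hashes stay reserved for special nodes
        System.out.println(spread(-1) >= 0);                                         // prints true
    }
}
```

Clearing the sign bit is what allows the check fh >= 0 in putVal to identify an ordinary list node.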

Element fetching method

   public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        // Get the hash and perturb it
        int h = spread(key.hashCode());
        // Conditions for entering the lookup:
        // 1. The array is not null and its length is greater than 0
        // 2. The bucket at the computed index is not empty
        if ((tab = table) != null && (n = tab.length) > 0 &&
            // The bucket index formula is the same as in putVal: (n - 1) & h
            (e = tabAt(tab, (n - 1) & h)) != null) {
            // Check the head node of the bucket first; on a match there is no need to traverse the list
            if ((eh = e.hash) == h) {
                // Return the value of the matching element
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            // A head node hash < 0 covers three cases:
            // 1. The array is being resized; the actual node type is ForwardingNode
            // 2. The bucket is a tree; the actual node type is TreeBin
            // 3. The node is a temporary placeholder; the actual node type is ReservationNode
            // Each node type supplies its own find() method
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            // The head node is not the target and the bucket is an ordinary list:
            // traverse the rest of the bucket to find the element
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }
The complete retrieval process is as follows:
  1. The hash of the key is obtained through the perturbation function. Before the lookup, the tab array is checked to be non-null with a length greater than 0.
  2. The bucket is obtained at index (n - 1) & hash.
  3. Check whether the key and hash match the head node of the bucket. If they do, return its value.
  4. If the hash of the head node is < 0, the bucket is in one of the following three states, and the element is fetched by calling the find method of the actual node type.
    1. The array is being resized and the actual node type is ForwardingNode
    2. The bucket is a tree and the actual node type is TreeBin
    3. The node is a temporary placeholder and the actual node type is ReservationNode
  5. If the head node does not match and its hash is normal (>= 0), a normal list traversal lookup is performed.
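Steps 1 to 3 and step 5 can be sketched on a toy, single-threaded table. This is a simplification under stated assumptions: there is no volatile access, no tabAt, and the negative-hash branch of step 4 is left out. The class ToyLookup, its Node type and the helper tableOf are hypothetical and only mirror the shape of the real code.

```java
class ToyLookup {
    static final int HASH_BITS = 0x7fffffff;

    // Simplified, immutable stand-in for ConcurrentHashMap.Node
    static final class Node {
        final int hash; final String key; final String val; final Node next;
        Node(int hash, String key, String val, Node next) {
            this.hash = hash; this.key = key; this.val = val; this.next = next;
        }
    }

    static int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

    // Hypothetical builder: table of length n (a power of two) from key/value pairs
    static Node[] tableOf(int n, String... kv) {
        Node[] tab = new Node[n];
        for (int j = 0; j + 1 < kv.length; j += 2) {
            int h = spread(kv[j].hashCode());
            int i = (n - 1) & h;
            tab[i] = new Node(h, kv[j], kv[j + 1], tab[i]); // push at the bucket head
        }
        return tab;
    }

    static String get(Node[] tab, String key) {
        int h = spread(key.hashCode());
        int n; Node e;
        if (tab != null && (n = tab.length) > 0 && (e = tab[(n - 1) & h]) != null) {
            // Head node first: a hit here avoids walking the list
            if (e.hash == h && (e.key == key || key.equals(e.key)))
                return e.val;
            // Otherwise walk the rest of the bucket
            while ((e = e.next) != null) {
                if (e.hash == h && (e.key == key || key.equals(e.key)))
                    return e.val;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Node[] tab = tableOf(2, "a", "1", "b", "2", "c", "3");
        System.out.println(get(tab, "a")); // prints 1
        System.out.println(get(tab, "z")); // prints null
    }
}
```

With only two buckets and three keys, at least one lookup has to go past the head node, which exercises the list traversal branch.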

Expansion mechanism

  • I have to say, the code for the expansion part is absolutely superb!!
addCount: counting and checking for a resize
  • What addCount does:
    1. Increments the element count of ConcurrentHashMap
    2. Checks whether a resize is needed and, if so, starts or joins it.
/**
 * Parameters:
 * x     -> the number of elements to add
 * check -> if check < 0, no resize check is made; if check <= 1, the check is made only when uncontended
 */
private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        // Enter this branch when either:
        // 1. counterCells is not null, or
        // 2. the CAS that adds x to baseCount fails
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            // Thread contention status flag
            boolean uncontended = true;
            // Call fullAddCount when any of the following holds:
            // 1. The counter cell array is null or has a length of 0
            // 2. The randomly chosen array slot is empty
            // 3. The CAS that adds x to the value of the CounterCell fails
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
        // A resize check is made only when check >= 0
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            // 1. The number of elements has reached sizeCtl, the resize threshold
            // 2. The tab array is not null, so it has been initialized
            // 3. table.length is smaller than the maximum capacity, so it can still grow
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                // Get a resize stamp based on the array length
                int rs = resizeStamp(n);
                if (sc < 0) {
                    // If the high 16 bits of sc are not rs, the stamp has changed
                    // If nextTable is null, the resize is complete
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    // CAS sizeCtl to sc + 1; on success this thread joins the resize
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        // Call transfer with nextTable already created
                        transfer(tab, nt);
                }
                // sc > 0 means no resize is in progress: update sizeCtl and start one
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    // Call transfer; nextTable will be created there
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }
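The counting half of addCount follows a striped-counter idea: try one CAS on a base value and, on contention, add to a cell instead, so that the total is the base plus all cells. The sketch below is a deliberately simplified model, not the JDK implementation: the cell array has a fixed size of 4 (an assumption), there is no fullAddCount-style growth, and the class StripedCounter and its methods are hypothetical names.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

class StripedCounter {
    private final AtomicLong base = new AtomicLong();                  // plays the role of baseCount
    private final AtomicLongArray cells = new AtomicLongArray(4);      // fixed size: an assumption

    void add(long x) {
        long b = base.get();
        // Fast path: a single CAS attempt on the base value
        if (!base.compareAndSet(b, b + x)) {
            // Contended: spread the update over a randomly chosen cell
            int idx = ThreadLocalRandom.current().nextInt(cells.length());
            cells.addAndGet(idx, x);
        }
    }

    // Analogous to sumCount(): base plus the value of every cell
    long sum() {
        long s = base.get();
        for (int i = 0; i < cells.length(); i++)
            s += cells.get(i);
        return s;
    }

    // Hypothetical driver: each thread adds 1 `perThread` times
    static long runConcurrently(int threads, int perThread) {
        StripedCounter c = new StripedCounter();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++)
                    c.add(1);
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return c.sum();
    }

    public static void main(String[] args) {
        System.out.println(runConcurrently(4, 10_000)); // prints 40000
    }
}
```

Every call to add applies its increment exactly once, either to the base or to a cell, so the sum is exact once all threads have finished.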
helpTransfer: helping with the resize
 /**
  * Parameters:
  * tab -> the array, usually table
  * f   -> the head node of the bucket that the thread was about to lock
  */
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        // 1. The tab parameter must not be null
        // 2. The f parameter must be of type ForwardingNode
        // 3. f.nextTable must not be null
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            // The bit manipulation in resizeStamp made my head spin
            // Obtain the resize stamp
            int rs = resizeStamp(tab.length);
            // The map is still being resized when:
            // 1. The nextTable of node f is the same as the nextTable member variable
            // 2. The tab passed in is the same as the table member variable
            // 3. sizeCtl is smaller than 0
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {
                // No help is needed in any of these cases:
                // 1. The high 16 bits of sc do not equal rs
                // 2. sc equals rs + 1
                // 3. sc equals rs + MAX_RESIZERS
                // 4. transferIndex <= 0,
                //    which means every range of the array has already been handed out
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                // CAS sizeCtl to sc + 1, then call transfer to help with the resize
                // A thread adds 1 to sizeCtl when it joins and subtracts 1 when it finishes
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }
transfer: the core resize method, responsible for migrating the elements in the buckets
  private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        // stride is the number of buckets claimed per migration step
        // NCPU is the number of CPUs on the host
        // MIN_TRANSFER_STRIDE is the minimum number of buckets each thread processes
        // 1. On multi-core machines, stride = (n / 8) / NCPU with integer division;
        //    e.g. with a capacity of 16 and 2 CPUs the result is 1
        // 2. On a single core, stride is n, the current array capacity
        // Important: the minimum stride is 16, a hard lower bound
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
      	// nextTab is a transition object for expansion, so it must be initialized first
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                // Important: the new array is twice the current size --> n << 1
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
            	// Capacity expansion failed, directly fill the maximum value of int
                sizeCtl = Integer.MAX_VALUE;
                // Exit directly
                return;	
           }
            // Update member variables
            nextTable = nextTab;
            // transferIndex is the length of the array
            transferIndex = n;
        }
      	// Record the length of the transition array
        int nextn = nextTab.length;
    	// Create a new ForwardingNode for subsequent placeholders
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        // Everything above is data preparation: initializing the new array, recording its length,
        // creating the placeholder node, and so on. The main resize logic starts below.
        // advance controls whether this thread moves on to the next bucket or range
        boolean advance = true;
        // finishing marks that the whole resize is done and only the final recheck remains
        boolean finishing = false;
        // The for loop of the resize can be divided into two parts:
        // the while loop determines the range of buckets to migrate and the index of the current bucket.
        // i is the index of the bucket being migrated
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            // The while block works through its if branches in order:
            // --i: advances through the claimed range; i is the bucket index
            // nextIndex: if no range has been claimed, checks whether any range is still left
            // CAS: claims the next range for the for loop, stride buckets at a time
            while (advance) {
                int nextIndex, nextBound;
                // This --i runs on every pass, moving i one position toward bound
                if (--i >= bound || finishing)
                    advance = false;
                // If transferIndex <= 0, every range has already been handed out
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                // CAS transferIndex to the old value minus stride:
                // stride is the size of the claimed range, and the new transferIndex is where the next claim starts.
                // This shows that migration proceeds from the end of the array toward the front
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    // bound is the lower end of this range and the upper limit of the next one
                    bound = nextBound;
                    // i is the index of the bucket where this thread starts migrating
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            // The migration logic
            // 1. This if detects that this thread's work is finished, via three out-of-range conditions:
            //    1) i < 0: set by the second branch above when no range is left
            //    2) i >= n: i has reached the length of the old array
            //    3) i + n >= nextn: i + n has reached the length of the new array
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                // Perform operations after capacity expansion
                // 1. Make nextTable empty,
                // 2. Assign the intermediate array to table
                // 3. SizeCtl changed to 1.5 times (2n-0.5n)
                if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    // Use signed left shift and unsigned right shift respectively
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                // CAS replaces' sizectl-1 ', indicating that the capacity expansion task of this thread is complete
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    // If this expression is true, another thread is performing capacity expansion
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    // Otherwise this is the last thread and the resize is complete
                    finishing = advance = true;
                    // Recheck the whole table before committing
                    i = n;
                }
            }
            // 2. Fill empty buckets in the claimed range with the ForwardingNode,
            //    which holds a reference to nextTable
            else if ((f = tabAt(tab, i)) == null)
                // Install it with CAS
                advance = casTabAt(tab, i, null, fwd);
            // 3. The bucket has already been processed
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
      // 4. Migrate buckets
            else {
                // synchronized guarantees atomicity and visibility
                synchronized (f) {
                    // Re-read the head node of bucket i after acquiring the lock
                    // to make sure it was not modified in the meantime
                    if (tabAt(tab, i) == f) {
                        // This extension is similar to HashMap, with lowNode and highNode headers
                        Node<K,V> ln, hn;
                        if (fh >= 0) {
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            // advance = true: move on to the next bucket
                            advance = true;
                        }
                        // Tree bucket migration operation
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }
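The split into ln and hn in the code above rests on one observation: after doubling from n to 2n, an element of old bucket i can only land in bucket i or bucket i + n, and the single bit hash & n decides which. A minimal sketch, with the class SplitDemo and the helper newIndex invented for this illustration:

```java
class SplitDemo {
    // Hypothetical helper: index of `hash` after the table grows from n to 2n
    // (n must be a power of two)
    static int newIndex(int hash, int n) {
        int i = hash & (n - 1);                 // index in the old table
        return (hash & n) == 0 ? i : i + n;     // low list stays, high list moves by n
    }

    public static void main(String[] args) {
        int n = 16;
        // All four hashes share old bucket 5
        for (int hash : new int[] {5, 21, 37, 53}) {
            int direct = hash & (2 * n - 1);    // index computed directly in the new table
            System.out.println(hash + " -> " + newIndex(hash, n) + " (direct: " + direct + ")");
        }
        // prints:
        // 5 -> 5 (direct: 5)
        // 21 -> 21 (direct: 21)
        // 37 -> 5 (direct: 5)
        // 53 -> 21 (direct: 21)
    }
}
```

This is why transfer can write bucket i and bucket i + n of nextTab while holding only the lock on bucket i of the old table: no other old bucket maps to those two slots.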

Logic of the thread that triggers capacity expansion
  1. In the addCount method, check whether the number of elements has reached the capacity expansion threshold (0.75 * table.length). If it has, capacity expansion is triggered and the transfer method is called.

    • Note that at this point sizeCtl is replaced via CAS with (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2
  2. The transfer method then works as follows:

    1. Based on the number of CPUs and the current capacity, compute how many buckets each thread migrates at a time. This is the stride, and its minimum is 16.

    2. If the transition array nextTab has not been initialized, initialize it first. transferIndex is then set to the length of the old array and marks the upper bound of the buckets that still have to be handed out.

    3. Set up the variables needed for the migration.

    4. A while loop assigns the thread the range of buckets it will migrate, [transferIndex - stride, transferIndex - 1]; i is the index of the bucket currently being migrated.

    5. Three checks select one of four code paths:

      1. The index is out of range: i < 0 || i >= n || i + n >= nextn. This thread's share of the work is done, and the while loop had no further range to give it.

        • If finishing is true, the whole expansion is complete and the final recheck has passed.

        • If finishing is false, **CAS replaces sizeCtl with sizeCtl-1**, indicating that this thread has finished its part of the expansion and is about to exit.

          After the CAS, the thread checks whether sc - 2 still equals resizeStamp(n) << RESIZE_STAMP_SHIFT, that is, whether sc is the value set in addCount. If it is not, other threads have not finished their migration tasks yet, and this thread returns directly.

      2. If the bucket at index i is empty, a ForwardingNode is placed directly into it as the head node via CAS, marking the bucket as migrated. advance is set to the result of the CAS.

      3. If the hash of the head node is MOVED, the node is a ForwardingNode and the bucket has already been migrated; advance is set to true.

      4. In all other cases, the bucket is migrated in much the same way as in HashMap, and advance is set to true.

    6. Then go back to step 4.
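The range hand-out in steps 1 and 4 can be sketched in isolation. This is a minimal illustration, not the JDK code: the class StrideClaimSketch and its claim method are hypothetical names, and an AtomicInteger stands in for the Unsafe CAS on transferIndex. The stride formula follows the JDK 8 source as far as I can tell.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of how transfer() hands out bucket ranges; not the JDK code.
class StrideClaimSketch {
    static final int MIN_TRANSFER_STRIDE = 16; // the minimum stride mentioned above

    // Stride: (n >>> 3) / ncpu on multi-core machines, the whole table otherwise,
    // and never less than MIN_TRANSFER_STRIDE.
    static int stride(int n, int ncpu) {
        int s = (ncpu > 1) ? (n >>> 3) / ncpu : n;
        return (s < MIN_TRANSFER_STRIDE) ? MIN_TRANSFER_STRIDE : s;
    }

    // Claims the next range {low, high} by CAS on transferIndex.
    // Returns null once every bucket has been handed out.
    static int[] claim(AtomicInteger transferIndex, int stride) {
        for (;;) {
            int nextIndex = transferIndex.get();
            if (nextIndex <= 0)
                return null;
            int nextBound = (nextIndex > stride) ? nextIndex - stride : 0;
            if (transferIndex.compareAndSet(nextIndex, nextBound))
                return new int[] { nextBound, nextIndex - 1 };
        }
    }

    public static void main(String[] args) {
        int n = 64;                                   // old table length
        AtomicInteger transferIndex = new AtomicInteger(n);
        int stride = stride(n, 4);                    // 16 for this table
        int[] r;
        while ((r = claim(transferIndex, stride)) != null)
            System.out.println("claimed buckets " + r[0] + ".." + r[1]);
    }
}
```

Ranges are claimed from the high end of the table downwards, which is why a thread walks its buckets with a decreasing i.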

Logic of threads that help with capacity expansion
  1. In putVal and the other element operation methods, if the bucket head node obtained is a ForwardingNode, the ConcurrentHashMap is currently being expanded, and the thread immediately calls helpTransfer to help with the expansion.
  2. helpTransfer performs several sanity checks. It helps only when all three of the following conditions are met:
    1. tab is not null
    2. The head node is a ForwardingNode
    3. The transition array nextTable has been initialized
  3. The while loop makes two kinds of decisions:
    1. Decide whether help is still needed. In the following four cases, no help is required:
      1. (sc >>> RESIZE_STAMP_SHIFT) != rs: the stamp has changed
      2. sc == rs + 1: the thread that triggered the expansion has exited and the expansion is complete
      3. sc == rs + MAX_RESIZERS: the number of threads participating in the expansion has reached the maximum
      4. transferIndex <= 0: every range of buckets has already been handed out
    2. If none of these cases applies, sizeCtl is incremented by 1 via CAS and transfer is called to help with the expansion.
sizeCtl changes during capacity expansion
  1. addCount -> sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2

    • This puzzled me for a long time: why does helpTransfer inspect the high 16 bits of sizeCtl?
    • The assignment above shifts the value of resizeStamp(n) left by 16 bits and stores 2 in the low 16 bits. In other words, during capacity expansion the high 16 bits of sizeCtl hold the stamp, while the low 16 bits track the number of participating threads (stored as that number plus 1).
    • He’s a fucking genius.

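  2. transfer -> sizeCtl = sizeCtl - 1 (a thread finishes its migration work and exits)

  3. helpTransfer -> sizeCtl = sizeCtl + 1 (a helper thread joins the expansion)

  4. transfer -> sizeCtl = nextTab.length * 0.75 once capacity expansion is complete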
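The life cycle of sizeCtl during one expansion can be traced with plain integer arithmetic. The sketch below is only an illustration: SizeCtlSketch, stampOf, activeThreads and isLastThread are hypothetical helper names, and the reading "low 16 bits = participating threads + 1" is my interpretation of the + 2 start value and the sc - 2 check in transfer.

```java
// Sketch of how sizeCtl packs the resize stamp and the thread count during expansion.
class SizeCtlSketch {
    static final int RESIZE_STAMP_BITS = 16;
    static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

    static int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

    // Value installed by the thread that triggers the expansion.
    static int initialSizeCtl(int n) {
        return (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2;
    }

    // High 16 bits: the stamp.
    static int stampOf(int sc) {
        return sc >>> RESIZE_STAMP_SHIFT;
    }

    // Low 16 bits minus 1: threads currently migrating (assumed interpretation).
    static int activeThreads(int sc) {
        return (sc & 0xFFFF) - 1;
    }

    // The check a leaving thread performs; sc is the value it read before its own decrement.
    static boolean isLastThread(int sc, int n) {
        return (sc - 2) == (resizeStamp(n) << RESIZE_STAMP_SHIFT);
    }

    public static void main(String[] args) {
        int n = 16;
        int sc = initialSizeCtl(n);               // addCount: the trigger thread starts
        System.out.println(sc < 0);               // true: the stamp sets the sign bit
        sc = sc + 1;                              // helpTransfer: a helper joins
        System.out.println(activeThreads(sc));    // 2
        System.out.println(isLastThread(sc, n));  // false: another thread is still working
        sc = sc - 1;                              // transfer: one thread leaves
        System.out.println(isLastThread(sc, n));  // true: the next thread to leave is the last
    }
}
```

Because the stamp always has bit 15 set, shifting it left by 16 makes sizeCtl negative for the whole expansion, which is how other code paths can tell that a resize is in progress.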

Initialization method

  • Like HashMap, ConcurrentHashMap does not initialize the underlying array in the constructor. Instead, put and the other storage methods check whether the array still needs to be initialized.
initTable array initialization function
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        // sizeCtl < 0 means another thread is initializing the array; yield the CPU
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        // CAS sizeCtl from sc to -1;
        // sizeCtl == -1 indicates that a thread is initializing the array
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                // Recheck that the array is still uninitialized after the CAS
                if ((tab = table) == null || tab.length == 0) {
                    // sc is the value of sizeCtl before the CAS;
                    // a positive value is used as the initial capacity
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    // After initialization, sc becomes 0.75 * n, the capacity expansion threshold
                    sc = n - (n >>> 2);
                }
            } finally {
                // Always write sc back so sizeCtl cannot stay at -1 after an abnormal exit
                sizeCtl = sc;
            }
            break;
        }
    }
    // Return the initialized array
    return tab;
}
  • initTable is called from putVal rather than from the constructor. This is the lazy initialization mechanism in CHM.
  • Initialization process:
    1. Check whether another thread is already initializing the array. If so, yield the time slice; if not, go on to the next step.
    2. Before initializing, replace sizeCtl with -1 via CAS to indicate that initialization is in progress.
    3. Use the previous value of sizeCtl as the initial capacity; when that value is <= 0, use the default capacity of 16.
    4. Set sizeCtl = 0.75 * array size (sizeCtl shows up everywhere; it really is important)
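To watch the four steps run under contention, here is a runnable stand-in that follows the same protocol. It is a sketch under assumptions: LazyTableSketch and threshold() are hypothetical names, an AtomicInteger replaces the Unsafe CAS on sizeCtl, and a plain Object[] replaces the Node array.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical holder that mimics the initTable() protocol without Unsafe.
class LazyTableSketch {
    static final int DEFAULT_CAPACITY = 16;

    private volatile Object[] table;
    // Before init: requested capacity (or 0); -1: init in progress; after init: threshold
    private final AtomicInteger sizeCtl;

    LazyTableSketch(int initialCapacity) {
        this.sizeCtl = new AtomicInteger(initialCapacity);
    }

    Object[] initTable() {
        Object[] tab;
        int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl.get()) < 0)
                Thread.yield();                        // another thread is initializing
            else if (sizeCtl.compareAndSet(sc, -1)) {  // this thread won the race
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        table = tab = new Object[n];
                        sc = n - (n >>> 2);            // 0.75 * n
                    }
                } finally {
                    sizeCtl.set(sc);                   // never leave -1 behind
                }
                break;
            }
        }
        return tab;
    }

    int threshold() {
        return sizeCtl.get();
    }

    public static void main(String[] args) throws InterruptedException {
        LazyTableSketch map = new LazyTableSketch(0);
        Thread[] threads = new Thread[8];
        Object[][] seen = new Object[threads.length][];
        for (int i = 0; i < threads.length; i++) {
            final int id = i;
            threads[i] = new Thread(() -> seen[id] = map.initTable());
            threads[i].start();
        }
        for (Thread t : threads)
            t.join();
        boolean same = true;
        for (Object[] s : seen)
            same &= (s == seen[0]);                    // every thread got the same array
        System.out.println(same + " " + seen[0].length + " " + map.threshold());
    }
}
```

Only the thread that wins the CAS allocates the array; the losers yield until table becomes visible and then return the same instance.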

General tool method

1. resizeStamp: obtain a stamp that identifies a capacity expansion
    static final int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }
  • Integer.numberOfLeadingZeros(n) returns the number of zero bits before the highest one bit in the 32-bit binary form of n. For example, the binary representation of the int value 16 is 000000...00010000, which has 27 zeros in front of the 1, so it returns 27.
  • The | operation can be understood here simply as addition, because the two operands have no one bits in common.
  • So what it does is: take the number of leading zeros of n and add 2 to the 15th power.
  • It is not clear to me why a stamp is needed.
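A few concrete values make the arithmetic easier to follow. The sketch below copies resizeStamp and the RESIZE_STAMP_BITS constant; the class name ResizeStampSketch is made up for the example. One possible reading, offered as a guess rather than as the JDK's stated rationale: each table length yields a different stamp, so a helper can tell whether sizeCtl still belongs to the expansion of the table it is looking at.

```java
// Worked values for resizeStamp(n); the class name is only for illustration.
class ResizeStampSketch {
    static final int RESIZE_STAMP_BITS = 16;

    static int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

    public static void main(String[] args) {
        for (int n = 16; n <= 128; n <<= 1) {
            int nlz = Integer.numberOfLeadingZeros(n);
            int stamp = resizeStamp(n);
            // The OR behaves like addition: nlz is at most 32, far below bit 15
            System.out.println("n=" + n + " nlz=" + nlz + " stamp=" + stamp
                + " sameAsSum=" + (stamp == nlz + (1 << 15)));
        }
    }
}
```

For n = 16 the stamp is 27 + 32768 = 32795, and each doubling of n lowers it by one.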
2. spread perturbation function
static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}
  • spread is a perturbation function, similar to the hash() method in HashMap.
  • Besides XORing the high 16 bits into the low 16 bits, CHM also ANDs the result with HASH_BITS, which clears the sign bit. The XOR effectively reduces the probability of hash collisions and spreads elements more evenly.
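A small experiment shows both effects. The sketch copies spread and HASH_BITS (0x7fffffff in the JDK source); SpreadSketch and the sample hash values are made up for the demonstration. The sign bit matters because negative hashes such as MOVED are reserved for special nodes.

```java
// Demonstrates what spread() changes for a table of length 16.
class SpreadSketch {
    static final int HASH_BITS = 0x7fffffff; // usable bits of a normal node hash

    static int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

    public static void main(String[] args) {
        int n = 16;                           // table length
        int a = 0x00010000, b = 0x00020000;   // differ only in the high 16 bits
        // Without spreading, both hashes land in bucket 0
        System.out.println((a & (n - 1)) + " " + (b & (n - 1)));                  // 0 0
        // After spreading, the high bits influence the bucket index
        System.out.println((spread(a) & (n - 1)) + " " + (spread(b) & (n - 1)));  // 1 2
        // The sign bit is always cleared, so the result is never negative
        System.out.println(spread(-1) >= 0);                                      // true
    }
}
```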

Node array element access method

1. tabAt reads an array element with volatile semantics
    // tab: array; i: index
    @SuppressWarnings("unchecked")
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }
2. casTabAt replaces an array element via CAS
    // tab: array; i: index; c: expected element; v: replacement element
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }
3. setTabAt writes an array element with volatile semantics
    // tab: array; i: index; v: replacement element
    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }
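Application code cannot call sun.misc.Unsafe this way, but the same three operations can be tried out with a public API. The sketch below is an analogue, not what ConcurrentHashMap itself does: it swaps in java.util.concurrent.atomic.AtomicReferenceArray for the raw Node array, and TableAccessSketch is a hypothetical class name.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// Analogue of tabAt / casTabAt / setTabAt built on AtomicReferenceArray.
class TableAccessSketch {
    static <T> T tabAt(AtomicReferenceArray<T> tab, int i) {
        return tab.get(i);                   // volatile read of slot i
    }

    static <T> boolean casTabAt(AtomicReferenceArray<T> tab, int i, T c, T v) {
        return tab.compareAndSet(i, c, v);   // succeeds only if slot i still holds c
    }

    static <T> void setTabAt(AtomicReferenceArray<T> tab, int i, T v) {
        tab.set(i, v);                       // volatile write of slot i
    }

    public static void main(String[] args) {
        AtomicReferenceArray<String> tab = new AtomicReferenceArray<>(16);
        System.out.println(casTabAt(tab, 3, null, "head"));   // true: the bucket was empty
        System.out.println(casTabAt(tab, 3, null, "other"));  // false: the bucket is occupied
        setTabAt(tab, 3, "fwd");                              // unconditional overwrite
        System.out.println(tabAt(tab, 3));                    // fwd
    }
}
```

The failed second CAS is the situation putVal handles when two threads race to install the first node of an empty bucket: the loser retries against the new head.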

The Unsafe static block

  • Unsafe is an area that Java developers rarely touch, so here is a quick overview.
    private static final sun.misc.Unsafe U;
    // Offset of the sizeCtl field
    private static final long SIZECTL;
    // Offset of the transferIndex field
    private static final long TRANSFERINDEX;
    // Offset of the baseCount field
    private static final long BASECOUNT;
    // Offset of the cellsBusy field
    private static final long CELLSBUSY;
    // Offset of the value field in the CounterCell class
    private static final long CELLVALUE;
    // Offset of the first element of the Node array
    private static final long ABASE;
    // log2 of the size of one Node array element; combined with ABASE to locate array elements
    private static final int ASHIFT;

    static {
        try {
            U = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ConcurrentHashMap.class;
            // Look up each field by reflection, then ask Unsafe for its offset
            SIZECTL = U.objectFieldOffset
                (k.getDeclaredField("sizeCtl"));
            TRANSFERINDEX = U.objectFieldOffset
                (k.getDeclaredField("transferIndex"));
            BASECOUNT = U.objectFieldOffset
                (k.getDeclaredField("baseCount"));
            CELLSBUSY = U.objectFieldOffset
                (k.getDeclaredField("cellsBusy"));
            Class<?> ck = CounterCell.class;
            CELLVALUE = U.objectFieldOffset
                (ck.getDeclaredField("value"));
            Class<?> ak = Node[].class;
            // Offset of the first element in the array
            ABASE = U.arrayBaseOffset(ak);
            // Size in bytes of one array element
            int scale = U.arrayIndexScale(ak);
            if ((scale & (scale - 1)) != 0)
                throw new Error("data type scale not a power of two");
            ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
        } catch (Exception e) {
            throw new Error(e);
        }
    }
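The ABASE / ASHIFT arithmetic can be checked without touching Unsafe. In the sketch below the base offset 16 and the element size 4 are assumed values chosen for illustration (the real ones depend on the JVM and its settings), and ArrayOffsetSketch, shiftFor and offsetOf are hypothetical names.

```java
// Sketch of the address arithmetic behind ABASE and ASHIFT, with made-up base and scale.
class ArrayOffsetSketch {
    // Same computation as the static block: log2 of a power-of-two element size.
    static int shiftFor(int scale) {
        if ((scale & (scale - 1)) != 0)
            throw new Error("data type scale not a power of two");
        return 31 - Integer.numberOfLeadingZeros(scale);
    }

    // Byte offset of element i, in the form used by tabAt / casTabAt / setTabAt.
    static long offsetOf(int i, long abase, int ashift) {
        return ((long) i << ashift) + abase;
    }

    public static void main(String[] args) {
        long abase = 16;  // assumed array header size, for illustration only
        int scale = 4;    // assumed size of one reference, for illustration only
        int ashift = shiftFor(scale);
        System.out.println(ashift);                          // 2
        System.out.println(offsetOf(0, abase, ashift));      // 16
        System.out.println(offsetOf(5, abase, ashift));      // 36
        // Shifting by ashift is the same as multiplying by the element size
        System.out.println(offsetOf(5, abase, ashift) == abase + 5L * scale);  // true
    }
}
```

Requiring the element size to be a power of two is what allows the multiplication to be replaced by a shift.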