As we know, ConcurrentHashMap is thread-safe, unlike HashMap. It achieves this mainly by updating the relevant variables through CAS and by using synchronized to lock the main logic that runs under concurrency. The lock object is the first element of the corresponding slot in the array (which in effect locks the whole linked list in that slot). Below we analyze how this thread safety works. It is recommended that you first read part 8 of this series, on the implementation of HashMap, which gives a general introduction to the storage logic of the Map structure. This part focuses on the key methods that deal with concurrency.

1. Structure

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable {

You can see that it extends the AbstractMap class and implements the ConcurrentMap interface, which is unremarkable.

2. Construction method

public ConcurrentHashMap() {}
public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
}
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, 1);
}
public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (initialCapacity < concurrencyLevel)   // Use at least as many bins
        initialCapacity = concurrencyLevel;   // as estimated threads
    long size = (long) (1.0 + (long)initialCapacity / loadFactor);
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);
    this.sizeCtl = cap;
}

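So you can see that the constructors only initialize sizeCtl. At this point sizeCtl holds the initial table capacity; it only becomes the resize threshold once the table has been created. tableSizeFor, which we already covered for HashMap, rounds its argument up to the next power of two.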
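To make the arithmetic of the single-argument constructor concrete, here is a minimal sketch. The class name TableSizeDemo and the method initialTableSize are made up for this illustration, and tableSizeFor is a local copy of the usual power-of-two rounding trick rather than a call into the JDK.

```java
final class TableSizeDemo {
    static final int MAXIMUM_CAPACITY = 1 << 30;

    // Rounds c up to the nearest power of two (illustrative copy, not the JDK method itself).
    static int tableSizeFor(int c) {
        int n = -1 >>> Integer.numberOfLeadingZeros(c - 1);
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }

    // Hypothetical helper that mirrors the constructor's computation of cap.
    static int initialTableSize(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        return (initialCapacity >= (MAXIMUM_CAPACITY >>> 1))
            ? MAXIMUM_CAPACITY
            : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1);
    }

    public static void main(String[] args) {
        // 16 + 8 + 1 = 25, which is rounded up to 32
        System.out.println(initialTableSize(16));
    }
}
```

Note that asking for 16 yields a table of 32 under this formula, because the requested capacity is first scaled by 1.5 and then rounded up.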

3. Variables

1, MAXIMUM_CAPACITY

private static final int MAXIMUM_CAPACITY = 1 << 30;

Maximum capacity.

2, DEFAULT_CAPACITY

private static final int DEFAULT_CAPACITY = 16;

Default capacity.

3, MAX_ARRAY_SIZE

static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

Maximum array length.

4, DEFAULT_CONCURRENCY_LEVEL

private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

The default concurrency level. It is no longer used as of JDK 1.8 and later.

5, LOAD_FACTOR

private static final float LOAD_FACTOR = 0.75f;

The load factor that drives resizing, equivalent to the same field in HashMap.

6, MOVED, TREEBIN, RESERVED, HASH_BITS

static final int MOVED     = -1; // hash for forwarding nodes
static final int TREEBIN   = -2; // hash for roots of trees
static final int RESERVED  = -3; // hash for transient reservations
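static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

MOVED: the hash of a forwarding node, which means the slot has been moved because a resize is being performed. TREEBIN is the hash of the root of a tree, RESERVED is the hash of a transient reservation node, and HASH_BITS masks the usable bits of a normal node's hash.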

7, NCPU

static final int NCPU = Runtime.getRuntime().availableProcessors();

Number of CPU cores in the current system.

This leaves out some variables related to tree structure.

8, table

transient volatile Node<K,V>[] table;

An array of nodes similar to the one in HashMap, but declared volatile so that other threads see a newly assigned table immediately.

9, nextTable

/** The next table to use; non-null only while resizing. */
private transient volatile Node<K,V>[] nextTable;

The next array, which is non-null only while a concurrent resize is in progress.

10, baseCount

private transient volatile long baseCount;

The base count of elements in the current Map, updated via CAS when there is no contention. The total is this value plus the values held in counterCells.

11, sizeCtl

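private transient volatile int sizeCtl;

Controls table initialization and resizing. When it is positive, it holds the initial capacity before the table is created and, afterwards, the element count at which the next resize is triggered. This field also has other meanings when it is < 0: -1 means the table is being initialized, and any other negative value means a resize is in progress. The JDK source comment describes the latter as -(1 + the number of active resizing threads), but that seems to be a problem with the comment (or a translation problem?): the value only makes sense once it is split at bit 16, with the resize stamp in the high 16 bits and the thread count in the low 16 bits.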

12, transferIndex

private transient volatile int transferIndex;

We know that ConcurrentHashMap is designed to handle concurrency, so resizing is also done in segments: if there is concurrency, multiple threads each transfer their own segment of the old array to the new array, working from the end of the array towards the front. transferIndex indicates where the next thread that joins the resize should start.
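The idea of handing out segments from the back of the array can be sketched as follows. This is only an illustration under assumptions: the class StrideClaimDemo, the method claim and the fixed stride of 16 are invented here, and an AtomicInteger stands in for the CAS on the transferIndex field.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class StrideClaimDemo {
    static final int STRIDE = 16;            // assumption: a fixed segment size
    private final AtomicInteger transferIndex;

    StrideClaimDemo(int tableLength) {
        transferIndex = new AtomicInteger(tableLength);
    }

    /** Claims the next range [lo, hi) from the end of the table, or returns null if nothing is left. */
    int[] claim() {
        for (;;) {
            int hi = transferIndex.get();
            if (hi <= 0)
                return null;                 // every segment has been handed out
            int lo = Math.max(hi - STRIDE, 0);
            if (transferIndex.compareAndSet(hi, lo))
                return new int[] { lo, hi }; // this thread now owns [lo, hi)
            // CAS failed: another thread claimed this segment, so retry
        }
    }
}
```

Each successful CAS moves the index towards 0, so two threads can never receive the same segment.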

13, cellsBusy

/** Spinlock (locked via CAS) used when resizing and/or creating CounterCells. */
private transient volatile int cellsBusy;

A spin lock, acquired via CAS, that is used when resizing or creating the CounterCell array.

14, CounterCell

private transient volatile CounterCell[] counterCells;
@jdk.internal.vm.annotation.Contended static final class CounterCell {
    volatile long value;
    CounterCell(long x) { value = x; }
}

These cells spread the element counting across threads when several threads update the count at once. The @jdk.internal.vm.annotation.Contended annotation relates to the concept of a CPU cache line; you can search for other posts on it. value holds the cell's share of the count.

15, MAX_RESIZERS

/**
 * The maximum number of threads that can help resize.
 * Must fit in 32 - RESIZE_STAMP_BITS bits.
 */
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

Maximum number of threads that can help with capacity expansion.

16, RESIZE_STAMP_BITS & RESIZE_STAMP_SHIFT

private static final int RESIZE_STAMP_BITS = 16;
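private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

Both have a value of 16, but they are used in different scenarios, which need to be read together with sizeCtl. RESIZE_STAMP_BITS is the number of bits used for the resize stamp (it appears as 1 << (RESIZE_STAMP_BITS - 1) in resizeStamp). RESIZE_STAMP_SHIFT is the shift distance used to move the stamp into the high bits of sizeCtl (rs << RESIZE_STAMP_SHIFT) and to read it back (sc >>> RESIZE_STAMP_SHIFT).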

17. Variables related to CAS operations

private static final Unsafe U = Unsafe.getUnsafe();
private static final long SIZECTL;
private static final long TRANSFERINDEX;
private static final long BASECOUNT;
private static final long CELLSBUSY;
private static final long CELLVALUE;
private static final int ABASE;
private static final int ASHIFT;

static {
    try {
        SIZECTL = U.objectFieldOffset
            (ConcurrentHashMap.class.getDeclaredField("sizeCtl"));
        TRANSFERINDEX = U.objectFieldOffset
            (ConcurrentHashMap.class.getDeclaredField("transferIndex"));
        BASECOUNT = U.objectFieldOffset
            (ConcurrentHashMap.class.getDeclaredField("baseCount"));
        CELLSBUSY = U.objectFieldOffset
            (ConcurrentHashMap.class.getDeclaredField("cellsBusy"));

        CELLVALUE = U.objectFieldOffset
            (CounterCell.class.getDeclaredField("value"));

        ABASE = U.arrayBaseOffset(Node[].class);
        int scale = U.arrayIndexScale(Node[].class);
        if ((scale & (scale - 1)) != 0)
            throw new Error("array index scale not a power of two");
        ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
    } catch (ReflectiveOperationException e) {
        throw new Error(e);
    }

    // Reduce the risk of rare disastrous classloading in first call to
    // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
    Class<?> ensureLoaded = LockSupport.class;
}

As you can see, CAS atomic manipulation is done through the Unsafe class.
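Unsafe is an internal API, so application code cannot follow this pattern directly. As a rough sketch of the same idea with a public API, a VarHandle can perform a CAS on a volatile field. The class SizeCtlHolder and its method names are hypothetical and exist only for this example.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

final class SizeCtlHolder {
    private volatile int sizeCtl;

    // Handle to the sizeCtl field, playing the role of the SIZECTL offset.
    private static final VarHandle SIZECTL;
    static {
        try {
            SIZECTL = MethodHandles.lookup()
                .findVarHandle(SizeCtlHolder.class, "sizeCtl", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    /** Atomically sets sizeCtl to update if it currently equals expected. */
    boolean casSizeCtl(int expected, int update) {
        return SIZECTL.compareAndSet(this, expected, update);
    }

    int sizeCtl() {
        return sizeCtl;
    }
}
```

Only one of several threads calling casSizeCtl(0, -1) at the same time can get true back, which is the property the map relies on.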

4. Inner classes

1, Node

static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;

    Node(int hash, K key, V val) {
        this.hash = hash;
        this.key = key;
        this.val = val;
    }

This matches the structure and fields of the HashMap node, except that val and next are declared volatile to handle concurrency: when one thread changes them, other threads see the new value immediately.

2, ForwardingNode

static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    ForwardingNode(Node<K,V>[] tab) {
        super(MOVED, null, null);
        this.nextTable = tab;
    }
    ...

Node(int hash, K key, V val) {
    this.hash = hash;
    this.key = key;
    this.val = val;
}

During a resize, a slot that has been transferred is marked with a ForwardingNode. As the super call and the Node constructor show, it is the hash (not the key) that is set to MOVED; the key and value are null.

5. Main methods

1, tabAt(Node<K,V>[] tab, int i)

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectAcquire(tab, ((long)i << ASHIFT) + ABASE);
}

Gets the element at index i of tab.

2, casTabAt (…).

static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    return U.compareAndSetObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

Uses CAS to set the element at index i of tab to v, provided the current value is the expected value c.

3, setTabAt(Node<K,V>[] tab, int i, Node<K,V> v)

static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
    U.putObjectRelease(tab, ((long)i << ASHIFT) + ABASE, v);
}

This method also stores v at the specified index i, but without a compare step (looking at the calling logic, it is called inside synchronized blocks, where the slot is already locked).
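The three accessors can be imitated with the public AtomicReferenceArray class. This is only an analogy under assumptions: SlotAccessDemo is a made-up class, the slots hold String values instead of nodes, and the JDK implementation works on a plain array through Unsafe.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

final class SlotAccessDemo {
    private final AtomicReferenceArray<String> tab = new AtomicReferenceArray<>(16);

    // Read slot i with acquire semantics.
    String tabAt(int i) {
        return tab.getAcquire(i);
    }

    // Write v into slot i only if it still holds the expected value c.
    boolean casTabAt(int i, String c, String v) {
        return tab.compareAndSet(i, c, v);
    }

    // Unconditional write with release semantics; safe only when the caller
    // already has exclusive access to the slot.
    void setTabAt(int i, String v) {
        tab.setRelease(i, v);
    }
}
```

A second casTabAt on an already filled slot returns false, which is how a thread learns that it lost the race for an empty slot.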

4, spread(int h)

static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
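}

Computes the final hash from h: the high 16 bits are XORed into the low 16 bits, and the & HASH_BITS mask clears the sign bit so that the hash of a normal node is never negative (negative values are reserved for MOVED, TREEBIN and RESERVED).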
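A small worked example may help here. The class SpreadDemo and the helper indexFor are invented for this sketch; spread itself is copied from the listing above.

```java
final class SpreadDemo {
    static final int HASH_BITS = 0x7fffffff;

    static int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

    // Hypothetical helper: the slot index used for a table of length n (a power of two).
    static int indexFor(int hash, int n) {
        return (n - 1) & hash;
    }

    public static void main(String[] args) {
        int h = 0x12340000;                   // only the high 16 bits are set
        System.out.println(indexFor(h, 16));          // index without spreading
        System.out.println(indexFor(spread(h), 16));  // index after spreading
        System.out.println(spread(-1) >= 0);          // sign bit is cleared
    }
}
```

Without spreading, every hash whose low bits are zero lands in slot 0 of a small table; after spreading, the high bits influence the index too.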

5, putVal(K key, V value, boolean onlyIfAbsent)

public V put(K key, V value) {
    return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh; K fk; V fv;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else if (onlyIfAbsent && fh == hash &&  // check first node
                 ((fk = f.key) == key || fk != null && key.equals(fk)) &&
                 (fv = f.val) != null)
            return fv;
        else {
            V oldVal = null;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key, value);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                    else if (f instanceof ReservationNode)
                        throw new IllegalStateException("Recursive update");
                }
            }
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

ConcurrentHashMap does not accept a null key or a null value: putVal throws NullPointerException for either. Now let’s walk through the logic of putVal.
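The null check is easy to observe from the outside. The following sketch uses only the public ConcurrentHashMap API; the class NullRejectionDemo and its helper rejects are made up for the demonstration.

```java
import java.util.concurrent.ConcurrentHashMap;

final class NullRejectionDemo {
    // Returns true if put(key, value) is rejected with a NullPointerException.
    static boolean rejects(String key, String value) {
        try {
            new ConcurrentHashMap<String, String>().put(key, value);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(rejects(null, "v"));  // null key
        System.out.println(rejects("k", null));  // null value
        System.out.println(rejects("k", "v"));   // ordinary entry
    }
}
```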

1) First compute the hash of the key through spread(key.hashCode()).

static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}

2) binCount = 0; This variable records the length of the linked list in the current slot.

3) for (Node<K,V>[] tab = table;;) As you can see, the condition and update parts of the for loop are empty. The loop therefore works as a spin loop and is left only through an internal return or break.

for (Node<K,V>[] tab = table;;) {
    Node<K,V> f; int n, i, fh; K fk; V fv;

Here fh, fk, and fv hold the hash, key, and value of the node f.

4) If the table has not been initialized, initialize it with initTable.

if (tab == null || (n = tab.length) == 0)
    tab = initTable();

5) If the slot at index i = (n - 1) & hash is empty, try to place the new node there with casTabAt.

This step modifies the contents of the table member variable, so we need to consider concurrency. The field volatile Node<K,V>[] table has the volatile keyword, but volatile only guarantees visibility and prevents reordering; it does not make a compound operation atomic. Setting the value at position i consists of two operations: first checking whether the position already holds a value (another thread may have added one concurrently), then assigning. Because volatile does not make this pair atomic, casTabAt is used to perform the check and the assignment as one atomic operation. If another thread has already assigned the position, this thread was slower, its CAS fails, and the next iteration of the loop runs; there this thread's node will be added after the node that the other thread placed.

else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
        break;                   // no lock when adding to empty bin
}

6) If the slot is not empty (step 5 did not apply) and the node’s hash is MOVED, the table is being resized, as mentioned earlier, so helpTransfer is called to help the other threads with the resize.

else if ((fh = f.hash) == MOVED)
    tab = helpTransfer(tab, f);

7) If the slot holds a node and no resize is in progress, and onlyIfAbsent is set (add only when the key does not exist), check whether the first node has the same key as the one being added. If so, return the value of the existing node without locking.

else if (onlyIfAbsent && fh == hash &&  // check first node
         ((fk = f.key) == key || fk != null && key.equals(fk)) &&
         (fv = f.val) != null)
    return fv;

8) When none of the above applies, the actual insertion logic runs. synchronized is used here to lock the node f in the table **(the first node of the list in the slot at index i, as read in the current iteration of the for loop)**. The preceding checks keep the lock granularity as small as possible.

V oldVal = null;
synchronized (f) {

9) Read the slot again with tabAt to check whether the element at index i of tab is still the first node f that we read earlier (this thread now holds the lock), because another thread may have changed it before we locked, for example by deleting the node or moving it. Then check whether the hash fh of node f is >= 0: an ordinary list node has a non-negative hash, whereas during a resize the hash of the head node becomes MOVED (-1).

if (tabAt(tab, i) == f) {
    if (fh >= 0) {

10) binCount is incremented to record the number of nodes in the list at the current slot. The loop walks the list: if a node with the same key is found, its value is replaced (unless onlyIfAbsent is set); otherwise, at the end of the list, the new node is appended by the pred.next = new Node<K,V>(hash, key, value); logic.

if (fh >= 0) {
    binCount = 1;
    for (Node<K,V> e = f;; ++binCount) {
        K ek;
        if (e.hash == hash &&
            ((ek = e.key) == key ||
             (ek != null && key.equals(ek)))) {
            oldVal = e.val;
            if (!onlyIfAbsent)
                e.val = value;
            break;
        }
        Node<K,V> pred = e;
        if ((e = e.next) == null) {
            pred.next = new Node<K,V>(hash, key, value);
            break;
        }
    }
}

11) The following logic about TreeBin is skipped for now.

At this point the element passed to put has been added to its position in the array or the list. We still have to handle the resize logic and make sure that the added element is recorded in the member variables that count the number of elements.

addCount(1L, binCount);
return null;

Let’s break down the other methods that putVal calls above.
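Before moving on, the overall shape of putVal (CAS into an empty slot, otherwise lock the head node and walk the list) can be condensed into a toy map. This is a simplified illustration, not the JDK code: TinyBinLockMap is a hypothetical class with a fixed table of 16 slots, and it has no resizing, no tree bins, no element counting and no removal.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical, heavily simplified map that only shows the locking pattern.
final class TinyBinLockMap<K, V> {
    private static final class Node<K, V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K, V> next;
        Node(int hash, K key, V val) { this.hash = hash; this.key = key; this.val = val; }
    }

    private final AtomicReferenceArray<Node<K, V>> table = new AtomicReferenceArray<>(16);

    private static int spread(int h) { return (h ^ (h >>> 16)) & 0x7fffffff; }

    /** Returns the previous value, or null if the key was absent. */
    V put(K key, V value) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int i = (table.length() - 1) & hash;
        for (;;) {
            Node<K, V> f = table.get(i);
            if (f == null) {
                // Empty slot: publish the node with CAS, no lock needed.
                if (table.compareAndSet(i, null, new Node<>(hash, key, value)))
                    return null;
                continue;                       // lost the race, retry with the new head
            }
            synchronized (f) {                  // lock only this slot's head node
                if (table.get(i) != f)
                    continue;                   // head changed before we got the lock
                for (Node<K, V> e = f;;) {
                    if (e.hash == hash && key.equals(e.key)) {
                        V old = e.val;
                        e.val = value;          // existing key: replace the value
                        return old;
                    }
                    Node<K, V> pred = e;
                    if ((e = e.next) == null) {
                        pred.next = new Node<>(hash, key, value); // append at the tail
                        return null;
                    }
                }
            }
        }
    }

    V get(K key) {
        int hash = spread(key.hashCode());
        for (Node<K, V> e = table.get((table.length() - 1) & hash); e != null; e = e.next)
            if (e.hash == hash && key.equals(e.key))
                return e.val;
        return null;
    }
}
```

Threads that write to different slots never block each other in this design; only writers to the same slot contend for the same head node.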

6, initTable()

/** Initializes table, using the size recorded in sizeCtl. */
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

This is how the table is initialized. You can see:

1) First, check whether the table has already been created and assigned. If not, enter the initialization logic. Initialization in ConcurrentHashMap differs somewhat from HashMap: here initialization and resizing are completely separate.

2) (sc = sizeCtl) < 0: sizeCtl is less than 0 when another thread is already initializing the table. In that case this thread has lost the race, so it calls Thread.yield() to give up the CPU and wait to be scheduled again. On its next turn it re-evaluates the while condition (tab = table) == null; if the other thread has finished, the loop exits and tab is returned.

3) If the current thread wins the CAS race, sizeCtl is changed to -1, which sends the other threads to the yield in the previous step. What follows is the current thread initializing the array.

4) (tab = table) == null is checked again. This is a double check, similar to the one used when creating objects in the singleton pattern. Consider a transitional state: another thread has just finished initialization and completed sizeCtl = sc in finally, so sizeCtl is no longer -1. If the current thread read table as null just before that, its check if ((sc = sizeCtl) < 0) now fails, its CAS succeeds, and without a second check the table would be created again. Because of the double check, and because table is volatile, this duplicate creation is avoided. We can use the same pattern when we create singletons.

5) sc = n - (n >>> 2); followed by sizeCtl = sc; sets sizeCtl to 0.75 times n, the threshold for the next resize. As you can see, sizeCtl has multiple meanings; it is not just a resize threshold like threshold in HashMap.
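The same initialization pattern (CAS to -1 as a guard, yield for the losers, double check for the winner) can be tried out in isolation. In this sketch the class LazyTable is hypothetical, an AtomicInteger replaces the Unsafe-based CAS on sizeCtl, and the allocations counter exists only so the behaviour can be observed.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class LazyTable {
    private final AtomicInteger sizeCtl = new AtomicInteger(0);
    private volatile Object[] table;
    final AtomicInteger allocations = new AtomicInteger(); // demonstration only

    Object[] initTable() {
        Object[] tab;
        while ((tab = table) == null) {
            int sc = sizeCtl.get();
            if (sc < 0)
                Thread.yield();                   // another thread is initializing
            else if (sizeCtl.compareAndSet(sc, -1)) {
                try {
                    if ((tab = table) == null) {  // double check after winning the CAS
                        int n = (sc > 0) ? sc : 16;
                        table = tab = new Object[n];
                        allocations.incrementAndGet();
                        sc = n - (n >>> 2);       // 0.75 * n, the next threshold
                    }
                } finally {
                    sizeCtl.set(sc);              // release the guard
                }
                break;
            }
        }
        return tab;
    }

    int sizeCtl() {
        return sizeCtl.get();
    }
}
```

However many threads call initTable at once, the array is allocated a single time, and sizeCtl ends up as 12 for the default size of 16.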

6, sumCount()

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

This method computes the total number of elements: it starts from baseCount and adds the value of every non-null cell in counterCells.
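The split between a base value and a set of cells can be illustrated with a much simpler counter. This is a sketch under assumptions, not the JDK algorithm: StripedCounter is an invented class, the number of cells is fixed at 4, and a random index stands in for the per-thread probe.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

final class StripedCounter {
    private final AtomicLong base = new AtomicLong();
    private final AtomicLongArray cells = new AtomicLongArray(4); // assumption: fixed size

    void add(long x) {
        long b = base.get();
        if (!base.compareAndSet(b, b + x)) {
            // The CAS on base failed, so there is contention: use a cell instead.
            int idx = ThreadLocalRandom.current().nextInt(cells.length());
            cells.addAndGet(idx, x);
        }
    }

    // Same idea as sumCount: base plus every cell.
    long sum() {
        long s = base.get();
        for (int i = 0; i < cells.length(); i++)
            s += cells.get(i);
        return s;
    }
}
```

As with sumCount, the result of sum is exact only when no updates are running at the same time.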

7, addCount(long x, int check)

private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    if ((as = counterCells) != null ||
        !U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSetLong(a, CELLVALUE, v = a.value, v + x))) {
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSetInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

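This method mainly does two things: it updates the element count and it triggers a resize when needed. The check argument passed from putVal is binCount, the number of nodes in the corresponding list, so it is >= 0 (it is 0 when the node was placed in an empty slot by CAS).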

1) The first check is whether counterCells is null. If it is null, no contention between threads has occurred so far. The second part, !U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x), then tries to add x to the baseCount member variable with CAS. If that CAS succeeds, there was no contention and the count is done; for the call addCount(1L, binCount) from putVal this means baseCount is increased by 1.

CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
    !U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x)) {

2) If there was contention (the count could not be added to baseCount), the count is recorded in a CounterCell instead. The condition first checks whether counterCells has not been initialized, or whether the CounterCell for the current thread is null. ThreadLocalRandom.getProbe() returns a value associated with the thread: it is a thread-specific hash that stays fixed, and it is 0 if it has not been initialized. In these cases, or if the CAS on the cell fails, fullAddCount is called.

Otherwise a CounterCell for the current thread already exists and x is added to its value, that is, this element is counted in the CounterCell that corresponds to the current thread. So the number of elements in a ConcurrentHashMap is baseCount plus the values of all CounterCell elements. Finally, if check <= 1 the method returns; otherwise s = sumCount(); computes the total number of elements.

CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
    (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
    !(uncontended =
      U.compareAndSetLong(a, CELLVALUE, v = a.value, v + x))) {
    fullAddCount(x, uncontended);
    return;
}
if (check <= 1)
    return;
s = sumCount();

3) This part checks whether a resize is needed. Under multithreading, the current thread may also help a resize that another thread has started. The logic here is a little hard to understand, and there is one place I still don’t understand: sc == rs + 1. It seems this equality can never hold (it never happened in my tests either; please comment if you know). Let’s analyze it in detail.

if (check >= 0) {
    Node<K,V>[] tab, nt; int n, sc;
    while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
           (n = tab.length) < MAXIMUM_CAPACITY) {
        int rs = resizeStamp(n);
        ...
    }
}

static final int resizeStamp(int n) {
    return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

1. First we need to understand the case where sizeCtl is less than -1: it encodes how many threads are currently helping with the resize (as mentioned earlier, the value has to be split at bit 16 to make sense).

In resizeStamp, 1 << (RESIZE_STAMP_BITS - 1) is 1000000000000000 in binary (16 bits), which is 32768 as an int. Integer.numberOfLeadingZeros(n) counts how many 0 bits come before the highest 1 bit of the 32-bit int n; for example, if n is 16 there are 27 leading zeros. So this method returns a value between 32768 and 32768 + 32. With the default capacity of 16 it returns 32768 + 27 = 32795, which is 1000000000011011 in binary. The thread that starts the resize then sets sizeCtl to (rs << RESIZE_STAMP_SHIFT) + 2, which is 10000000000110110000000000000010 in binary: the stamp sits in the high 16 bits and the count 2 in the low 16 bits.
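These numbers are easy to check by running the arithmetic. In the sketch below, resizeStamp and the two constants are copied from the listings above, while the class ResizeStampDemo and the helpers firstResizerSizeCtl, stampOf and lowBitsOf are invented names for this example.

```java
final class ResizeStampDemo {
    static final int RESIZE_STAMP_BITS = 16;
    static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

    static int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

    // Hypothetical helper: the sizeCtl value set by the thread that starts a resize.
    static int firstResizerSizeCtl(int n) {
        return (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2;
    }

    // Hypothetical helpers that split sizeCtl at bit 16.
    static int stampOf(int sc)   { return sc >>> RESIZE_STAMP_SHIFT; }
    static int lowBitsOf(int sc) { return sc & 0xffff; }

    public static void main(String[] args) {
        int sc = firstResizerSizeCtl(16);
        System.out.println(resizeStamp(16));             // 32768 + 27
        System.out.println(Integer.toBinaryString(sc));  // stamp in the high half, 2 in the low half
        System.out.println(sc < 0);                      // the top bit makes it negative
    }
}
```

Because bit 15 of the stamp is always set, shifting it left by 16 always sets the sign bit, which is why sizeCtl is negative for the whole duration of a resize.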

while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
       (n = tab.length) < MAXIMUM_CAPACITY) {
    int rs = resizeStamp(n);
    if (sc < 0) {
        ...
    }
    else if (U.compareAndSetInt(this, SIZECTL, sc,
                                 (rs << RESIZE_STAMP_SHIFT) + 2))
        transfer(tab, null);
    s = sumCount();
}

2. Now return to the while loop. In s >= (long)(sc = sizeCtl), s is the number of elements currently in the Map, which has already been computed. The condition of the while decides whether a resize is needed (either helping an ongoing resize or being the first thread to start one).

3. Then int rs = resizeStamp(n); where n is the length of the tab array. rs therefore identifies the table length that is being resized: after a resize the length doubles, the number of leading zeros changes, and so does the stamp. This makes it possible to tell whether the tab array has already been resized.

4. Now let’s look at this logic in detail.

So let’s say there’s no multithreaded contention right now, so sc = sizeCtl is > 0 and we follow the else if branch. The value assigned to sizeCtl is (rs << RESIZE_STAMP_SHIFT) + 2: this places rs in the high 16 bits (so sizeCtl < 0) and puts 2 in the low 16 bits. The low bits hold the number of resizing threads plus 1, so 2 means that 2 - 1 = 1 thread is resizing (the offset exists because, as mentioned before, the value -1 already has a meaning of its own). transfer(tab, null) is then called to perform the resize.

If other threads are competing, there are two situations. In the first, this thread lost the CAS in the else if branch, and the other thread is now resizing (and has not finished yet). On the next pass through the while loop, s >= (long)(sc = sizeCtl) is still satisfied because the resize has not completed, and this time sc < 0, so the if (sc < 0) branch is taken. Let’s look at what is inside that if.

while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
       (n = tab.length) < MAXIMUM_CAPACITY) {
    int rs = resizeStamp(n);
    if (sc < 0) {
        if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
            transferIndex <= 0)
            break;
        if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))
            transfer(tab, nt);
    }
    ...
}

The sc >>> RESIZE_STAMP_SHIFT here extracts the original high 16 bits, that is, the stamp, which is the same for all threads taking part in a resize that has not yet completed. The check is whether it is equal to rs (see the description of rs above). If it is not, that resize has already finished and the table has moved on, so the loop can break.

Four more conditions are joined with ||. sc == rs + 1 is the one I haven’t figured out yet: when can these two be equal? sc == rs + MAX_RESIZERS checks whether the maximum number of helper threads has been reached, and (nt = nextTable) == null and transferIndex <= 0 indicate that no further threads are needed. If help is still needed, sizeCtl is incremented via CAS, which records that one more thread has joined the resize.

8, fullAddCount(long x, boolean wasUncontended)

// See LongAdder version for explanation
private final void fullAddCount(long x, boolean wasUncontended) {
    int h;
    if ((h = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();      // force initialization
        h = ThreadLocalRandom.getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        CounterCell[] as; CounterCell a; int n; long v;
        if ((as = counterCells) != null && (n = as.length) > 0) {
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {            // Try to attach new Cell
                    CounterCell r = new CounterCell(x); // Optimistic create
                    if (cellsBusy == 0 &&
                        U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
                        boolean created = false;
                        try {               // Recheck under lock
                            CounterCell[] rs; int m, j;
                            if ((rs = counterCells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            else if (U.compareAndSetLong(a, CELLVALUE, v = a.value, v + x))
                break;
            else if (counterCells != as || n >= NCPU)
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            else if (cellsBusy == 0 &&
                     U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
                try {
                    if (counterCells == as) {// Expand table unless stale
                        CounterCell[] rs = new CounterCell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        counterCells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = ThreadLocalRandom.advanceProbe(h);
        }
        else if (cellsBusy == 0 && counterCells == as &&
                 U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
            boolean init = false;
            try {                           // Initialize table
                if (counterCells == as) {
                    CounterCell[] rs = new CounterCell[2];
                    rs[h & 1] = new CounterCell(x);
                    counterCells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (U.compareAndSetLong(this, BASECOUNT, v = baseCount, v + x))
            break;                          // Fall back on using base
    }
}

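This method handles the contended path: it initializes counterCells, creates or updates a cell, or expands the array. It is reached when multiple threads are competing to update the count.

1) First, if the probe of the current thread has not been initialized (getProbe() returns 0), ThreadLocalRandom.localInit(); initializes it, and wasUncontended = true; marks that no contention has been observed yet.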

// See LongAdder version for explanation
private final void fullAddCount(long x, boolean wasUncontended) {
    int h;
    if ((h = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();      // force initialization
        h = ThreadLocalRandom.getProbe();
        wasUncontended = true;
    }

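2) The loop here is for (;;), which is left only from the inside. If counterCells has already been initialized ((as = counterCells) != null) but the cell for this thread is empty ((a = as[(n - 1) & h]) == null), then cellsBusy is used as a spin lock inside the for loop, and a new CounterCell(x) is created optimistically before the lock is taken.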

boolean collide = false;                // True if last slot nonempty
for (;;) {
    CounterCell[] as; CounterCell a; int n; long v;
    if ((as = counterCells) != null && (n = as.length) > 0) {
        if ((a = as[(n - 1) & h]) == null) {
            if (cellsBusy == 0) {            // Try to attach new Cell
                CounterCell r = new CounterCell(x); // Optimistic create
                ...
            }
            collide = false;

3) CAS sets cellsBusy to 1, which tells other threads that the cell array is being modified. Under this lock the slot is checked again: if rs[j = (m - 1) & h] == null, then rs[j] = r is assigned and created = true. cellsBusy is reset to 0 in finally, and if created is true the loop ends with break;. If the recheck fails (the slot has been filled in the meantime), continue starts the next round. If the thread failed to acquire cellsBusy at all, it reaches neither break nor continue and instead falls through to collide = false;.

if (cellsBusy == 0) {            // Try to attach new Cell
    CounterCell r = new CounterCell(x); // Optimistic create
    if (cellsBusy == 0 &&
        U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
        boolean created = false;
        try {               // Recheck under lock
            CounterCell[] rs; int m, j;
            if ((rs = counterCells) != null &&
                (m = rs.length) > 0 &&
                rs[j = (m - 1) & h] == null) {
                rs[j] = r;
                created = true;
            }
        } finally {
            cellsBusy = 0;
        }
        if (created)
            break;
        continue;           // Slot is now non-empty
    }
}
collide = false;

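4) If (a = as[(n - 1) & h]) is not null, a CounterCell already exists in the slot of this thread. If wasUncontended is false (the CAS on this cell is already known to have failed), wasUncontended is set to true and the thread retries after rehashing its probe. Otherwise the thread tries to CAS the value of the cell from v to v + x. If this succeeds, the count for this thread is recorded and the loop ends with break.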

if ((as = counterCells) != null && (n = as.length) > 0) {
    if ((a = as[(n - 1) & h]) == null) {
        ...
    }
    else if (!wasUncontended)       // CAS already known to fail
        wasUncontended = true;      // Continue after rehash
    else if (U.compareAndSetLong(a, CELLVALUE, v = a.value, v + x))
        break;
    else if (counterCells != as || n >= NCPU)
        collide = false;            // At max size or stale
    else if (!collide)
        collide = true;
    else if (cellsBusy == 0 &&
             U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
        try {
            if (counterCells == as) {// Expand table unless stale
                CounterCell[] rs = new CounterCell[n << 1];
                for (int i = 0; i < n; ++i)
                    rs[i] = as[i];
                counterCells = rs;
            }
        } finally {
            cellsBusy = 0;
        }
        collide = false;
        continue;                   // Retry with expanded table
    }
    h = ThreadLocalRandom.advanceProbe(h);
}

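If the CAS on the cell fails, the code checks counterCells != as || n >= NCPU: if the array has been replaced by another thread (it is stale) or its length has already reached the number of CPUs, collide is reset to false and no expansion is attempted. My understanding of the branches after that is that when the competition is too fierce (the CAS fails again after collide has been set to true), counterCells is doubled in size to relieve the pressure of competition. I am not completely sure about the exact meaning here, so this reading may contain mistakes.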

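If counterCells has not been initialized yet, the thread takes the cellsBusy lock and initializes it as new CounterCell[2], putting a new CounterCell(x) at index h & 1. If the lock cannot be taken, the thread falls back to a CAS on baseCount.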

for (;;) {
    CounterCell[] as; CounterCell a; int n; long v;
    if ((as = counterCells) != null && (n = as.length) > 0) {
        ...
    }
    else if (cellsBusy == 0 && counterCells == as &&
             U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
        boolean init = false;
        try {                           // Initialize table
            if (counterCells == as) {
                CounterCell[] rs = new CounterCell[2];
                rs[h & 1] = new CounterCell(x);
                counterCells = rs;
                init = true;
            }
        } finally {
            cellsBusy = 0;
        }
        if (init)
            break;
    }
    else if (U.compareAndSetLong(this, BASECOUNT, v = baseCount, v + x))
        break;
}
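To make the overall idea of fullAddCount easier to see, here is a minimal sketch of a striped counter. It is not the JDK implementation: the class StripedCounterSketch and its fields base, busy and cells are hypothetical names, the cell index is picked at random instead of using the thread probe, and the expansion of the cell array (the collide logic) is left out.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical, simplified striped counter; a sketch, not the JDK code.
class StripedCounterSketch {
    private final AtomicLong base = new AtomicLong();        // role of baseCount
    private final AtomicInteger busy = new AtomicInteger();  // role of cellsBusy (spin lock)
    private volatile AtomicReferenceArray<AtomicLong> cells; // role of counterCells

    void add(long x) {
        if (cells == null) {
            long b = base.get();
            if (base.compareAndSet(b, b + x))
                return;                       // uncontended fast path
        }
        slowAdd(x);                           // contended path
    }

    private void slowAdd(long x) {
        for (;;) {
            AtomicReferenceArray<AtomicLong> cs = cells;
            if (cs != null) {
                // Assumption: a random index stands in for the thread probe.
                int idx = ThreadLocalRandom.current().nextInt() & (cs.length() - 1);
                AtomicLong cell = cs.get(idx);
                if (cell == null) {
                    if (busy.compareAndSet(0, 1)) {       // take the spin lock
                        boolean created = false;
                        try {                             // recheck under lock
                            if (cs.get(idx) == null) {
                                cs.set(idx, new AtomicLong(x));
                                created = true;
                            }
                        } finally {
                            busy.set(0);                  // release the lock
                        }
                        if (created)
                            return;
                    }
                } else {
                    long v = cell.get();
                    if (cell.compareAndSet(v, v + x))
                        return;                           // counted in the cell
                }
            } else if (busy.compareAndSet(0, 1)) {
                boolean init = false;
                try {                                     // initialize the cell array
                    if (cells == null) {
                        AtomicReferenceArray<AtomicLong> rs = new AtomicReferenceArray<>(2);
                        rs.set(ThreadLocalRandom.current().nextInt() & 1, new AtomicLong(x));
                        cells = rs;
                        init = true;
                    }
                } finally {
                    busy.set(0);
                }
                if (init)
                    return;
            } else {
                long b = base.get();
                if (base.compareAndSet(b, b + x))
                    return;                               // fall back on base
            }
        }
    }

    long sum() {
        long s = base.get();
        AtomicReferenceArray<AtomicLong> cs = cells;
        if (cs != null) {
            for (int i = 0; i < cs.length(); i++) {
                AtomicLong c = cs.get(i);
                if (c != null)
                    s += c.get();
            }
        }
        return s;
    }

    // Runs several threads that each add 1 repeatedly, then returns the total.
    static long demo(int threads, int addsPerThread) {
        StripedCounterSketch c = new StripedCounterSketch();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int k = 0; k < addsPerThread; k++)
                    c.add(1);
            });
            ts[i].start();
        }
        try {
            for (Thread t : ts)
                t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return c.sum();
    }

    public static void main(String[] args) {
        System.out.println(demo(4, 10_000));
    }
}
```

The point of the design is that threads which fail on the shared base value spread their updates over separate cells, and the total is only computed when it is asked for by summing base and all cells.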

9, transfer(Node<K,V>[] tab, Node<K,V>[] nextTab)

/**
 * Moves and/or copies the nodes in each bin to new table. See
 * above for explanation.
 */
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    int nextn = nextTab.length;
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSetInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    if (fh >= 0) {
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {
                        ......
                    }
                }
            }
        }
    }
}

This method, like the previous addCount method, is complicated. By comparison, the logic of inserting elements is relatively simple.

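1) First, compute stride, the length of the segment that each thread handles at a time during expansion. It is (n >>> 3) / NCPU when there is more than one CPU (otherwise n), and it is never smaller than MIN_TRANSFER_STRIDE (16).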

int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
    stride = MIN_TRANSFER_STRIDE; // subdivide range

private static final int MIN_TRANSFER_STRIDE = 16;
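As a small worked example of the stride calculation, the sketch below repeats the same expression with the CPU count passed in as a parameter. The class StrideSketch and the parameter ncpu are hypothetical and only there for illustration; the real code reads NCPU from the runtime.

```java
// Hypothetical helper that mirrors the stride expression in transfer().
class StrideSketch {
    static final int MIN_TRANSFER_STRIDE = 16; // same value as in the source

    // ncpu is a parameter here (an assumption for illustration).
    static int stride(int n, int ncpu) {
        int stride = (ncpu > 1) ? (n >>> 3) / ncpu : n;
        if (stride < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        return stride;
    }

    public static void main(String[] args) {
        System.out.println(stride(64, 8));
        System.out.println(stride(1024, 4));
        System.out.println(stride(4096, 1));
    }
}
```

For small tables the minimum of 16 wins, so a table of length 64 is split into at most four segments no matter how many CPUs there are.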

2) nextTab is the target array of the expansion. If it is null, it is created here as new Node<?,?>[n << 1], that is, the old length shifted left by one bit (twice the old length). In addition, transferIndex is set to n; it marks the position from which the next thread (for example, another thread that comes to help) claims its segment. Because of the logic of the previous addCount method, nextTab is null only for the thread that starts the expansion. Threads that come in later are passed the existing nextTable.

if (nextTab == null) {            // initiating
    try {
        @SuppressWarnings("unchecked")
        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
        nextTab = nt;
    } catch (Throwable ex) {      // try to cope with OOME
        sizeCtl = Integer.MAX_VALUE;
        return;
    }
    nextTable = nextTab;
    transferIndex = n;
}
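Because the new array is exactly twice as long as the old one, a node at index i of the old array can only end up at index i or at index i + n of the new array, depending on the bit hash & n. The following sketch checks this against a direct index computation; the class IndexSplitSketch and its methods are hypothetical names used only for illustration.

```java
// Hypothetical helper showing where a hash lands after the table doubles.
class IndexSplitSketch {
    // New index derived from the old index i and the bit (hash & n).
    static int newIndex(int hash, int n) {
        int i = hash & (n - 1);                   // index in the old table
        return ((hash & n) == 0) ? i : i + n;     // low: stays, high: moves by n
    }

    // Compares with indexing directly into a table of length 2n.
    static boolean matchesDirectIndex(int hash, int n) {
        return newIndex(hash, n) == (hash & ((n << 1) - 1));
    }

    public static void main(String[] args) {
        System.out.println(newIndex(5, 16));
        System.out.println(newIndex(21, 16));
        System.out.println(matchesDirectIndex(21, 16));
    }
}
```

This is why the transfer of one slot of the old array only ever writes to two slots of the new array, and different slots can be moved by different threads without interfering with each other.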

3) nextTab is wrapped in a ForwardingNode, which is put into a slot of the old array to show that the slot has been moved while expansion is in progress. advance indicates whether the thread should move on to the next slot (or claim another segment), and finishing indicates whether the whole expansion has been completed (until then, a thread entering the for loop keeps helping with the expansion).

int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab

4) First look at the structure of the for loop: it has no loop condition, so it can only be left through a return or break inside it, and here return is used. One of these returns is guarded by finishing, and finishing is in turn set after the CAS below succeeds.

The first condition to look at is i < 0 || i >= n || i + n >= nextn. Here i is an array index: i < 0 means that the traversal is done, n is the length of the original tab array, and nextn is the length of the newly expanded array. If any one of these conditions holds, the possible-return logic below is entered.

The thread uses CAS to decrease sizeCtl by 1 (from sc to sc - 1). This means that the thread has finished its segments and can no longer help, so the number of threads helping with the expansion goes down by one. If the CAS succeeds, it evaluates (sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT. Note the precedence: << binds more tightly than !=. If the two sides are not equal, other threads are still transferring data to the new array, but there are no more segments for the current thread to claim, so the current thread can simply return.

When the last thread is finished, sc - 2 is equal to resizeStamp(n) << RESIZE_STAMP_SHIFT, which shows that no other thread is still helping with the expansion. That thread sets finishing = advance = true and i = n, and rechecks the table once more. It then enters the finishing branch, which sets nextTable = null, table = nextTab and sizeCtl = (n << 1) - (n >>> 1), which is 0.75 times the new length. This completes the data transfer to the new array by all threads.

for (int i = 0, bound = 0;;) {
    ...
    if (i < 0 || i >= n || i + n >= nextn) {
        int sc;
        if (finishing) {
            nextTable = null;
            table = nextTab;
            sizeCtl = (n << 1) - (n >>> 1);
            return;
        }
        if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
            if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                return;
            finishing = advance = true;
            i = n; // recheck before commit
        }
    }
    ...
    else {
        ...
    }
}
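The bookkeeping on sizeCtl can be followed with plain integer arithmetic. The sketch below is single threaded and only simulates the counting; the class ResizeStampSketch is a hypothetical name. The values RESIZE_STAMP_BITS = 16 and the body of resizeStamp are not shown in this article; they are written here as I remember them from the JDK source and should be treated as assumptions, as should the initial value (stamp << RESIZE_STAMP_SHIFT) + 2 set by the thread that starts the expansion.

```java
// Hypothetical, single-threaded simulation of the sizeCtl counting during a resize.
class ResizeStampSketch {
    static final int RESIZE_STAMP_BITS = 16;                      // assumed value
    static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; // assumed value

    // Assumed to match the JDK helper of the same name.
    static int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

    // Assumed value of sizeCtl right after the first thread starts the expansion.
    static int initialSizeCtl(int n) {
        return (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2;
    }

    // Counts how many of the given threads see themselves as the last one to finish.
    static int countLastFinishers(int n, int threads) {
        int sizeCtl = initialSizeCtl(n);
        for (int t = 1; t < threads; t++)
            sizeCtl++;                       // each helper adds 1 when it joins
        int last = 0;
        for (int t = 0; t < threads; t++) {
            int sc = sizeCtl;
            sizeCtl = sc - 1;                // stands in for the CAS on SIZECTL
            if ((sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT)
                last++;                      // this thread would set finishing = true
        }
        return last;
    }

    // The value stored into sizeCtl when the expansion is committed.
    static int nextThreshold(int n) {
        return (n << 1) - (n >>> 1);
    }

    public static void main(String[] args) {
        System.out.println(initialSizeCtl(16) < 0);
        System.out.println(countLastFinishers(16, 3));
        System.out.println(nextThreshold(16));
    }
}
```

Whatever the number of threads, exactly one of them finds sc - 2 equal to the shifted stamp, and that is the thread that commits the new table.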

Let’s take a look at how segmented multithreaded data transfer works.

5) advance is true at the start, and i and bound both start at 0, so the first check if (--i >= bound || finishing) is not satisfied. bound is the end of the segment processed by the current thread (the side near 0), and i is the index being processed, which starts from the beginning of the segment (the transferIndex side). In step 4) we analyzed the possible assignments of i and finishing: when finishing is true, this branch is taken so that the next pass of the for loop reaches the exit logic. Since --i decrements i, --i >= bound means that i is still inside the segment claimed by the current thread.

**Also note that this code is run by several threads at once. Each thread has its own i and bound, but all of them lie within the length of the original tab.** The main purpose is to let different threads handle different segments of the data transfer.

nextIndex is a snapshot of transferIndex, that is, the position from which the current thread would start its next segment. If segments can still be allocated, it is > 0 and the thread does not bail out. It goes on to CAS transferIndex further toward 0, which marks the segment as claimed by this thread. If nothing is left, i = -1 and advance = false, which ends the while loop and satisfies if (i < 0 || i >= n || i + n >= nextn), so the exit logic is reached.

Now look at the CAS on TRANSFERINDEX. stride was assigned earlier and gives the length of each segment. The CAS stores the next starting point, nextBound = nextIndex - stride (or 0 if fewer than stride slots are left), into transferIndex. On success, the same value is assigned to bound, the end of the segment that this thread is responsible for, and the starting point of this round is assigned to i (i = nextIndex - 1), from which the thread loops downward.

for (int i = 0, bound = 0;;) {
    Node<K,V> f; int fh;
    while (advance) {
        int nextIndex, nextBound;
        if (--i >= bound || finishing)
            advance = false;
        else if ((nextIndex = transferIndex) <= 0) {
            i = -1;
            advance = false;
        }
        else if (U.compareAndSetInt
                 (this, TRANSFERINDEX, nextIndex,
                  nextBound = (nextIndex > stride ?
                               nextIndex - stride : 0))) {
            bound = nextBound;
            i = nextIndex - 1;
            advance = false;
        }
    }
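The way segments are handed out can be sketched with an AtomicInteger that plays the role of transferIndex. The sketch is single threaded and only records the claimed ranges; the class SegmentClaimSketch and the method claimAll are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of claiming segments from the top of [0, n) downward.
class SegmentClaimSketch {
    // Each entry is {bound, i}: the lowest and the highest index of one claimed segment.
    static List<int[]> claimAll(int n, int stride) {
        AtomicInteger transferIndex = new AtomicInteger(n);
        List<int[]> segments = new ArrayList<>();
        for (;;) {
            int nextIndex = transferIndex.get();
            if (nextIndex <= 0)
                break;                                   // nothing left to claim
            int nextBound = (nextIndex > stride) ? nextIndex - stride : 0;
            if (transferIndex.compareAndSet(nextIndex, nextBound))
                segments.add(new int[] { nextBound, nextIndex - 1 });
            // On CAS failure the loop simply retries with a fresh snapshot.
        }
        return segments;
    }

    public static void main(String[] args) {
        for (int[] s : claimAll(64, 16))
            System.out.println(s[0] + ".." + s[1]);
    }
}
```

With several threads running the same loop, the CAS guarantees that each segment is claimed by exactly one thread, which is the property the real code relies on.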

6) Next, tabAt reads the first node of slot i. If it is null, a CAS puts fwd into this position, indicating that index i of tab has been handled by the expansion. Since there is no value, there is no data to transfer.

If (fh = f.hash) == MOVED, the slot has already been processed, so advance = true lets the current thread move on to the next slot (or find the next segment to process).

Otherwise the slot contains data that has to be moved, so the first node is locked with synchronized (f). In this way, data cannot be inserted into the linked list of this slot during the data transfer.

else if ((f = tabAt(tab, i)) == null)
    advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
    advance = true; // already processed
else {
    synchronized (f) {

7) You can see that there is also a double check here (tabAt(tab, i) == f after the lock is taken). The data transfer again involves the concept of low and high bits. I will not repeat it here; the earlier analysis of HashMap covers it in detail. What is different is that the low and high lists are not built in order with four member variables (a head and a tail for each list). Instead, int b = p.hash & n and runBit = b are used. The value of runBit is either 0 (low, the node stays at index i) or n (high, the node moves to index i + n). runBit and lastRun are changed whenever the bit of a node differs from the bit of the node before it, so after the loop lastRun points to the first node of the last run of nodes with the same bit. In other words, if the bits along the list are 101011111, lastRun stops at the 1 in position 5.

The loop for (Node<K,V> p = f; p != lastRun; p = p.next) stops at lastRun, so there is no need to create new nodes for lastRun and the nodes after it. This is a kind of optimization (it pays off when the tail of the list shares the same high or low bit). Note that only this trailing run is reused: nodes in earlier runs with the same bit are still copied one by one, if I understand the code correctly.

synchronized (f) {
    if (tabAt(tab, i) == f) {
        Node<K,V> ln, hn;
        if (fh >= 0) {
            int runBit = fh & n;
            Node<K,V> lastRun = f;
            for (Node<K,V> p = f.next; p != null; p = p.next) {
                int b = p.hash & n;
                if (b != runBit) {
                    runBit = b;
                    lastRun = p;
                }
            }
            if (runBit == 0) {
                ln = lastRun;
                hn = null;
            }
            else {
                hn = lastRun;
                ln = null;
            }
            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                int ph = p.hash; K pk = p.key; V pv = p.val;
                if ((ph & n) == 0)
                    ln = new Node<K,V>(ph, pk, pv, ln);
                else
                    hn = new Node<K,V>(ph, pk, pv, hn);
            }
            setTabAt(nextTab, i, ln);
            setTabAt(nextTab, i + n, hn);
            setTabAt(tab, i, fwd);
            advance = true;
        }

The two lists are then assigned to the expanded array via setTabAt (ln at index i, hn at index i + n). Finally, setTabAt(tab, i, fwd) marks the current index position of the old array as moved.
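The lastRun splitting can be tried out on a plain singly linked list. The sketch below is a simplified copy of the logic above without locking and without values; the class LastRunSketch, its Node class and the helpers listOf and keys are hypothetical names used only for illustration.

```java
// Hypothetical, simplified version of the lastRun split for one slot.
class LastRunSketch {
    static final class Node {
        final int hash;
        final String key;
        final Node next;
        Node(int hash, String key, Node next) {
            this.hash = hash;
            this.key = key;
            this.next = next;
        }
    }

    // Returns {ln, hn}: the low list (stays at i) and the high list (moves to i + n).
    static Node[] split(Node f, int n) {
        int runBit = f.hash & n;
        Node lastRun = f;
        for (Node p = f.next; p != null; p = p.next) {
            int b = p.hash & n;
            if (b != runBit) {          // the bit changed: a new run starts at p
                runBit = b;
                lastRun = p;
            }
        }
        Node ln, hn;
        if (runBit == 0) { ln = lastRun; hn = null; }
        else             { hn = lastRun; ln = null; }
        // Only the nodes before lastRun are copied; the trailing run is reused.
        for (Node p = f; p != lastRun; p = p.next) {
            if ((p.hash & n) == 0)
                ln = new Node(p.hash, p.key, ln);
            else
                hn = new Node(p.hash, p.key, hn);
        }
        return new Node[] { ln, hn };
    }

    // Builds a list whose nodes are named k0, k1, ... in list order.
    static Node listOf(int... hashes) {
        Node head = null;
        for (int i = hashes.length - 1; i >= 0; i--)
            head = new Node(hashes[i], "k" + i, head);
        return head;
    }

    static String keys(Node head) {
        StringBuilder sb = new StringBuilder();
        for (Node p = head; p != null; p = p.next) {
            if (sb.length() > 0)
                sb.append(',');
            sb.append(p.key);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Node[] r = split(listOf(0, 16, 0, 16, 16), 16);
        System.out.println("low:  " + keys(r[0]));
        System.out.println("high: " + keys(r[1]));
    }
}
```

Because copied nodes are prepended, the copied part of each list comes out in reverse order, while the reused tail keeps its original order.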

The above is the specific logic of expansion.

10, remove(Object key, Object value)

public boolean remove(Object key, Object value) {
    if (key == null)
        throw new NullPointerException();
    return value != null && replaceNode(key, null, value) != null;
}
final V replaceNode(Object key, V value, Object cv) {
    int hash = spread(key.hashCode());
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0 ||
            (f = tabAt(tab, i = (n - 1) & hash)) == null)
            break;
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            boolean validated = false;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        ...... // traverse the linked list headed by f to find the node
                    }
                    else if (f instanceof TreeBin) {
                        ......
                    }
                    else if (f instanceof ReservationNode)
                        throw new IllegalStateException("Recursive update");
                }
            }
            if (validated) {
                if (oldVal != null) {
                    if (value == null)
                        addCount(-1L, -1);
                    return oldVal;
                }
                break;
            }
        }
    }
    return null;
}

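This is the removal logic. It also locks the first node of the slot with synchronized (f). Before that, if (fh = f.hash) == MOVED, helpTransfer(tab, f) is called so that the current thread helps with the capacity expansion and then continues on the new table.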
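From the caller's side, remove(key, value) only removes the entry when the key is currently mapped to the given value. A short usage sketch follows; the class RemoveDemo and the method run are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical usage example of ConcurrentHashMap.remove(key, value).
class RemoveDemo {
    // Returns {wrongValue, stillThere, rightValue, emptyAfter, nullValue}.
    static boolean[] run() {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        map.put("a", 1);
        boolean wrongValue = map.remove("a", 2);     // value does not match
        boolean stillThere = map.containsKey("a");   // entry was not removed
        boolean nullValue = map.remove("a", null);   // null value: the value != null check fails
        boolean rightValue = map.remove("a", 1);     // value matches, entry removed
        boolean emptyAfter = map.isEmpty();
        return new boolean[] { wrongValue, stillThere, rightValue, emptyAfter, nullValue };
    }

    public static void main(String[] args) {
        for (boolean b : run())
            System.out.println(b);
    }
}
```

A null key, on the other hand, is rejected with a NullPointerException, as the first lines of remove show.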

At this point, the main logic of ConcurrentHashMap has been covered. The general idea is clear, but there are a few small points whose meaning I am still not quite sure of.