
The source code of Java 7's ConcurrentHashMap is a textbook on multithreaded programming. Its authors are very cautious about pessimistic locks: in most places they replace them with spin locks and volatile variables that provide the same semantics, and even where a lock is ultimately unavoidable, they use various techniques to shrink its critical section. As discussed in the previous article, a spin lock is the better choice when the critical section is small, because it avoids the thread context switches caused by blocking. But it is still essentially a lock: while threads spin, only one of them can be inside the critical section, and the others merely burn CPU time slices. Java 8's ConcurrentHashMap uses some clever designs and tricks to get around these limitations of spin locking and deliver higher concurrency. If the Java 7 source teaches us how to turn a pessimistic lock into a spin lock, the Java 8 source goes further and shows how to turn a spin lock into lock-free code.

Reading the book thin

Photo: www.zhenchao.org/2019/01/31/…

Before reading on, keep this diagram in mind. If you are familiar with HashMap, it should look familiar: in terms of overall data structure, Java 8's ConcurrentHashMap and HashMap are essentially the same (an array of buckets, each holding a linked list or a red-black tree).

Java 7's ConcurrentHashMap uses many programming tricks to improve performance, but its Segment-based design still leaves plenty of room for improvement:

  1. While a Segment is being resized, all write operations on that Segment from other threads are blocked.
  2. A read requires two levels of hash-based lookup (first the Segment, then the bucket inside it), which is an extra cost in read-heavy, write-light workloads.
  3. Although size() first tries to count without locking, if other threads write during that attempt, size() locks the entire ConcurrentHashMap. This is the only global lock in ConcurrentHashMap, and it hurts the performance of components built on top of it.
  4. In extreme cases (for example, when a client implements a poor hash function), the complexity of get() degrades to O(n).

To address points 1 and 2, Java 8 drops Segment and reduces the granularity of pessimistic locking to individual buckets, so get no longer has to locate a Segment before locating the bucket. Point 3 is addressed by the new design of size(), one of the biggest highlights of the Java 8 version, which is explained in detail later in this article. Point 4 is addressed with red-black trees, which this article does not cover in depth. In the following sections I will dig into the details and "read the book thick" again. The parts covered are initialization, the put method, the resize method transfer, and the size() method; other parts, such as the hash function, have changed little, so I will not go into them.

Preliminaries

ForwardingNode

static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    ForwardingNode(Node<K,V>[] tab) {
        // MOVED = -1: the hash of a ForwardingNode is always -1
        super(MOVED, null, null, null);
        this.nextTable = tab;
    }
}

In addition to the ordinary Node and TreeNode, ConcurrentHashMap introduces a new node type, ForwardingNode. Only its constructor is shown here. ForwardingNode serves two purposes:

  • During a resize, it marks a bucket whose contents have already been copied to the new bucket array.
  • If get is called during a resize, the ForwardingNode forwards the lookup to the new bucket array instead of blocking the get call. The new bucket array is stored in nextTable when the ForwardingNode is constructed.
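To make the forwarding idea concrete, here is a deliberately simplified, single-threaded sketch. The classes ForwardingDemo, MiniNode and MiniForwardingNode are hypothetical stand-ins, not the JDK's Node and ForwardingNode; the only point is that a lookup landing on a forwarding marker repeats itself in nextTable instead of blocking.

```java
final class ForwardingDemo {
    // Hypothetical node: an ordinary entry in a bucket's linked list
    static class MiniNode {
        final int hash; final String key; final String val; final MiniNode next;
        MiniNode(int hash, String key, String val, MiniNode next) {
            this.hash = hash; this.key = key; this.val = val; this.next = next;
        }
    }

    // Hypothetical marker left in an old bucket once its contents have been copied
    static final class MiniForwardingNode extends MiniNode {
        final MiniNode[] nextTable;
        MiniForwardingNode(MiniNode[] nextTable) {
            super(-1, null, null, null); // -1 plays the role of MOVED
            this.nextTable = nextTable;
        }
    }

    static String get(MiniNode[] tab, String key) {
        int h = key.hashCode() & 0x7fffffff;
        MiniNode e = tab[h & (tab.length - 1)];
        while (e instanceof MiniForwardingNode) {
            // Bucket already moved: repeat the lookup in the new array, no waiting
            tab = ((MiniForwardingNode) e).nextTable;
            e = tab[h & (tab.length - 1)];
        }
        for (; e != null; e = e.next)
            if (e.hash == h && e.key.equals(key)) return e.val;
        return null;
    }
}
```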

UNSAFE.compareAndSwap***

In Java 8, ConcurrentHashMap performs CAS through the compareAndSwap* methods of sun.misc.Unsafe. Taking int as an example, the method is declared as follows:

/**
* Atomically update Java variable to <tt>x</tt> if it is currently
* holding <tt>expected</tt>.
* @return <tt>true</tt> if successful
*/
public final native boolean compareAndSwapInt(Object o, long offset,
                                              int expected,
                                              int x);

The corresponding semantics are:

If the value stored at offset `offset` inside object o equals expected, atomically set it to x and return true to indicate that the update succeeded; otherwise, return false to indicate that the CAS failed.
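Unsafe is not a public API, so the sketch below uses java.util.concurrent.atomic.AtomicInteger, whose compareAndSet(expected, x) has the same compare-and-swap semantics. CasDemo and addAndGetWithCas are illustrative names; the retry loop is the general pattern that the ConcurrentHashMap code in this article builds on.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class CasDemo {
    // CAS retry loop: read, compute, and publish only if nobody changed the value meanwhile
    static int addAndGetWithCas(AtomicInteger counter, int delta) {
        for (;;) {
            int expected = counter.get();
            int updated = expected + delta;
            if (counter.compareAndSet(expected, updated)) // succeeds only if the value is still `expected`
                return updated;
            // CAS failed: another thread won the race, so re-read and retry
        }
    }
}
```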

Initialization

public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) // Check the parameters
        throw new IllegalArgumentException();
    if (initialCapacity < concurrencyLevel)   // Use at least as many bins
        initialCapacity = concurrencyLevel;   // as estimated threads
    long size = (long) (1.0 + (long)initialCapacity / loadFactor);
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size); // tableSizeFor rounds up to a power of two
    this.sizeCtl = cap; // Before the table exists, sizeCtl holds the initial capacity
}

Even this, the most complex of the constructors, is fairly simple; only two points need attention:

  • In Java 7, concurrencyLevel was the length of the Segment array. Since Java 8 has abandoned Segment, concurrencyLevel is kept for compatibility; apart from serving as a lower bound on the initial capacity, it has no practical effect.
  • sizeCtl encodes several states. A value of -1 means the bucket array is being initialized. Any other negative value means a resize is in progress: the low 16 bits equal the number of resizing threads plus one, and the high 16 bits (including the sign bit) hold a stamp that encodes the size of the bucket array. A positive value holds the initial capacity before the table is created, and the resize threshold afterwards.
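As a reading aid, the hypothetical helper below classifies a sizeCtl value according to the rules just listed. It assumes RESIZE_STAMP_SHIFT = 16, as in the JDK 8 source, and is not part of ConcurrentHashMap itself.

```java
final class SizeCtlDecoder {
    static final int RESIZE_STAMP_SHIFT = 16; // same value as in the JDK 8 source

    // Hypothetical helper: interpret a sizeCtl value following the rules described above
    static String describe(int sizeCtl) {
        if (sizeCtl == -1)
            return "initializing";
        if (sizeCtl < 0) {
            int stamp = sizeCtl >>> RESIZE_STAMP_SHIFT; // high 16 bits: resize stamp of the old table
            int resizers = (sizeCtl & 0xFFFF) - 1;      // low 16 bits: active resizers + 1
            return "resizing (stamp=" + stamp + ", threads=" + resizers + ")";
        }
        if (sizeCtl == 0)
            return "table not created, default capacity";
        return "threshold or initial capacity = " + sizeCtl;
    }
}
```

For example, the first thread that starts resizing a 16-bucket table installs (resizeStamp(16) << 16) + 2, which this helper reports as one active resizing thread.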

The put method

public V put(K key, V value) {
    return putVal(key, value, false);
}

The put method forwards the call to the putVal method:

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // [A] Lazy initialization
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // [B] The target bucket is empty
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        // [C] The head of the bucket is a ForwardingNode: try to join the resize
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        // [D] Otherwise, walk the list or tree in the bucket and insert
        else {
            // Folded for now
        }
    }
    // [F] Reaching this point means the put succeeded: increase the map's element count by one
    addCount(1L, binCount);
    return null;
}

The overall structure of the code is fairly clear, and I have marked the most important steps with bracketed letters. Even so, the put method involves quite a few points worth examining.

Initialization of the bucket array

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)
            // Another thread is initializing, so this thread spins
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            // CAS guarantees that only one thread can reach this branch
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    // sc = n - n/4 = 0.75n
                    sc = n - (n >>> 2);
                }
            } finally {
                // Restore sizeCtl to a positive value
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

If multiple threads execute initTable, CAS ensures that only one thread enters the actual initialization branch, while the rest spin-wait, calling Thread.yield() on each iteration. Three points in this code deserve attention:

  • As mentioned earlier, when a thread starts initializing the bucket array, it uses CAS to set sizeCtl to -1; other threads take this as the signal to start spin-waiting.
  • Once the bucket array is initialized, sizeCtl is restored to a positive value equal to 0.75 times the array length. This plays the same role as threshold in HashMap: it is the point at which a resize is triggered.
  • The write to sizeCtl in the finally block does not use CAS, because the earlier CAS already guarantees that only one thread can reach this point.
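The same pattern (claim the right to initialize with a CAS on a control word, let losers yield, re-check after winning, release with a plain write) can be sketched with public APIs. LazyTable is a hypothetical class; an AtomicInteger stands in for sizeCtl and Unsafe, and the table is just an Object[].

```java
import java.util.concurrent.atomic.AtomicInteger;

final class LazyTable {
    private static final int DEFAULT_CAPACITY = 16;
    private volatile Object[] table;
    // Plays the role of sizeCtl: -1 while initializing, otherwise capacity or threshold
    private final AtomicInteger sizeCtl = new AtomicInteger(0);

    Object[] initTable() {
        Object[] tab;
        while ((tab = table) == null) {
            int sc = sizeCtl.get();
            if (sc < 0) {
                Thread.yield();                         // someone else is initializing: back off
            } else if (sizeCtl.compareAndSet(sc, -1)) { // claim the right to initialize
                try {
                    if ((tab = table) == null) {        // re-check after winning the CAS
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        table = tab = new Object[n];
                        sc = n - (n >>> 2);             // threshold = 0.75 * n
                    }
                } finally {
                    sizeCtl.set(sc);                    // no CAS needed: only the winner gets here
                }
                break;
            }
        }
        return tab;
    }

    int currentSizeCtl() { return sizeCtl.get(); }

    // Demo: several threads race to initialize; true if all saw the same array
    static boolean raceInit(int threads) {
        LazyTable t = new LazyTable();
        Object[][] seen = new Object[threads][];
        Thread[] ts = new Thread[threads];
        for (int k = 0; k < threads; k++) {
            final int idx = k;
            ts[k] = new Thread(() -> seen[idx] = t.initTable());
            ts[k].start();
        }
        for (Thread th : ts) {
            try { th.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return false; }
        }
        for (Object[] s : seen) if (s != seen[0]) return false;
        return seen[0].length == DEFAULT_CAPACITY && t.currentSizeCtl() == 12;
    }
}
```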

Adding the first element to an empty bucket

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

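The second branch of put uses tabAt to check whether the bucket is empty and, if so, writes to it via CAS. tabAt reads the bucket through Unsafe.getObjectVolatile, so it always sees the latest element in the bucket (an ordinary array read would not have volatile semantics, even if the array reference itself is volatile). casTabAt uses CAS so that concurrent insertions into the same empty bucket cannot overwrite each other.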
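A minimal sketch of this empty-bucket fast path, assuming java.util.concurrent.atomic.AtomicReferenceArray as a public stand-in for the Unsafe calls: its get has volatile read semantics for an element, and compareAndSet swaps an element atomically. The helpers tabAt and casTabAt here only mirror the JDK names; EmptyBucketCas and race are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

final class EmptyBucketCas {
    // Stand-ins for tabAt / casTabAt: volatile element read and element CAS
    static <T> T tabAt(AtomicReferenceArray<T> tab, int i) { return tab.get(i); }
    static <T> boolean casTabAt(AtomicReferenceArray<T> tab, int i, T expected, T update) {
        return tab.compareAndSet(i, expected, update);
    }

    // Many threads try to install the first node of bucket 0; returns how many succeeded
    static int race(int threads) {
        AtomicReferenceArray<String> tab = new AtomicReferenceArray<>(16);
        AtomicInteger winners = new AtomicInteger();
        Thread[] ts = new Thread[threads];
        for (int k = 0; k < threads; k++) {
            final String node = "node-" + k;
            ts[k] = new Thread(() -> {
                if (tabAt(tab, 0) == null && casTabAt(tab, 0, null, node))
                    winners.incrementAndGet();
            });
            ts[k].start();
        }
        for (Thread t : ts) {
            try { t.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return winners.get();
    }
}
```

However many threads race for the same empty bucket, exactly one CAS succeeds; in putVal the losers would loop again and take the locked path.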

Deciding whether to join the resize

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        int rs = resizeStamp(tab.length);
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            // RESIZE_STAMP_SHIFT = 16
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            // Increase sizeCtl by 1: one more thread participates in the resize
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}

Let's look at the sizeCtl flag bits in detail. The temporary variable rs is the return value of resizeStamp:

static final int resizeStamp(int n) {
    // RESIZE_STAMP_BITS = 16
    return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

Because n is an int, Integer.numberOfLeadingZeros(n) returns a value between 0 and 32. In binary:

  • The maximum value of Integer.numberOfLeadingZeros(n) is 32: 00000000 00000000 00000000 00100000
  • The minimum value of Integer.numberOfLeadingZeros(n) is 0: 00000000 00000000 00000000 00000000

Therefore, the return value of resizeStamp lies between 00000000 00000000 10000000 00000000 and 00000000 00000000 10000000 00100000. This range shows that the high 16 bits of resizeStamp's return value are always 0 and carry no information. ConcurrentHashMap therefore shifts the return value left by 16 bits and stores it in the high half of sizeCtl, which is why the high 16 bits of sizeCtl encode the size of the bucket array. Because bit 15 of the stamp is always 1, the shift moves it onto the sign bit, so sizeCtl stays negative for the whole duration of a resize. With this analysis, the long if condition in the code above makes sense:

if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
    sc == rs + MAX_RESIZERS || transferIndex <= 0)
    break;
  • (sc >>> RESIZE_STAMP_SHIFT) != rs: ensures that all threads resize based on the same old bucket array.
  • transferIndex <= 0: every slice of the resize has already been claimed, so there is no work left for a new thread.
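The stamp arithmetic described above can be checked with a small sketch. resizeStamp copies the JDK 8 formula; ResizeStampDemo, sizeCtlForFirstResizer and tableLengthFromSizeCtl are hypothetical helpers that assume a power-of-two table length, which ConcurrentHashMap always uses.

```java
final class ResizeStampDemo {
    static final int RESIZE_STAMP_BITS = 16;                      // as in JDK 8
    static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; // = 16

    // Same formula as ConcurrentHashMap.resizeStamp in JDK 8
    static int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

    // sizeCtl installed by the first thread that starts resizing a table of length n:
    // stamp in the high 16 bits, (1 resizing thread + 1) in the low 16 bits
    static int sizeCtlForFirstResizer(int n) {
        return (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2;
    }

    // Recover the old table length from sizeCtl (n must be a power of two)
    static int tableLengthFromSizeCtl(int sc) {
        int leadingZeros = (sc >>> RESIZE_STAMP_SHIFT) & 0x7FFF; // drop bit 15, the marker bit
        return 1 << (31 - leadingZeros);
    }
}
```

For n = 16, resizeStamp returns 32795 (binary 10000000 00011011: 27 leading zeros plus the marker bit), and shifting it left by 16 produces a negative sizeCtl.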

As for the two conditions sc == rs + 1 || sc == rs + MAX_RESIZERS, careful readers will find them hard to understand, and with good reason: this is a JDK bug, fixed in JDK 12. For details, see the Oracle bug database: bugs.java.com/bugdatabase… The conditions should be sc == (rs << RESIZE_STAMP_SHIFT) + 1 || sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS. Comparing rs with sc directly is meaningless, because rs only appears in sizeCtl after a shift. The corrected conditions mean:

  • sc == (rs << RESIZE_STAMP_SHIFT) + 1: the number of resizing threads is 0, so the resize has finished and no new thread should join.
  • sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS: the number of resizing threads has reached the maximum, so no more threads should join.
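The difference between the JDK 8 checks and the corrected ones is easy to see in code. In this sketch (ResizeCheckDemo is a hypothetical name; MAX_RESIZERS = 65535 follows the JDK 8 definition), the unshifted comparisons can never match, because sc is negative during a resize while rs + 1 and rs + MAX_RESIZERS are positive.

```java
final class ResizeCheckDemo {
    static final int RESIZE_STAMP_SHIFT = 16;
    static final int MAX_RESIZERS = (1 << 16) - 1; // 65535, as defined in JDK 8

    static int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << 15);
    }

    // JDK 8 form: compares sc with the unshifted stamp, so it never matches a negative sc
    static boolean jdk8Check(int sc, int rs) {
        return sc == rs + 1 || sc == rs + MAX_RESIZERS;
    }

    // Corrected form: shift the stamp into the high 16 bits before comparing
    static boolean fixedCheck(int sc, int rs) {
        int base = rs << RESIZE_STAMP_SHIFT;
        return sc == base + 1             // no resizing thread left: the resize is finishing
            || sc == base + MAX_RESIZERS; // the resizer count is saturated
    }
}
```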

The actual resize logic lives in transfer, which we will look at later, but one small detail is worth noting in advance: if nextTable has already been initialized, helpTransfer returns a reference to nextTable, so the caller can then operate directly on the new bucket array.

Insert the new value

If the bucket array has been initialized, the target bucket is not being moved by a resize, and the bucket already contains elements, the process is the same as in an ordinary HashMap, except that the bucket is locked.

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0) { /* folded */ }
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { /* folded */ }
        else if ((fh = f.hash) == MOVED) { /* folded */ }
        else {
            V oldVal = null;
            synchronized (f) {
                // Pay attention to this small condition
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) { // fh >= 0: a linked list; otherwise a tree bin
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
                                oldVal = e.val; // The key is already in the list: update its value
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                // The key is not in the list: append a new node at the tail
                                pred.next = new Node<K,V>(hash, key, value, null);
                                break;
                            }
                        }
                    }
                    // Red-black tree handling skipped
                }
            }
            // Folded: treeify check; return oldVal if the key existed, otherwise break out of the loop
        }
    }
    // If put succeeds, the number of map elements +1
    addCount(1L, binCount);
    return null;
}

Note a seemingly trivial condition in this code (marked with a comment in the source): tabAt(tab, i) == f. Its purpose is to handle contention between a thread calling put and a resizing thread. synchronized is a blocking lock. If the put thread and a resizing thread work on the same bucket at the same time and the put thread loses the race for the lock, then by the time it acquires the lock the bucket's head will have been replaced with a ForwardingNode, so tabAt(tab, i) != f. In that case the put thread skips the insert and retries from the top of the loop.
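The lock-the-head-then-recheck pattern can be shown with a toy map of int keys. HeadLockedBuckets is a hypothetical sketch, not ConcurrentHashMap: it never resizes or removes, so the re-check never actually fails here, but the retry path is where a real implementation would notice that the head had been replaced by a ForwardingNode.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

final class HeadLockedBuckets {
    // Hypothetical minimal node (not the JDK's Node)
    static final class Node {
        final int key;
        volatile int value;
        volatile Node next;
        Node(int key, int value) { this.key = key; this.value = value; }
    }

    private final AtomicReferenceArray<Node> table;

    HeadLockedBuckets(int capacity) { table = new AtomicReferenceArray<>(capacity); }

    void put(int key, int value) {
        int i = Math.floorMod(key, table.length());
        for (;;) {
            Node f = table.get(i);
            if (f == null) {
                if (table.compareAndSet(i, null, new Node(key, value)))
                    return;                 // empty bucket: lock-free insert
                continue;                   // lost the CAS: re-read the bucket
            }
            synchronized (f) {              // lock only this bucket's head node
                if (table.get(i) != f)
                    continue;               // head changed while waiting: retry (monitor is released)
                for (Node e = f;; e = e.next) {
                    if (e.key == key) { e.value = value; return; }
                    if (e.next == null) { e.next = new Node(key, value); return; }
                }
            }
        }
    }

    int size() {
        int n = 0;
        for (int i = 0; i < table.length(); i++)
            for (Node e = table.get(i); e != null; e = e.next) n++;
        return n;
    }

    // Demo: `threads` threads each insert `perThread` distinct keys concurrently
    static int concurrentFill(int threads, int perThread) {
        HeadLockedBuckets m = new HeadLockedBuckets(8);
        Thread[] ts = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int base = t * perThread;
            ts[t] = new Thread(() -> {
                for (int k = 0; k < perThread; k++) m.put(base + k, k);
            });
            ts[t].start();
        }
        for (Thread t : ts) {
            try { t.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return m.size();
    }
}
```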

Multithreaded dynamic resizing

My apologies to readers on mobile phones: although I tried to split up the source code, I had to show this long section in one piece for coherence.

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // Initialize the new bucket array
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    int nextn = nextTab.length;
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // Check whether this is the last resizing thread
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED) // Normally reached only during the last thread's recheck pass
            advance = true; // already processed
        else { // Copying is similar to HashMap and is not described here
            synchronized (f) {
                // Folded
            }
        }
    }
}

Before diving into the details of the source code, let's look at some characteristics of resizing in Java 8's ConcurrentHashMap:

  • The new bucket array, nextTable, is twice the length of the old one, as in HashMap.

  • Each thread participating in the resize takes a slice of table and copies its elements into the new bucket array, nextTable.

  • The elements of each old bucket are split between two buckets of the new bucket array, whose indices differ by n (the length of the old bucket array), again consistent with HashMap.
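The split rule in the last point follows from the bucket index being hash & (length - 1): after doubling, one extra bit (hash & n) decides whether a node stays at index i or moves to i + n. The sketch below (SplitDemo is a hypothetical name) applies that rule to raw hash values; the real transfer additionally reuses the unchanged tail of each list (lastRun) instead of copying it, an optimization omitted here.

```java
import java.util.ArrayList;
import java.util.List;

final class SplitDemo {
    // Split the hashes found in old bucket i of a table of length n (a power of two)
    // into the two new buckets they map to after doubling: index i and index i + n.
    static List<List<Integer>> split(List<Integer> hashesInBucket, int n) {
        List<Integer> lo = new ArrayList<>(); // (hash & n) == 0: stays at index i
        List<Integer> hi = new ArrayList<>(); // (hash & n) != 0: moves to index i + n
        for (int h : hashesInBucket) {
            if ((h & n) == 0) lo.add(h); else hi.add(h);
        }
        List<List<Integer>> result = new ArrayList<>();
        result.add(lo);
        result.add(hi);
        return result;
    }
}
```

With n = 16, the hashes 5, 21, 37 and 53 all sit in old bucket 5; 5 and 37 stay in bucket 5 of the new 32-slot array, while 21 and 53 move to bucket 21 = 5 + 16.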

How threads work together

Let's start with the key variable transferIndex, which is declared volatile so that every thread reads its latest value.

private transient volatile int transferIndex;

This value is initialized by the first thread to join the resize, because only that thread satisfies the condition nextTab == null:

if (nextTab == null) {            // initiating
    try {
        @SuppressWarnings("unchecked")
        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
        nextTab = nt;
    } catch (Throwable ex) {      // try to cope with OOME
        sizeCtl = Integer.MAX_VALUE;
        return;
    }
    nextTable = nextTab;
    transferIndex = n;
}

With transferIndex in mind, the following loop is easy to understand:

while (advance) {
    int nextIndex, nextBound;
    // The claimed range [bound, i] is not exhausted: move i down and exit the loop to process bucket i
    if (--i >= bound || finishing)
        advance = false;
    // Every slice of the resize has already been claimed
    else if ((nextIndex = transferIndex) <= 0) {
        i = -1;
        advance = false;
    }
    // Otherwise, claim a new copy task and move transferIndex down with CAS
    else if (U.compareAndSwapInt
             (this, TRANSFERINDEX, nextIndex,
              nextBound = (nextIndex > stride ?
                           nextIndex - stride : 0))) {
        bound = nextBound;
        i = nextIndex - 1;
        advance = false;
    }
}

transferIndex works like a cursor. Each time a thread claims a copy task, it moves the cursor to transferIndex - stride with CAS. The CAS ensures that transferIndex decreases toward 0 in steps of stride, so the slices claimed by different threads neither overlap nor leave gaps.
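The cursor protocol can be simulated with an AtomicInteger playing transferIndex. In this hypothetical sketch, StrideClaimDemo.run counts how many times each bucket index is processed; if the claimed slices neither overlap nor leave gaps, every count is exactly 1.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;

final class StrideClaimDemo {
    // Each worker claims [nextBound, nextIndex) by moving the shared cursor down by `stride`
    // with CAS, then processes the indices from high to low, as transfer() does.
    static int[] run(int n, int stride, int workers) {
        AtomicInteger transferIndex = new AtomicInteger(n);    // cursor starts at the old table length
        AtomicIntegerArray visits = new AtomicIntegerArray(n); // how often each bucket was processed
        Thread[] ts = new Thread[workers];
        for (int w = 0; w < workers; w++) {
            ts[w] = new Thread(() -> {
                for (;;) {
                    int nextIndex = transferIndex.get();
                    if (nextIndex <= 0) return;                // every slice has been claimed
                    int nextBound = nextIndex > stride ? nextIndex - stride : 0;
                    if (!transferIndex.compareAndSet(nextIndex, nextBound))
                        continue;                              // another worker moved the cursor: retry
                    for (int i = nextIndex - 1; i >= nextBound; i--)
                        visits.incrementAndGet(i);             // "copy" bucket i
                }
            });
            ts[w].start();
        }
        for (Thread t : ts) {
            try { t.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        int[] out = new int[n];
        for (int i = 0; i < n; i++) out[i] = visits.get(i);
        return out;
    }
}
```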

Does the last resizing thread need a second check?

For each resizing thread, the loop variable i is the index, in the old bucket array, of the bucket to copy. Its upper and lower bounds come from the cursor transferIndex and the stride. When i drops below zero, the current thread has no more copy work to claim, and execution reaches this branch:

// The i >= n and i + n >= nextn conditions can be ignored for now
if (i < 0 || i >= n || i + n >= nextn) {
    int sc;
    if (finishing) { // [A] Complete the resize
        nextTable = null;
        table = nextTab;
        sizeCtl = (n << 1) - (n >>> 1); // 1.5n, i.e. 0.75 of the new length 2n
        return;
    }
    // [B] Check whether this is the last resizing thread; if so, rescan the bucket array as a second check
    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
        // (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT means this is the last resizing thread
        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
            return;
        // Rescan the whole bucket array as a second check
        finishing = advance = true;
        i = n; // recheck before commit
    }
}

Since finishing is initialized to false, the first time a thread enters this if branch it executes the branch marked [B]. Recall that the low 16 bits of sizeCtl hold the number of resizing threads plus one, and each thread decrements sizeCtl by one with CAS when it runs out of work. If (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT, where sc is the value before the decrement, the current thread is the last resizing thread. It then sets i to n to rescan the whole bucket array, and sets finishing to true so that, after the rescan, it enters the branch marked [A] and ends the resize.

This raises a question. According to our earlier analysis, the resizing threads cooperate so that the slices of the bucket array they handle neither overlap nor leave gaps. Why is a second check needed? A developer asked Doug Lea about this on the concurrency-interest mailing list (address: cs.oswego.edu/pipermail/c…), and he replied:

Yes, this is a valid point; thanks. The post-scan was needed in a previous version, and could be removed. It does not trigger often enough to matter though, so is for now another minor tweak that might be included next time CHM is updated.

Although Doug used hedged phrases such as "could be" and "not often enough" in his email, he confirmed that the last resizing thread's second check is not necessary. The copying process itself is similar to HashMap's; interested readers can refer to my previous articles.

The size() method

The addCount() method

// Record a member variable for the total number of map elements
private transient volatile long baseCount;

At the end of put there is a call to addCount, which maintains the total number of elements in the ConcurrentHashMap, since putVal has just added one. ConcurrentHashMap records the number of elements in the variable baseCount. As shown in the figure below, if n threads update baseCount with CAS at the same time, only one of them succeeds and all the others keep spinning and retrying, which is bound to become a performance bottleneck.

To avoid many threads spinning while they wait to write baseCount, ConcurrentHashMap introduces a secondary queue (the array counterCells), as shown below. Threads updating the count can now be spread across this secondary queue. size() simply adds baseCount and the values in the secondary queue, so calling size() requires no lock.
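The base-plus-cells idea is the same one behind java.util.concurrent.atomic.LongAdder (the fullAddCount source below even says "See LongAdder version for explanation"). StripedCounter is a simplified, hypothetical sketch: its cell array has a fixed size (the smallest power of two that is at least 2 and at least the processor count) instead of growing, its cells are not padded against false sharing, and each thread's slot comes from an identity hash rather than ThreadLocalRandom's probe.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.concurrent.atomic.AtomicReference;

final class StripedCounter {
    private final AtomicLong base = new AtomicLong();                                // like baseCount
    private final AtomicReference<AtomicLongArray> cells = new AtomicReference<>(); // like counterCells
    private final int cellCount;

    StripedCounter() {
        int n = 2;
        while (n < Runtime.getRuntime().availableProcessors()) n <<= 1;
        cellCount = n; // smallest power of two >= max(2, processor count)
    }

    void add(long x) {
        AtomicLongArray cs = cells.get();
        if (cs == null) {
            long b = base.get();
            if (base.compareAndSet(b, b + x)) return; // no contention observed: update base only
            // CAS on base failed, so there is contention: create the cells (one creator wins)
            cells.compareAndSet(null, new AtomicLongArray(cellCount));
            cs = cells.get();
        }
        // Hypothetical slot choice; the JDK uses ThreadLocalRandom's per-thread probe instead
        int h = System.identityHashCode(Thread.currentThread()) * 0x9E3779B9;
        for (;;) {
            int i = h & (cellCount - 1);
            long v = cs.get(i);
            if (cs.compareAndSet(i, v, v + x)) return;
            h = h * 31 + 1;                           // collided with another thread: try another slot
        }
    }

    long sum() {                                      // like sumCount(): base plus every cell
        long s = base.get();
        AtomicLongArray cs = cells.get();
        if (cs != null)
            for (int i = 0; i < cs.length(); i++) s += cs.get(i);
        return s;
    }

    // Demo: `threads` threads each add 1, `perThread` times
    static long demo(int threads, int perThread) {
        StripedCounter c = new StripedCounter();
        Thread[] ts = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            ts[t] = new Thread(() -> { for (int k = 0; k < perThread; k++) c.add(1); });
            ts[t].start();
        }
        for (Thread t : ts) {
            try { t.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return c.sum();
    }
}
```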

The secondary queue is an array of type CounterCell:

@sun.misc.Contended static final class CounterCell {
    volatile long value;
    CounterCell(long x) { value = x; }
}

It simply wraps a long; the @sun.misc.Contended annotation pads each cell so that adjacent cells do not share a cache line (avoiding false sharing). The question is how a thread knows which slot of the secondary queue to use. The answer is the following method:

static final int getProbe() {
    return UNSAFE.getInt(Thread.currentThread(), PROBE);
}

getProbe returns a per-thread probe value, essentially a hash code stored in the current Thread. It stays the same unless it is explicitly changed by ThreadLocalRandom.advanceProbe, so getProbe() & (length - 1) can serve as the thread's index into the secondary queue. A return value of 0 means the probe has not been initialized yet, and ThreadLocalRandom.localInit() must be called first. There are two details to note in the addCount method:

private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    // Note that this condition is subtle
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            // uncontended records whether the CAS on the cell succeeded
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }
    if (check >= 0) {
        // Check whether a resize is needed (folded)
    }
}

Detail one:

First, notice the if condition at the start of the method:

if ((as = counterCells) != null ||
    !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
}

If (as = counterCells) != null holds, the CAS on baseCount is not executed at all, because || short-circuits. The author has two considerations:

  1. If (as = counterCells) != null, the secondary queue has already been initialized. Rather than having every thread spin on the single variable baseCount, letting threads CAS the values in the queue has a better chance of success, because the queue can grow up to the smallest power of two that is not less than the number of processors, which allows more concurrency.
  2. If the secondary queue has not been initialized, it is created only when necessary. How is "necessary" decided? By whether the CAS on baseCount fails: a failure shows that concurrency in the system is high and the queue is needed; otherwise baseCount is updated directly.

Detail two:

The CAS on a slot is performed only when the secondary queue exists and the slot selected by ThreadLocalRandom.getProbe() is not null; that part is ordinary defensive checking. The interesting part is that uncontended records whether this CAS succeeded. If it failed, fullAddCount calls ThreadLocalRandom.advanceProbe to change the probe and thus the current thread's slot in the secondary queue, so that threads do not all spin on the same slot.

The fullAddCount() method

// See LongAdder version for explanation
// wasUncontended: whether the caller's CAS succeeded; if it failed, move to another slot of the secondary queue and retry
private final void fullAddCount(long x, boolean wasUncontended) {
    int h;
    if ((h = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();      // force initialization
        h = ThreadLocalRandom.getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        CounterCell[] as; CounterCell a; int n; long v;
        // [A] The secondary queue already exists: operate on it directly
        if ((as = counterCells) != null && (n = as.length) > 0) {
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {            // Try to attach new Cell
                    CounterCell r = new CounterCell(x); // Optimistic create
                    if (cellsBusy == 0 &&
                        U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                        boolean created = false;
                        try {               // Recheck under lock
                            CounterCell[] rs; int m, j;
                            if ((rs = counterCells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // The caller's CAS failed: skip this round, change slot, retry
                wasUncontended = true;      // Continue after rehash
            else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                break;
            else if (counterCells != as || n >= NCPU) // Queue already has >= NCPU slots (or is stale): change slot, retry
                collide = false;            // At max size or stale
            else if (!collide)              // First failure (CAS failed or slot taken): change slot, retry
                collide = true;
            else if (cellsBusy == 0 &&      // Two consecutive failures: double the secondary queue
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                try {
                    if (counterCells == as) {// Expand table unless stale
                        CounterCell[] rs = new CounterCell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        counterCells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            // Reached when this round did not succeed: move to another slot and retry
            h = ThreadLocalRandom.advanceProbe(h);
        }
        // [B] The secondary queue does not exist yet: create it under the lock
        else if (cellsBusy == 0 && counterCells == as &&
                 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
            boolean init = false;
            try {                           // Initialize table
                if (counterCells == as) {
                    CounterCell[] rs = new CounterCell[2];
                    rs[h & 1] = new CounterCell(x);
                    counterCells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        // [C] Creating the secondary queue failed (lock not acquired): fall back to baseCount
        else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
            break;                          // Fall back on using base
    }
}

Since counterCells is an ordinary array, every write to it (initialization, expansion, and writing an element) must be done under a lock, implemented as a spin lock on the global variable cellsBusy. Let's start with the three outermost branches:

  • [B] If the secondary queue has not been created yet, it is created under the lock.
  • [C] If creating the secondary queue fails because the lock could not be acquired, try a CAS on baseCount in case that succeeds.
  • [A] If the secondary queue already exists, operate directly on the corresponding element of the queue.

The branch marked [A] contains the most code. Its main idea: if an operation on an element of the secondary queue fails (either the CAS or acquiring the lock), the thread first calls ThreadLocalRandom.advanceProbe(h) to switch to another element and tries again, and the failure is recorded in the temporary variable collide. If the next attempt fails as well, contention is high and the secondary queue is doubled. Once the length of the secondary queue reaches the number of CPUs, however, it is not expanded any further, because the number of threads that can run at the same time cannot exceed the number of CPUs.

Four details in this process still deserve attention:

Detail one:

counterCells is just an ordinary array and therefore not thread-safe, so writes to it must be done under a lock to be safe under concurrency.

Detail two:

When locking, the author checks cellsBusy twice. Some articles describe this as "double-checked locking, like in the singleton pattern", which is not correct; the reason the author does this was discussed in the previous article. The first check, cellsBusy == 0, is the precondition for going further: if cellsBusy == 1, acquiring the lock would fail anyway, so the thread goes straight to h = ThreadLocalRandom.advanceProbe(h); to update h and try again. If the check passes, the thread calls CounterCell r = new CounterCell(x); to create the CounterCell before taking the lock, which keeps the spin lock's critical section small and improves concurrency.

Detail three:

cellsBusy is checked for 0 before attempting the lock; if it is 1, locking would fail. Since reading a volatile variable is cheaper than an Unsafe CAS, there is no need to issue the more expensive CAS when a plain read of cellsBusy already shows that the lock cannot be acquired.

Detail four:

Changing cellsBusy from 0 to 1 uses CAS, but setting it back from 1 to 0 is a plain assignment. This is because the CAS guarantees that only one thread can reach that statement, so a simple (volatile) assignment is enough to release the lock.
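Details three and four together form a tiny spin lock. BusyFlagLock is a hypothetical sketch that uses an AtomicInteger in place of the cellsBusy field and Unsafe: tryLock does the cheap volatile read before the CAS, and unlock is a plain volatile write because only the holder can reach it.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class BusyFlagLock {
    private final AtomicInteger busy = new AtomicInteger(); // 0 = free, 1 = held, like cellsBusy

    // Single attempt: a cheap volatile read first, the CAS only if the lock looks free
    boolean tryLock() {
        return busy.get() == 0 && busy.compareAndSet(0, 1);
    }

    // Plain volatile write: only the thread whose CAS succeeded can get here
    void unlock() {
        busy.set(0);
    }

    // Demo: threads increment a plain int under the lock, retrying when tryLock fails
    static int demo(int threads, int perThread) {
        BusyFlagLock lock = new BusyFlagLock();
        int[] shared = new int[1];
        Thread[] ts = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            ts[t] = new Thread(() -> {
                for (int k = 0; k < perThread; k++) {
                    while (!lock.tryLock())
                        Thread.yield(); // fullAddCount does not wait like this; it rehashes or falls back
                    try {
                        shared[0]++;
                    } finally {
                        lock.unlock();
                    }
                }
            });
            ts[t].start();
        }
        for (Thread t : ts) {
            try { t.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return shared[0];
    }
}
```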

sumCount

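The two methods above split the element count of ConcurrentHashMap between baseCount and the secondary queue, so size() only needs to add these values up. Because the values are read without locking, the result is not an atomic snapshot while other threads are still updating the map.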

public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

Afterword.

In terms of code, Java 8's improvements to ConcurrentHashMap all hit the right spots. In theory, Java 8's ConcurrentHashMap should perform much better than Java 7's, but I have not found a benchmark comparing the two versions. If any readers have run such a comparison or seen a relevant benchmark, you are welcome to share it with me.