Preface

JDK 8 introduced a higher-performance atomic class for long values: java.util.concurrent.atomic.LongAdder. Under high concurrency it performs better than AtomicLong, at the cost of consuming more memory.

With AtomicLong already available, why introduce LongAdder? AtomicLong guarantees thread safety through volatile + CAS. It holds a single value that every thread operates on, which makes that value hot data. As concurrency increases, the probability that a CAS fails increases, and more and more threads spin and retry, which consumes CPU resources and reduces performance. For a detailed explanation of AtomicLong, see another article of mine: “AtomicLong source guide”.
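The retry behaviour described above can be made visible with a small sketch. It is not AtomicLong's internal code: the class name CasRetryDemo and the helper addWithCas are made up for illustration, and the loop is written by hand on top of the public compareAndSet() method so that failed attempts can be counted.

```java
import java.util.concurrent.atomic.AtomicLong;

class CasRetryDemo {
    // Hypothetical helper: adds delta with an explicit CAS retry loop
    // and returns how many attempts were needed.
    static int addWithCas(AtomicLong counter, long delta) {
        int attempts = 0;
        for (;;) {
            attempts++;
            long current = counter.get();              // volatile read of the hot value
            if (counter.compareAndSet(current, current + delta)) {
                return attempts;                       // CAS succeeded
            }
            // CAS failed: another thread changed the value first, so spin and retry
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicLong counter = new AtomicLong();
        AtomicLong totalAttempts = new AtomicLong();
        Thread[] threads = new Thread[4];
        for (int t = 0; t < threads.length; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < 100_000; i++) {
                    totalAttempts.addAndGet(addWithCas(counter, 1));
                }
            });
            threads[t].start();
        }
        for (Thread th : threads) {
            th.join();
        }
        System.out.println("value    = " + counter.get());       // 400000
        System.out.println("attempts = " + totalAttempts.get()); // at least 400000
    }
}
```

Any attempts beyond 400000 are wasted spins on the single hot value, and their number varies from run to run and from machine to machine.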

To solve AtomicLong's performance problem under high concurrency, Doug Lea wrote LongAdder. Note that LongAdder cannot completely replace AtomicLong: LongAdder’s sum() is only an estimate while updates are in progress, and obtaining an exact value at a given moment requires a global lock.

How is LongAdder optimized? Its code is much more complex than AtomicLong’s, and its main idea is divide and conquer. LongAdder spreads the single hot value of AtomicLong across a Cell[] array. Each Cell is a separate resource, and a thread only operates on the value of its own Cell, which greatly reduces contention for the hot resource. This is the same idea as the segment locking used by ConcurrentHashMap before JDK 8.

With divide and conquer, the single hot resource is spread across the Cell[] array, and each thread only operates on the value of its own Cell. To produce the final result, the values of all Cells simply need to be added up.
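The divide-and-conquer idea can be sketched in a few lines. This is a deliberately simplified illustration, not LongAdder's implementation: the class StripedCounter is hypothetical, the number of stripes is fixed, and the thread id stands in for the per-thread hash that LongAdder uses.

```java
import java.util.concurrent.atomic.AtomicLongArray;

class StripedCounter {
    private final AtomicLongArray stripes;  // one slot per "cell"
    private final int mask;                 // length - 1, valid because length is a power of two

    StripedCounter(int powerOfTwoSize) {
        if (powerOfTwoSize <= 0 || Integer.bitCount(powerOfTwoSize) != 1) {
            throw new IllegalArgumentException("size must be a positive power of two");
        }
        stripes = new AtomicLongArray(powerOfTwoSize);
        mask = powerOfTwoSize - 1;
    }

    void add(long x) {
        // Assumption: the thread id is used as a stand-in for a per-thread hash
        int h = (int) Thread.currentThread().getId();
        stripes.addAndGet(h & mask, x);     // each thread mostly touches its own slot
    }

    long sum() {
        long s = 0;
        for (int i = 0; i < stripes.length(); i++) {
            s += stripes.get(i);            // add up every slot
        }
        return s;
    }

    public static void main(String[] args) throws InterruptedException {
        StripedCounter counter = new StripedCounter(8);
        Thread[] threads = new Thread[4];
        for (int t = 0; t < threads.length; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < 10_000; i++) {
                    counter.add(1);
                }
            });
            threads[t].start();
        }
        for (Thread th : threads) {
            th.join();
        }
        System.out.println(counter.sum());  // 40000
    }
}
```

Unlike this sketch, the real Cell class is annotated with @sun.misc.Contended to pad each cell and avoid false sharing, which is part of why LongAdder uses more memory.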

Note that sum() returns only an estimate, because other threads may be modifying Cell values while the result is being summed. Computing an exact value requires a global lock, so LongAdder is better suited to scenarios with many more writes than reads.
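A typical write-heavy use looks like the following sketch. The class and method names (LongAdderUsage, countWithThreads) are made up for illustration; only LongAdder, increment() and sum() come from the JDK. A sum() taken while the workers are still running may miss in-flight updates, whereas a sum() taken after every worker has been joined is exact.

```java
import java.util.concurrent.atomic.LongAdder;

class LongAdderUsage {
    // Hypothetical helper: starts `threads` workers that each increment `perThread` times.
    static long countWithThreads(int threads, int perThread) {
        LongAdder adder = new LongAdder();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    adder.increment();
                }
            });
            workers[t].start();
        }
        // Taken while writers may still be running: only an estimate
        System.out.println("estimate while running: " + adder.sum());
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // All writers have finished, so no update can be missed any more
        return adder.sum();
    }

    public static void main(String[] args) {
        System.out.println("final: " + countWithThreads(8, 100_000)); // final: 800000
    }
}
```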


Introduction to the source code

UML

The structure of LongAdder is not complicated. The difference from AtomicLong is that it extends the java.util.concurrent.atomic.Striped64 class. Striped64 borrows the idea of segment locking to process 64-bit data efficiently under high concurrency, and it is designed to spread out contention for a hot resource. When there is no contention, it operates directly on the base variable, just like AtomicLong. When multiple threads contend, the hot resource is spread across the Cell[] array. Each Cell instance is a separate resource that maintains a volatile long value internally and provides a cas() method so that threads can update the value with CAS.

Attributes

LongAdder has no attributes of its own; they are all inherited from its parent class, Striped64. base is a volatile long that serves two purposes:

  1. When there is no contention, operations are applied directly to base.
  2. While Cell[] is being initialized, operations fall back to base.

cells is used to spread out the hot resource. Each Cell instance can be thought of as an AtomicLong. LongAdder treats the threadLocalRandomProbe field of Thread as the thread's hash value and computes the array index as hash & (cells.length - 1). Each thread only operates on the Cell at its own index, which reduces contention for the hot resource.
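The index calculation works because the array length is always a power of two, so masking with length - 1 keeps only the low bits of the hash. A minimal sketch, with a hypothetical class CellIndexDemo and arbitrary sample probe values:

```java
class CellIndexDemo {
    // Same formula as in the text: hash & (length - 1); length must be a power of two
    static int indexFor(int probe, int tableLength) {
        return probe & (tableLength - 1);
    }

    public static void main(String[] args) {
        int[] probes = {13, -1, 0x9E3779B9, 42};   // arbitrary sample hash values
        int length = 8;
        for (int probe : probes) {
            int idx = indexFor(probe, length);
            // For power-of-two lengths the mask equals a non-negative modulo
            boolean sameAsMod = idx == Math.floorMod(probe, length);
            System.out.println(probe + " -> " + idx + " (same as floorMod: " + sameAsMod + ")");
        }
    }
}
```

The mask is cheaper than a modulo and never produces a negative index, even for negative hash values.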

cellsBusy can be regarded as a lock flag inside Striped64: 0 means unlocked and 1 means locked, and it is modified through CAS. The lock is acquired when the Cell[] array is created or expanded, and when a new Cell is added to it.

/** Cell array, used to spread out contention on the hot resource. */
transient volatile Cell[] cells;

/**
 * Base value. 1. Operated on directly when there is no contention;
 * 2. operations fall back to base while cells is being initialized.
 */
transient volatile long base;

/** Lock flag: 0 means unlocked, 1 means locked. */
transient volatile int cellsBusy;
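A lock flag of this kind can be sketched with an AtomicInteger. This is an illustration under assumptions, not Striped64's code (which applies CAS to the cellsBusy field directly): the class BusyFlagDemo and its methods are hypothetical. The pattern is the same as in the text: check the flag, CAS it from 0 to 1, do the work, and reset it to 0 in a finally block.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

class BusyFlagDemo {
    private final AtomicInteger busy = new AtomicInteger(); // 0 = unlocked, 1 = locked
    private volatile long[] table = new long[2];

    // Non-blocking: returns false immediately if another thread holds the flag
    boolean tryLock() {
        return busy.get() == 0 && busy.compareAndSet(0, 1);
    }

    void unlock() {
        busy.set(0);
    }

    // Doubles the table only if the lock could be taken; callers may retry otherwise
    boolean tryGrow() {
        if (!tryLock()) {
            return false;
        }
        try {
            table = Arrays.copyOf(table, table.length << 1);
            return true;
        } finally {
            unlock();   // always release, even if the copy throws
        }
    }

    int length() {
        return table.length;
    }

    public static void main(String[] args) {
        BusyFlagDemo demo = new BusyFlagDemo();
        System.out.println(demo.tryGrow());  // true
        System.out.println(demo.length());   // 4
    }
}
```

Because the lock is only ever tried and never waited on, a thread that loses the race can do something useful instead, such as falling back to base.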

Core methods

add

When add() is called, LongAdder first checks whether Cell[] has been created. If not, there has been no resource contention so far, and it calls casBase() to modify the base variable directly. If that CAS fails, there is resource contention, and Cell[] must be created to spread out the hot resource and avoid excessive spinning.

If Cell[] has already been created, contention exists, and the thread operates on its own Cell.

/** Adds the given value x. */
public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    // If cells has not been created, try to modify base directly with CAS
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        // There is contention for resources
        boolean uncontended = true;
        // cells has not been created
        if (as == null || (m = as.length - 1) < 0 ||
            // Get the Cell of the current thread
            (a = as[getProbe() & m]) == null ||
            // cells and the Cell of the current thread both exist:
            // CAS directly on the current thread's Cell
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}
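The "try base first, fall back to a cell" shape of add() can be approximated with public JDK classes. The sketch below is an assumption-laden simplification: the class BaseFirstCounter is hypothetical, the cell array has a fixed size of 8, the thread id replaces the probe, and unlike LongAdder it keeps trying base on every call instead of switching to the cells permanently.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

class BaseFirstCounter {
    private final AtomicLong base = new AtomicLong();
    private final AtomicLongArray cells = new AtomicLongArray(8); // fixed size (assumption)

    // Returns true if the uncontended fast path on base was used
    boolean add(long x) {
        long b = base.get();
        if (base.compareAndSet(b, b + x)) {
            return true;                                   // no contention: base was enough
        }
        // CAS on base failed, so there is contention: use this thread's slot instead
        int idx = (int) Thread.currentThread().getId() & (cells.length() - 1);
        cells.addAndGet(idx, x);
        return false;
    }

    long sum() {
        long s = base.get();
        for (int i = 0; i < cells.length(); i++) {
            s += cells.get(i);
        }
        return s;
    }

    public static void main(String[] args) throws InterruptedException {
        BaseFirstCounter counter = new BaseFirstCounter();
        AtomicLong slowPath = new AtomicLong();
        Thread[] threads = new Thread[4];
        for (int t = 0; t < threads.length; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < 50_000; i++) {
                    if (!counter.add(1)) {
                        slowPath.incrementAndGet();
                    }
                }
            });
            threads[t].start();
        }
        for (Thread th : threads) {
            th.join();
        }
        System.out.println("sum       = " + counter.sum());   // 200000
        System.out.println("slow path = " + slowPath.get());  // varies per run
    }
}
```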

The general flow chart is as follows:

[Flow chart: add()]

If there is resource contention, Striped64's longAccumulate() method is called. Its logic is fairly complicated:

final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    int h; // The thread's hash code
    if ((h = getProbe()) == 0) {
        // A hash code of 0 means it has not been initialized
        ThreadLocalRandom.current(); // Force initialization, computes threadLocalRandomProbe
        // The thread's hash code, i.e. Thread.threadLocalRandomProbe
        h = getProbe();
        wasUncontended = true;
    }
    // Hash collision: different threads are mapped to the same Cell
    boolean collide = false;
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        // cells has been initialized
        if ((as = cells) != null && (n = as.length) > 0) {
            // Locate the current thread's Cell with (n - 1) & h;
            // null means the Cell has not been created yet
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {       // Only proceed when unlocked
                    Cell r = new Cell(x);   // Create a Cell instance
                    // Try to lock
                    if (cellsBusy == 0 && casCellsBusy()) {
                        // Add the Cell instance to cells
                        boolean created = false;
                        try {               // Recheck under the lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            // Unlock
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            // wasUncontended indicates whether the previous CAS succeeded.
            // If it failed, reset the flag to true and recompute the hash code
            else if (!wasUncontended)
                wasUncontended = true;
            // The thread's Cell is not null: try to update its value
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            // Once the Cell[] length reaches the number of CPU cores, stop expanding
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            // Try to lock and expand
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {
                        // Double the capacity
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    // Unlock
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            // Recompute the current thread's hash code and write it to
            // Thread.threadLocalRandomProbe
            h = advanceProbe(h);
        }
        // Cell[] uninitialized: try to lock and initialize
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                    Cell[] rs = new Cell[2]; // The default length is 2
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                // Unlock
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        // Cell[] is being initialized by another thread: operate on base directly
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}

longAccumulate() handles three main cases:

  1. If Cell[] has not been initialized, lock and initialize it. The default length is 2.
  2. If Cell[] is being initialized by another thread, the operation falls back to the base variable and modifies base directly for now.
  3. If Cell[] has been initialized, check whether the Cell bound to the current thread has been created. If not, lock, create the Cell, and put it into cells. If it has been created, call Cell.cas() directly. If that fails, multiple threads are competing for the same Cell: the thread rehashes and retries, and if it collides again the array is doubled. Once the array length reaches the number of CPU cores, it is no longer expanded.
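The sizing rule in the cases above (start at 2, double on repeated collisions, stop once the length reaches the number of CPU cores) implies that the largest possible table is the smallest power of two that is at least the core count, with a minimum of 2. The following sketch only replays that arithmetic; CellTableGrowth and finalLength are hypothetical names, not part of the JDK.

```java
class CellTableGrowth {
    // Largest length the table can reach for a given core count,
    // following the rule "expand only while length < ncpu"
    static int finalLength(int ncpu) {
        if (ncpu < 1 || ncpu > (1 << 30)) {
            throw new IllegalArgumentException("ncpu out of range: " + ncpu);
        }
        int n = 2;              // initial length used when the table is created
        while (n < ncpu) {      // mirrors the check n >= NCPU that stops expansion
            n <<= 1;            // double the capacity
        }
        return n;
    }

    public static void main(String[] args) {
        for (int ncpu : new int[] {1, 4, 6, 8, 12}) {
            System.out.println(ncpu + " cores -> max length " + finalLength(ncpu));
        }
        int here = Runtime.getRuntime().availableProcessors();
        System.out.println("this machine: " + here + " cores -> max length " + finalLength(here));
    }
}
```

With 6 cores, for example, a table of length 4 may still be doubled to 8, so the maximum length can exceed the core count when that count is not a power of two.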

sum

The added values are spread across base and Cell[], so sum() has to add up base and the value of every Cell.

/*
 * Other threads may still be updating the Cells during the summation, so the
 * result is only an estimate. Computing the exact value requires a global lock.
 */
public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base; // Start from base and add the value of every Cell in turn
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

Other methods, such as increment(), decrement() and longValue(), are just thin wrappers around add() or sum() and are not explained here.
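A short single-threaded sketch shows how these convenience methods relate to add() and sum(). The class name LongAdderConvenience is made up; the LongAdder methods are the real JDK ones.

```java
import java.util.concurrent.atomic.LongAdder;

class LongAdderConvenience {
    static long demo() {
        LongAdder adder = new LongAdder();
        adder.increment();          // equivalent to add(1)
        adder.increment();
        adder.decrement();          // equivalent to add(-1)
        adder.add(10);
        return adder.longValue();   // equivalent to sum()
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 11
    }
}
```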

Conclusion

AtomicLong stores the actual long in a single variable, value. Every thread operation competes for it, so value becomes the only hot resource. Under high concurrency a large number of CAS operations fail and threads spin and retry repeatedly, which hurts performance.

LongAdder uses divide and conquer to spread the hot resource across base and Cell[]. When there is no resource contention, it operates on base directly and delays the creation of Cell[] as long as possible. When contention does occur, it creates Cell[]. The default length is 2, the array doubles on each expansion, and expansion stops once the length reaches the number of CPU cores. The threadLocalRandomProbe field of Thread is treated as a hash code, and the Cell is located with hash & (cells.length - 1). Each thread only operates on its own Cell to reduce contention for resources. When threads collide on the same Cell and the CAS fails, LongAdder calls advanceProbe() to recompute the thread's hash code and writes it back to Thread.threadLocalRandomProbe.

Divide and conquer and segment locking are the core ideas of LongAdder.