Basic introduction

LongAdder and AtomicLong are both used as counters. AtomicLong updates a single value with CAS operations, so its throughput drops under high contention, because every failed CAS has to be retried.
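
As a minimal sketch of how the two classes are used for the same job, the example below increments both counters from several threads. The class name CounterComparison and the thread counts are assumptions made for illustration; the example only checks correctness and is not a benchmark.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

final class CounterComparison {
    /** Runs `threads` threads, each incrementing both counters `perThread` times. */
    static long[] count(int threads, int perThread) {
        AtomicLong atomic = new AtomicLong();
        LongAdder adder = new LongAdder();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    atomic.incrementAndGet(); // every thread CASes the same value
                    adder.increment();        // goes to base or to a per-thread cell
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return new long[] { atomic.get(), adder.sum() };
    }

    public static void main(String[] args) {
        long[] r = count(8, 100_000);
        System.out.println("AtomicLong=" + r[0] + ", LongAdder=" + r[1]);
    }
}
```

Both counters end with the same total once all threads have finished; the difference lies in how much contention each one suffers on the way there.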

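Alibaba’s Java development manual recommends LongAdder over AtomicLong on JDK 8, because it performs better by reducing the number of optimistic-lock retries.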

The inheritance structure of LongAdder is as follows:

//LongAdder is a subclass of Striped64
public class LongAdder extends Striped64 implements Serializable {}

The important attributes in the Striped64 class are as follows:

abstract class Striped64 extends Number {
    /**
     * Padded variant of AtomicLong supporting only raw accesses plus CAS.
     *
     * JVM intrinsics note: It would be possible to use a release-only
     * form of CAS here, if it were provided.
     */
    @sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

    /** Number of CPUS, to place bound on table size */
    // Number of CPUs on this machine; it caps the length of the cells array
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * Table of cells. When non-null, size is a power of 2.
     */
    transient volatile Cell[] cells;

    /**
     * Base value, used mainly when there is no contention, but also as
     * a fallback during table initialization races. Updated via CAS.
     *
     * Without contention, updates accumulate directly into base; base also
     * absorbs updates while another thread is initializing the cells array.
     */
    transient volatile long base;

    /**
     * Spinlock (locked via CAS) used when resizing and/or creating Cells.
     * 0 means unlocked; 1 means another thread already holds the lock.
     */
    transient volatile int cellsBusy;

    /**
     * Package-private default constructor
     */
    Striped64() {
    }

    /**
     * CASes the base field, i.e. modifies the base value.
     */
    final boolean casBase(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
    }

    /**
     * CASes the cellsBusy field from 0 to 1 to acquire lock.
     */
    final boolean casCellsBusy() {
        return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
    }

    /**
     * Returns the probe value for the current thread.
     * Duplicated from ThreadLocalRandom because of packaging restrictions.
     *
     * The probe is the hash value of the current thread.
     */
    static final int getProbe() {
        return UNSAFE.getInt(Thread.currentThread(), PROBE);
    }

    /**
     * Pseudo-randomly advances and records the given probe value for the
     * given thread.
     * Duplicated from ThreadLocalRandom because of packaging restrictions.
     *
     * Resets the hash value of the current thread.
     */
    static final int advanceProbe(int probe) {
        probe ^= probe << 13;   // xorshift
        probe ^= probe >>> 17;
        probe ^= probe << 5;
        UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
        return probe;
    }

    // longAccumulate and the Unsafe setup for BASE, CELLSBUSY and PROBE are omitted here
}
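
The three shift-and-xor lines in advanceProbe are a plain 32-bit xorshift step. The sketch below isolates that step so it can be run on its own; the class name XorShiftProbe is made up for illustration, and the write-back through Unsafe is left out.

```java
final class XorShiftProbe {
    /** Same three xorshift steps as advanceProbe, without the Unsafe write-back. */
    static int advance(int probe) {
        probe ^= probe << 13;
        probe ^= probe >>> 17;
        probe ^= probe << 5;
        return probe;
    }

    public static void main(String[] args) {
        int probe = 1;
        for (int i = 0; i < 4; i++) {
            // The low bits select the slot in a 16-element table
            System.out.println(probe + " -> slot " + (probe & 15));
            probe = advance(probe);
        }
    }
}
```

Each of the three steps is invertible and maps 0 to 0, so a non-zero probe never becomes zero. That is consistent with longAccumulate treating a probe of 0 as "not yet assigned".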

Cell is an inner class of java.util.concurrent.atomic.Striped64. LongAdder's basic idea is to disperse the hotspot: the value is spread across a Cell array, different threads hash to different slots, and each thread only CASes the value in its own slot, so the hotspot is spread out and collisions become much less likely. To get the actual long value, add up the values held in all slots.

sum() adds up base and every value in the Cell array and returns the total. The core idea is to split the update pressure on AtomicLong's single value across multiple values, which relieves the update hotspot.

In other words, LongAdder holds a base variable and a Cell[] array internally.

  • base variable: without contention, updates are added directly to this variable
  • Cell[] array: under contention, each thread adds to its own slot Cell[i]

The sum() method is used to calculate the total value.

/**
 * Adds the value of base to the values of all slots in the cells array.
 */
public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

Like AtomicLong, LongAdder updates a single value, base, when there is no contention. Under contention it trades space for time: the value is split across the cells array. When several threads update at the same time, each thread takes its per-thread hash (probe) value, masks it to an index into cells, and adds to the cell at that index. The final result is the sum of all cell values plus base, the value accumulated without contention.
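
The idea can be modelled in a few lines. The sketch below is a hypothetical, heavily simplified counter: StripedCounterSketch, PROBE and demo are names invented here, the cell table has a fixed size, and there is no lazy initialization, resizing, rehashing or cache-line padding. It only shows the two-level scheme of trying base first and falling back to a per-thread slot.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

/** Simplified model only: fixed table, no lazy init, no resizing, no padding. */
final class StripedCounterSketch {
    // Per-thread hash, standing in for the probe field that Striped64 reads via Unsafe
    private static final ThreadLocal<Integer> PROBE =
        ThreadLocal.withInitial(() -> ThreadLocalRandom.current().nextInt());

    private final AtomicLong base = new AtomicLong();
    private final AtomicLongArray cells;
    private final int mask;

    StripedCounterSketch(int powerOfTwoCells) {
        if (Integer.bitCount(powerOfTwoCells) != 1)
            throw new IllegalArgumentException("cell count must be a power of two");
        cells = new AtomicLongArray(powerOfTwoCells);
        mask = powerOfTwoCells - 1;
    }

    void add(long x) {
        long b = base.get();
        if (base.compareAndSet(b, b + x))
            return;                          // no contention: base absorbs the update
        cells.addAndGet(PROBE.get() & mask, x); // contention: use this thread's slot
    }

    long sum() {
        long s = base.get();
        for (int i = 0; i < cells.length(); i++)
            s += cells.get(i);
        return s;
    }

    /** Runs several writer threads against one counter and returns the final sum. */
    static long demo(int threads, int perThread) {
        StripedCounterSketch counter = new StripedCounterSketch(8);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++)
                    counter.add(1);
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return counter.sum();
    }
}
```

Whichever location each update lands in, base plus all cells still adds up to the total number of increments once the writers have finished.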

Source code analysis

Entry point: LongAdder.increment()

public void increment() {
    add(1L);
}
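
For orientation, here is a short usage sketch of the public methods that all funnel into add(). The class name LongAdderApiDemo is an assumption for the example; the LongAdder calls themselves are standard JDK API.

```java
import java.util.concurrent.atomic.LongAdder;

final class LongAdderApiDemo {
    static long run() {
        LongAdder adder = new LongAdder();
        adder.increment();   // same as add(1L)
        adder.add(5);        // running total: 6
        adder.decrement();   // same as add(-1L), running total: 5
        return adder.sum();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```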

Then look at the add method as follows:

public void add(long x) {
    // as: reference to cells
    // b:  the base value that was read
    // v:  the expected value for the cell CAS
    // m:  cells.length - 1, used as the index mask
    // a:  the cell hit by the current thread
    Cell[] as; long b, v; int m; Cell a;

    // Condition 1: (as = cells) != null
    //   true  -> cells is initialized; the current thread should write to its cell
    //   false -> cells is not initialized; all threads write to base
    // Condition 2: !casBase(b = base, b + x)
    //   false -> the CAS on base succeeded; nothing more to do
    //   true  -> contention on base; may need to retry or initialize cells
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        // We get here when either
        //   1. cells is already initialized, or
        //   2. the CAS on base failed because of contention.

        // true -> uncontended, false -> contended
        boolean uncontended = true;

        // Condition 1: as == null || (m = as.length - 1) < 0
        //   true  -> cells is not initialized
        //   false -> cells is initialized; the thread should find its own cell
        // Condition 2: (a = as[getProbe() & m]) == null
        //   getProbe() is the thread's hash; the length of cells is a power
        //   of 2, so m is a bit mask, e.g. 16 - 1 = 15 = 0b1111
        //   true  -> the thread's slot is empty; longAccumulate must create the cell
        //   false -> the slot holds a cell; next, try to add x to it
        // Condition 3: !(uncontended = a.cas(v = a.value, v + x))
        //   true  -> the CAS failed: the thread's cell is contended
        //   false -> the CAS succeeded
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            // longAccumulate is called when:
            //   1. cells is not initialized and threads contend on base [retry | initialize cells]
            //   2. the thread's slot is empty and the cell must be created
            //   3. the CAS on the cell failed [retry | expand]
            longAccumulate(x, null, uncontended);
    }
}

The conditions escalate step by step:

  • 1. At first, with no contention, only base is updated.
  • 2. If the CAS on base fails, the Cell[] array is created for the first time.
  • 3. If multiple threads contend heavily for the same Cell, the Cell[] array may need to be expanded.

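The longAccumulate parameters are:

  • x: the value to add
  • fn: the update function, or null for a plain add (LongAdder always passes null)
  • wasUncontended: false if a CAS on a cell already failed before the call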

longAccumulate is called in only three cases:

  • 1. cells is not initialized and several threads contend to write base [retry | initialize cells]
  • 2. The slot for the current thread is empty, so longAccumulate has to create the cell
  • 3. The CAS on the cell failed, meaning the current thread's cell is contended [retry | expand]
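
LongAdder always passes null as the second argument (fn) of longAccumulate, as the add() listing shows. To my knowledge the sibling class LongAccumulator, which also extends Striped64, is the caller that supplies a non-null function. A small sketch of it in use follows; the class name AccumulatorFnDemo is invented for the example.

```java
import java.util.concurrent.atomic.LongAccumulator;

final class AccumulatorFnDemo {
    /** Folds the values with Long::max instead of addition. */
    static long maxOf(long... values) {
        // The second constructor argument is the identity element of the function
        LongAccumulator max = new LongAccumulator(Long::max, Long.MIN_VALUE);
        for (long v : values)
            max.accumulate(v);
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println(maxOf(3, 9, 4));
    }
}
```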

In outline, longAccumulate first makes sure the current thread has a hash (probe) value and then enters a for (;;) spin loop. The loop has three branches:

  • CASE1: The Cell[] array has been initialized
  • CASE2: Cell[] array not initialized (first created)
  • CASE3: The Cell[] array is being initialized

Start with the branch for an uninitialized Cell[] array (first creation): the thread tries to take the lock and, if it succeeds, initializes the cells array.

// CASE2: precondition: cells is not initialized, so as is null
// Condition 1: cellsBusy == 0   true -> the lock is currently free
// Condition 2: cells == as      another thread may have changed cells after as was read
// Condition 3: casCellsBusy()   true  -> lock acquired, cellsBusy is now 1
//                               false -> another thread holds the lock
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
    boolean init = false;
    try {                           // Initialize table
        // Recheck cells == as under the lock, so that a table already
        // initialized by another thread is not overwritten (losing its data)
        if (cells == as) {
            Cell[] rs = new Cell[2];
            rs[h & 1] = new Cell(x);
            cells = rs;
            init = true;
        }
    } finally {
        cellsBusy = 0;
    }
    if (init)
        break;
}

Cell[] rs = new Cell[2] creates an array of length 2. rs[h & 1] = new Cell(x) creates a new Cell whose value is x (1 when called from increment()) and stores it at index h & 1. h & 1 works like the bucket index computation in HashMap, hash & (table.length - 1): with a length of 2, the mask is 1.
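
The masking trick only works because the table length is a power of two. The sketch below (PowerOfTwoIndex is a name made up here) compares the mask against Math.floorMod to show that both give the same slot, including for negative hash values.

```java
final class PowerOfTwoIndex {
    /** Slot index for a table whose length is a power of two. */
    static int index(int hash, int length) {
        return hash & (length - 1);
    }

    /** True when the mask agrees with a mathematical modulo. */
    static boolean matchesFloorMod(int hash, int length) {
        return index(hash, length) == Math.floorMod(hash, length);
    }

    public static void main(String[] args) {
        int[] hashes = { 270369, -7, 42, Integer.MIN_VALUE };
        for (int h : hashes)
            System.out.println(h + " -> " + index(h, 2) + " of 2, " + index(h, 16) + " of 16");
    }
}
```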

The final else branch is the fallback. A thread lands here when cells is still uninitialized but it could not take the lock, typically because another thread is initializing cells at that moment.

This branch operates directly on base: it tries to CAS the new value into base, and if that also fails the loop spins again.
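
The interplay of the lock, the recheck and the fallback to base can be sketched with ordinary atomics. This is a hypothetical model, not the JDK code: LazyInitSketch and its members are invented names, AtomicInteger stands in for cellsBusy, and unlike Striped64 the sketch fills both slots at initialization so that the "cells" branch never meets a null slot.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Simplified model of the initialization and fallback branches only. */
final class LazyInitSketch {
    private final AtomicInteger busy = new AtomicInteger(); // 0 = free, 1 = held
    private final AtomicLong base = new AtomicLong();
    private volatile AtomicLong[] cells;                    // null until initialized

    boolean tryLock() { return busy.get() == 0 && busy.compareAndSet(0, 1); }
    void unlock()     { busy.set(0); }

    /** Adds x and reports which branch handled it: "cells", "init" or "base". */
    String add(long x, int h) {
        for (;;) {
            AtomicLong[] as = cells;
            if (as != null) {                    // table exists: use the slot
                as[h & (as.length - 1)].addAndGet(x);
                return "cells";
            } else if (tryLock()) {              // table missing and lock acquired
                boolean init = false;
                try {
                    if (cells == as) {           // recheck under the lock
                        AtomicLong[] rs = { new AtomicLong(), new AtomicLong() };
                        rs[h & 1].set(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    unlock();
                }
                if (init)
                    return "init";
            } else {                             // someone else is initializing
                long v = base.get();
                if (base.compareAndSet(v, v + x))
                    return "base";
            }
        }
    }

    long sum() {
        long s = base.get();
        AtomicLong[] as = cells;
        if (as != null)
            for (AtomicLong c : as)
                s += c.get();
        return s;
    }
}
```

Holding the lock from outside with tryLock() before calling add() forces the fallback path, which makes the three outcomes easy to observe in a single thread.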

When the cells array is already initialized, the code is:

// CASE1: cells is initialized; the current thread should write to its cell
if ((as = cells) != null && (n = as.length) > 0) {
    // Reached from add() when the thread's slot was empty or the CAS on its cell failed

    // CASE 1.1: the slot for the current thread is null; a new cell must be created
    if ((a = as[(n - 1) & h]) == null) {

        // true -> the lock is free, false -> the lock is held
        if (cellsBusy == 0) {       // Try to attach new Cell

            // Create a Cell holding the current x
            Cell r = new Cell(x);   // Optimistically create

            // Condition 1: true -> the lock is free, false -> the lock is held
            // Condition 2: true -> the current thread acquired the lock, false -> it did not
            if (cellsBusy == 0 && casCellsBusy()) {
                // Whether the cell was installed
                boolean created = false;
                try {               // Recheck under lock
                    // rs: the current cells reference
                    // m:  the length of cells
                    // j:  the index hit by the current thread
                    Cell[] rs; int m, j;

                    // Conditions 1 and 2 always hold here.
                    // rs[j = (m - 1) & h] == null guards against another thread
                    // having filled the slot already; overwriting it would lose data
                    if ((rs = cells) != null &&
                        (m = rs.length) > 0 &&
                        rs[j = (m - 1) & h] == null) {
                        rs[j] = r;
                        created = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (created)
                    break;
                continue;           // Slot is now non-empty
            }
        }
        // Reset the expansion intent
        collide = false;
    }
    // CASE 1.2: wasUncontended is false only when cells was initialized and
    // the CAS on the current thread's cell failed in add()
    else if (!wasUncontended)       // CAS already known to fail
        wasUncontended = true;      // Continue after rehash
    // CASE 1.3: the thread has rehashed and the newly hit cell is not null
    //   true  -> the write succeeded, exit the loop
    //   false -> the cell hit after rehashing is contended too
    else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                 fn.applyAsLong(v, x))))
        break;
    // CASE 1.4:
    //   n >= NCPU     true -> the table is at its maximum size, do not expand
    //   cells != as   true -> another thread already expanded; rehash and retry
    else if (n >= NCPU || cells != as)
        collide = false;            // At max size or stale
    // CASE 1.5: record the expansion intent; the expansion itself may still not happen
    else if (!collide)
        collide = true;
    // CASE 1.6: the actual expansion
    //   cellsBusy == 0   true -> the lock is free and can be competed for
    //   casCellsBusy()   true  -> lock acquired, run the expansion
    //                    false -> another thread is expanding right now
    else if (cellsBusy == 0 && casCellsBusy()) {
        try {
            if (cells == as) {      // Expand table unless stale
                Cell[] rs = new Cell[n << 1];
                for (int i = 0; i < n; ++i)
                    rs[i] = as[i];
                cells = rs;
            }
        } finally {
            // Release the lock
            cellsBusy = 0;
        }
        collide = false;
        continue;                   // Retry with expanded table
    }

    // Rehash the current thread's hash value
    h = advanceProbe(h);
}

When the cells array is already initialized:

  • 1. If the slot for the current thread's hash is null, create a Cell holding x, take the cellsBusy lock with a CAS, and store the Cell in that slot of the cells array.
// CASE 1.1: the slot for the current thread is null; a new cell must be created
if ((a = as[(n - 1) & h]) == null) {

    // true -> the lock is free, false -> the lock is held
    if (cellsBusy == 0) {       // Try to attach new Cell

        // Create a Cell holding the current x
        Cell r = new Cell(x);   // Optimistically create

        // Condition 1: true -> the lock is free, false -> the lock is held
        // Condition 2: true -> the current thread acquired the lock, false -> it did not
        if (cellsBusy == 0 && casCellsBusy()) {
            // Whether the cell was installed
            boolean created = false;
            try {               // Recheck under lock
                // rs: the current cells reference
                // m:  the length of cells
                // j:  the index hit by the current thread
                Cell[] rs; int m, j;

                // Conditions 1 and 2 always hold here.
                // rs[j = (m - 1) & h] == null guards against another thread
                // having filled the slot already; overwriting it would lose data
                if ((rs = cells) != null &&
                    (m = rs.length) > 0 &&
                    rs[j = (m - 1) & h] == null) {
                    rs[j] = r;
                    created = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (created)
                break;
            continue;           // Slot is now non-empty
        }
    }
    // Reset the expansion intent
    collide = false;
}
  • 2. If the slot for the current thread is not null and the CAS in add() already failed, there is contention. wasUncontended is set to true, and then h = advanceProbe(h) at the bottom of the loop resets the current thread's hash value.
// wasUncontended is false only when cells was initialized and
// the CAS on the current thread's cell failed in add()
else if (!wasUncontended)       // CAS already known to fail
    wasUncontended = true;      // Continue after rehash

// Rehash the current thread's hash value
h = advanceProbe(h);
  • 3. After the hash value has been reset, look at the slot it now maps to. If the slot is not null, try to CAS the new value into it; if the CAS succeeds, nothing more needs to be done.
else if (a.cas(v = a.value, ((fn == null) ? v + x :
                             fn.applyAsLong(v, x))))
    break;
  • 4. If the CAS in step 3 fails and the array can still grow, the expansion intent collide is set to true; the thread then rehashes and tries again.
else if (!collide)
    collide = true;
  • 5. If the CAS on the slot fails again while collide is already true, the thread takes the lock and doubles the size of the cells array.
else if (cellsBusy == 0 && casCellsBusy()) {
    try {
        if (cells == as) {      // Expand table unless stale
            // Double the capacity
            Cell[] rs = new Cell[n << 1];
            // Copy the existing cells into the expanded array
            for (int i = 0; i < n; ++i)
                rs[i] = as[i];
            cells = rs;
        }
    } finally {
        // Release the lock
        cellsBusy = 0;
    }
    collide = false;
    continue;                   // Retry with expanded table
}
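
One detail of the expansion is worth seeing in isolation: the loop copies references, so the existing Cell objects and the counts they hold survive the resize, while the new upper half stays null until a thread needs it. The sketch below is an illustration under assumptions: ExpandSketch and its methods are invented names, and AtomicLong plays the role of Cell.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;

final class ExpandSketch {
    /** Doubles the table; existing cells are carried over by reference. */
    static AtomicLong[] expand(AtomicLong[] as) {
        return Arrays.copyOf(as, as.length << 1);
    }

    static long sum(AtomicLong[] cells) {
        long s = 0;
        for (AtomicLong c : cells)
            if (c != null)
                s += c.get();
        return s;
    }

    /** Returns {new length, sum after expansion, 1 if cell 0 is the same object}. */
    static long[] demo() {
        AtomicLong[] before = { new AtomicLong(3), new AtomicLong(4) };
        AtomicLong[] after = expand(before);
        return new long[] { after.length, sum(after), after[0] == before[0] ? 1 : 0 };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo()));
    }
}
```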

The complete code is as follows:

// longAccumulate is called when:
//   1. cells is not initialized and threads contend on base [retry | initialize cells]
//   2. the slot for the current thread is empty and the cell must be created
//   3. the CAS on the cell failed, i.e. the thread's cell is contended [retry | expand]
//
// wasUncontended is false only when cells was initialized and
// the CAS on the current thread's cell failed in add()
final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    // h: the hash value of the current thread
    int h;
    // True when no hash value has been assigned to the current thread yet
    if ((h = getProbe()) == 0) {
        // Assign a hash value to the current thread
        ThreadLocalRandom.current(); // force initialization
        // Read the newly assigned hash value into h
        h = getProbe();
        // Why? A thread whose probe was still 0 always hit cells[0],
        // so the earlier failure is not treated as real contention
        wasUncontended = true;
    }

    // Expansion intent: false -> do not expand, true -> may expand
    boolean collide = false;                // True if last slot nonempty

    // Spin
    for (;;) {
        // as: the cells reference
        // a:  the cell hit by the current thread
        // n:  the length of the cells array
        // v:  the expected value
        Cell[] as; Cell a; int n; long v;

        // CASE1: cells is initialized; the current thread should write to its cell
        if ((as = cells) != null && (n = as.length) > 0) {
            // Reached from add() when the thread's slot was empty or the CAS on its cell failed

            // CASE 1.1: the slot for the current thread is null; a new cell must be created
            if ((a = as[(n - 1) & h]) == null) {

                // true -> the lock is free, false -> the lock is held
                if (cellsBusy == 0) {       // Try to attach new Cell

                    // Create a Cell holding the current x
                    Cell r = new Cell(x);   // Optimistically create

                    // Condition 1: true -> the lock is free, false -> the lock is held
                    // Condition 2: true -> the current thread acquired the lock, false -> it did not
                    if (cellsBusy == 0 && casCellsBusy()) {
                        // Whether the cell was installed
                        boolean created = false;
                        try {               // Recheck under lock
                            // rs: the current cells reference
                            // m:  the length of cells
                            // j:  the index hit by the current thread
                            Cell[] rs; int m, j;

                            // Conditions 1 and 2 always hold here.
                            // rs[j = (m - 1) & h] == null guards against another thread
                            // having filled the slot already; overwriting it would lose data
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                // Reset the expansion intent
                collide = false;
            }
            // CASE 1.2: wasUncontended is false only when cells was initialized and
            // the CAS on the current thread's cell failed in add()
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            // CASE 1.3: the thread has rehashed and the newly hit cell is not null
            //   true  -> the write succeeded, exit the loop
            //   false -> the cell hit after rehashing is contended too
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            // CASE 1.4:
            //   n >= NCPU     true -> the table is at its maximum size, do not expand
            //   cells != as   true -> another thread already expanded; rehash and retry
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            // CASE 1.5: record the expansion intent; the expansion itself may still not happen
            else if (!collide)
                collide = true;
            // CASE 1.6: the actual expansion
            //   cellsBusy == 0   true -> the lock is free and can be competed for
            //   casCellsBusy()   true  -> lock acquired, run the expansion
            //                    false -> another thread is expanding right now
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {      // Expand table unless stale
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    // Release the lock
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }

            // Rehash the current thread's hash value
            h = advanceProbe(h);
        }
        // CASE2: precondition: cells is not initialized, so as is null
        // Condition 1: cellsBusy == 0   true -> the lock is currently free
        // Condition 2: cells == as      another thread may have changed cells after as was read
        // Condition 3: casCellsBusy()   true  -> lock acquired, cellsBusy is now 1
        //                               false -> another thread holds the lock
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           // Initialize table
                // Recheck cells == as under the lock, so that a table already
                // initialized by another thread is not overwritten (losing its data)
                if (cells == as) {
                    Cell[] rs = new Cell[2];
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        // CASE3:
        //   1. cellsBusy is held: another thread is initializing cells,
        //      so the current thread adds its value to base
        //   2. cells was initialized by another thread in the meantime;
        //      the current thread still tries base once before looping
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}

The general steps are as follows:

A small summary

AtomicLong principle: CAS + spin

Scenario:

  • Global counting under low concurrency
  • AtomicLong keeps the count accurate under concurrency; it relies on CAS internally for thread safety.

Drawback: performance drops sharply under high concurrency, where the spin becomes the bottleneck. When N threads CAS the same value, only one succeeds in each round; the other N-1 fail and keep spinning until they succeed, and the many failed spins drive CPU usage up.
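
The retry behaviour can be made visible with a hand-written CAS loop. The sketch below is an illustration rather than AtomicLong's actual implementation: CasRetryDemo is a made-up name, and the number of failed attempts depends on the machine and the scheduler, so only the final count is deterministic.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

final class CasRetryDemo {
    /** Returns {final count, number of failed CAS attempts}. */
    static long[] run(int threads, int perThread) {
        AtomicLong value = new AtomicLong();
        LongAdder failures = new LongAdder(); // counting failures must not add contention
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    for (;;) {                // spin until this increment lands
                        long v = value.get();
                        if (value.compareAndSet(v, v + 1))
                            break;
                        failures.increment(); // another thread won this round
                    }
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return new long[] { value.get(), failures.sum() };
    }

    public static void main(String[] args) {
        long[] r = run(8, 200_000);
        System.out.println("count=" + r[0] + ", failed CAS attempts=" + r[1]);
    }
}
```

No increment is ever lost, but every failed attempt is work that had to be repeated, and the number of such repeats tends to grow with the number of competing threads.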

LongAdder principle:

  • CAS + base + a Cell array to spread out the updates
  • Trades space for time by dispersing the hot data

Scenario: Global counting under high concurrency

Drawback: sum() is not an atomic snapshot. If other threads keep updating while sum() is running, the returned value may not be exact.
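
A small experiment shows the difference between a sum() taken while writers are running and one taken after they have finished. SumSnapshotDemo is a name invented for this sketch; the first value it returns varies from run to run, while the second is always the full total.

```java
import java.util.concurrent.atomic.LongAdder;

final class SumSnapshotDemo {
    /** Returns {sum() read while writers may still run, sum() after all writers joined}. */
    static long[] run(int threads, int perThread) {
        LongAdder adder = new LongAdder();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++)
                    adder.increment();
            });
            workers[t].start();
        }
        long during = adder.sum();   // a moving target: some updates are not counted yet
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        long after = adder.sum();    // exact: no writer is active any more
        return new long[] { during, after };
    }

    public static void main(String[] args) {
        long[] r = run(8, 100_000);
        System.out.println("during=" + r[0] + ", after=" + r[1]);
    }
}
```

For a statistics counter that is read occasionally this is usually acceptable; when a read must reflect one exact point in time, the writers have to be quiesced first or a different primitive is needed.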