This article only for atomic accumulator addition API analysis, is the so-called ten thousand changes from its ancestor, the author yesterday after watching the teaching video, suddenly have insight, so at the moment write the next LongAddr source code analysis article, for code community seniors to read together.

Why was LongAddr introduced

inJUCWrap it up, masterDoug LeaHas been inventedAtomic classes.AtomicLongNature can also be used to documulativeSo why introduce itLongAddr? Let’s take a quick lookAtomicLongThe concrete implementation of.Obviously,AtomicLongUsing theUnsafeThe CAS operation under the package to implement the replacement, however,CASIn the case of multi-threading competition is not very efficient, a large number of threads occupy CPU resources will occur idle. Therefore, LongAddr was introduced to improve efficiency

The core of LongAddr

LongAdder maintains a delayed-initialization array of atomic updates (the Cell array is null by default) and a base variable base. Because Cells are relatively large in memory, they are not created initially, but when needed, which is lazy loading.

Cell

To understand LongAddr, you need to understand its field, Cell first

@sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try{ UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<? > ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw newError(e); }}}Copy the code

The @Contended annotation is used to prevent pseudo-cache line sharing, where the length of a cache line is 64 bytes and the size of a Cell is 24 bytes (object header 16 bytes + property-value-8 bytes). Cells also essentially use CAS operations under the Unsafe package to perform thread-safe operations (such as ++ operations)

NCPU

CASOperation is essentially a kind ofSpin blind cycle.Spin blind cycleIs going to consume CPU resources, andavailableProcessors()This method will get itJVMNumber of available cores.

Cells

The Cell array, this isLongAddrKey to improving performance

base

This value is updated when there is no contest

cellsBusy

Initialize theorcapacityIndicates the CAS flag bit of

The increment LongAddr

add

public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        if((as = cells) ! =null| |! casBase(b = base, b + x)) {boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null| |! (uncontended = a.cas(v = a.value, v + x))) longAccumulate(x,null, uncontended); }}Copy the code

To figure out what a method does, we need to think about it in terms of deconstruction, so in this add method, it has five temporary variables and two member variables (the parent of LongAddr).

First the if

I have to say that the logic in the first IF is very clever, and there is a lot of logic in just one line of code, which is worth looking at.

  • As = cells! = null || ! casBase(b = base,b+x)

Cells require a large amount of memory and are lazily loaded, so on the first accumulation, as! =null must be false, but it does not mean that the subsequent CAS operation will succeed. If A thread A and B are lazy, cells must be null, and then the cas operation will be executed successfully in the second half of the if statement. The one that doesn’t succeed then moves on to the next stage of judgment

  • as == null || (m = as.length – 1)
  • a = as[getProbe() & m]) == null

These two lines determine which Cell element in the cells array should be accessed by the current thread, and if the mapped Cell element exists, the code is executed

  • ! (uncontended = a.cas(v = a.value, v + x))

This line of code clearly uses the CAS operation to update the value of the Cell element in the cells array corresponding to the current thread. If the current thread mapped Cell element does not exist, or the CAS operation fails, this line of code is executed

  • longAccumulate(x, null, uncontended);

Before we look at the longAccumulate method, we can summarize what these temporary variables do in the add method. As represents the array of cells, m->cells of length -1, b->base, A ->Cell (the Cell element corresponding to the current thread in the cells array) V -> The value of the Cell element corresponding to the current thread in the cells array. In addition, which Cell element in the current thread accesses is determined by getProbe() &m. We already know that m represents the length of the array -1, so getProbe gets the value of threadLocalRandomProbe in the current thread, which starts with 0. In the longAccumuulate method initialization takes place and the current thread ensures atomicity of value updates through CAS operations assigned to Cell elements.

longAccumulate

This method is at the heart of LongAddr, and the logic is extremely complex

 final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        int h;
        // Initialize the Probe value of the current thread
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        //
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            if((as = cells) ! =null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if((rs = cells) ! =null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true; }}finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if(! wasUncontended)// CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                else if (a.cas(v = a.value, ((fn == null)? v + x : fn.applyAsLong(v, x))))break;
                else if(n >= NCPU || cells ! = as) collide =false;            // At max size or stale
                else if(! collide) collide =true;
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; }}finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = advanceProbe(h);
            }
            // Initialize the cells array
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true; }}finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (casBase(v = base, ((fn == null)? v + x : fn.applyAsLong(v, x))))break;                          // Fall back on using base}}Copy the code

Because the code is extremely complex, the author only can try their best to code logic says it smooth, but her mind to do less than perfect, so think of the author on the analysis of the code, must be in line with the logic of doubt and criticism to read, because otherwise the author’s negligence caused some misleading results, it is not beautiful. On the other hand, if you find something unbeautiful, you are welcome to point it out.

Initialization of the Cells array

 else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true; }}finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
Copy the code

The casCellsBusy method, which uses cas to set cellBusy to 1 to indicate that cells are being initialized or expanded. Of course, cells are being initialized in the code above

Init flag, set to false to indicate that cells have not been initialized. After initialization, set cellsBusy to 0, which can be regarded as an unlock operation.

Add a cell element

  if((as = cells) ! =null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if((rs = cells) ! =null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true; }}finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if(! wasUncontended)// CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                else if (a.cas(v = a.value, ((fn == null)? v + x : fn.applyAsLong(v, x))))break;
                else if(n >= NCPU || cells ! = as) collide =false;            // At max size or stale
                else if(! collide) collide =true;
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; }}finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = advanceProbe(h);
            }
Copy the code

The context of this code should be: if probe calculated by a thread is null in the array of cells, or probe is larger than the array length of current cells, add Cell element to the array of cells. This operation needs to be locked, cas optimistic lock.

 else if (a.cas(v = a.value, ((fn == null)? v + x : fn.applyAsLong(v, x))))break;
Copy the code

This code uses the CAS operation to replace the value of the Cell element corresponding to the current thread, and exits the loop if successful.

  else if(n >= NCPU || cells ! = as) collide =false;            // At max size or stale
Copy the code

As mentioned earlier, the optimal performance of cas operations is achieved when the number of threads performing cas operations is equal to the number of cpus, so the length of the cells array depends on the number of cpus available (specifically, the number of cores available to the JVM).

 else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; }}finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
Copy the code

This code is performing capacity expansion, which requires the SAME CAS lock as initialization and addition of Cell elements.

 h = advanceProbe(h);
Copy the code

This is a recalculation of the hash value, which occurs when the current thread evaluates the offset of the corresponding Cell in the cells array in the event of a race, looking for a free Cell