preface

Most developers have seen or written concurrent programming code at some point. In addition to Synchronized (which you should know about upgrading Synchronized locks), there is another branch of the concurrency keyword, Atomic. If you have not heard of the use of the first look at the base, if you have heard of the use, please slide to the bottom to see the advanced, in-depth source analysis.

Ask the question: Are int threads safe?

Those of you who have read articles about Synchronized should know that it is not safe to use the same code again:

public class testInt {
    static int number = 0;

    public static void main(String[] args) throws Exception {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                for(int i = 0; i < 100000; i++) { number = number+1; }}}; Thread t1 = new Thread(runnable); t1.start(); Thread t2 = new Thread(runnable); t2.start(); t1.join(); t2.join(); System.out.println("number:"+ number); }}Copy the code

Running results:



In the example above, we define a static variable number, and the initial value of 0 to newly built and run two threads to the implementation of 100000 times since the operation, if he is thread-safe, should be two threads execute after the results of 200000, but we found that the end result is less than 200000, which shows that he is not safe.

As mentioned in the Synchronized article, you can add the Synchronized keyword above and below number=number+1 to ensure thread-safety. However, it is resource expensive, so today we will take a look at another way to implement thread-safety, Atomic.

The dividing line of the Atomic base

Atomic integer (base type)

Overall introduction

Atomic is JDK offers a series of the floorboard of the package, the large family including Atomic integer (AtomicInteger AtomicLong, AtomicBoolean), Atomic reference (AtomicReference AtomicStampedReference, AtomicMarkableReference), Atomic array (AtomicIntegerArray AtomicLongArray, AtomicReferenceArray), Updater (AtomicIntegerFieldUpdater, AtomicLongFieldUpdater AtomicReferenceFieldUpdater).



AtomicInteger

AtomicInteger, AtomicBoolean, and AtomicLong have similar functions, so WE’ll use AtomicInteger as the main atom class.

Let’s take a look at the apis and what they do:

public class testInt {public static void main(String[] args) {// Define a variable of type AtomicInteger. The value is 1 AtomicInteger I = new AtomicInteger(1); System.out.println(i.incrementAndGet())); //incrementAndGet () incrementAndGet(); System.out.println(i.getandIncrement ())); //getAndIncrement (); System.out.println(i.git ())); // Get returns the current value of I, so print 3. System.out.println(i.getandAdd (666))); // getAndAdd(666); System.out.println(i.getandAdd (-1))); // If the argument is negative, subtract. GetAndAdd returns the value and then subtract 1. System.out.println(i.addandget (666))); System.out.println(i.addandget (-1))); // The getAndUpdate method IntUnaryOperator argument is an arrow function that can be followed by any operation, Println (i.getandUpdate (x -> (x * 10 + 1))); // The final print I is 13331 system.out.println (i.git ()); }}Copy the code

Execution Result:



An improvement on the int example above

public class testInt {//1. Define AtomicInteger number static AtomicInteger number = new AtomicInteger(0); public static void main(String[] args) throws Exception { Runnable runnable = newRunnable() {
            @Override
            public void run() {
                for(int i = 0; i < 100000; IncrementAndGet (); // incrementAndGet(); }}}; Thread t1 = new Thread(runnable); t1.start(); Thread t2 = new Thread(runnable); t2.start(); t1.join(); t2.join(); System.out.println("number:"+ number.get()); }}Copy the code

We can see that the run result is correct 200,000, indicating that AtomicInteger does guarantee thread-safety, that is, in the process of multi-threading, the run result is correct. But there is an ABA problem, which we’ll talk about when we refer to atoms, so let’s set a flag.



Source code analysis

Looking at the underlying implementation, the incrementAndGet method in the AtomicInteger class calls the getAndAddInt method in the Unsafe class.

public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}Copy the code

So if we look at getAndAddInt, there’s a loop in there, and the immediate value is compareAndSwapInt and the return value is true to end the loop. This is where CAS comes in, and this is the solution to the multithreaded security problem.

public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(! this.compareAndSwapInt(var1, var2, var5, var5 + var4));return var5;
}Copy the code

CAS

Thread 1 and thread 2 get the main memory variable value 0, thread 1 increments 1 and writes to main memory, thread 2 increments 2 and tries to write to main memory, thread 1 cannot write to main memory because it overwrites thread 1’s operations.



CAS is in thread 2 attempts to write memory, through comparison and set the CompareAndSet () found that now the main memory is the current value is 1, and he began to read the value of 0, so he will give up this change, to read the latest value of main memory, and then retry the logic operation, thread 2 under try again to write in the memory. If thread 1 makes changes to main memory again, thread 2 finds that the value of main memory is now different from what it expected, so it aborts the change, reads the latest value of main memory again, and tries again to write to main memory. We can see that this is a repeat comparison process, that is, the main memory will not be written until it is the same as the expected initial value, otherwise the retry cycle will be read. That’s what the for loop above means.

CAS is implemented using CPU instructions. If the operating system does not support CAS, it will still be locked. If the operating system supports CAS, it will use atomic CPU instructions.

Atomic reference

In daily use, we need to atomize not only the basic types mentioned above, but also some complex types, so we need AtomicReference.

Unsafe implementation

First look at the insecure BigDecimal type:

public class testReference {
    static BigDecimal number = BigDecimal.ZERO;

    public static void main(String[] args) throws Exception {

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                for(int i = 0; i < 1000; i++) { number=number.add(BigDecimal.ONE); }}}; Thread t1 = new Thread(runnable); t1.start(); Thread t2 = new Thread(runnable); t2.start(); t1.join(); t2.join(); System.out.println(number); }}Copy the code

The result is shown in the following figure. We can see that the two threads loop 1000 times incrementing by 1. The final result should be 2000, but the result is less than 2000.



Security implementation – Using CAS

public class test// Define AtomicReference type BigDecimal variable static AtomicReference<BigDecimal> number = new AtomicReference<BigDecimal>(BigDecimal.ZERO); public static void main(String[] args) throws Exception { Runnable runnable = newRunnable() {
            @Override
            public void run() {
                for(int i = 0; i < 1000; I++) {// manually write loop +CAS judgmentwhile(true){
                        BigDecimal pre=number.get();
                        BigDecimal next=number.get().add(BigDecimal.ONE);
                      if(number.compareAndSet(pre,next))  {
                          break; }}}}}; Thread t1 = new Thread(runnable); t1.start(); Thread t2 = new Thread(runnable); t2.start(); t1.join(); t2.join(); System.out.println(number.get()); }}Copy the code

The running results are as follows:



ABA problems and solutions

In the above CAS process, whether the update can be successful or not is determined by comparing the values. If thread 1 first adds 1 and then subtracts 1, then the main memory is still the original value, that is, thread 2 can still update successfully. However, this logic is wrong, thread 1 has been modified, thread 2 can not directly update success.



Code:

public class testInt {

    static AtomicInteger number = new AtomicInteger(0);

    public static void main(String[] args) throws Exception {


        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                int a = number.get();
                System.out.println("Start number."+ a); try { Thread.sleep(5000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(number.compareAndSet(a, a++)); }}); t1.start(); Thread t2 = new Thread(newRunnable() {
            @Override
            public void run() {
                System.out.println("Start adding operations");
                int a = number.incrementAndGet();
                System.out.println("The current number." + a);
                int b = number.decrementAndGet();
                System.out.println("The current number."+ b); }}); t2.start(); t1.join(); t2.join(); }}Copy the code

Let’s see thread 2 does a bunch of things to it, but at the end of the day it prints true, indicating that it can be updated successfully. This is clearly not true.



You can add a version number to your AtomicStampedReference. Thread 1 at the beginning of the main memory of the time, access to the value is 0, version 1, thread 2 also get the two values, thread 1 plus 1, the operation of the minus 1, each plus 1 version, now of the main memory of the value is 0, version 2, and thread 2 also took the estimated value is 0, version 1 try writing data of main memory, The update failed at this time due to a different version. Specifically, we use code to try:

public class testInt {

    static AtomicStampedReference<Integer> number = new AtomicStampedReference<Integer>(0, 0);

    public static void main(String[] args) throws Exception {


        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                int a = number.getReference();
                int s = number.getStamp();
                System.out.println("Start number." + a + ",stamp:"+ s); try { Thread.sleep(5000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(number.compareAndSet(a, a + 1, s, s + 1)); }}); t1.start(); Thread t2 = new Thread(newRunnable() {
            @Override
            public void run() {
                System.out.println("Start adding operations");
                int a = number.getReference();
                int s = number.getStamp();
                number.compareAndSet(a, a + 1, s, s + 1);
                System.out.println("The current number." + a + ",stamp:" + (s + 1));
                a = number.getReference();
                s = number.getStamp();
                number.compareAndSet(a, a - 1, s, s + 1);
                System.out.println("The current number." + a + ",stamp:"+ (s+1)); }}); t2.start(); t1.join(); t2.join(); }}Copy the code

We can see that stamp (version number) will be updated every time. In the final comparison, not only the value but also the version number will be compared, so the update cannot be successful, false.



An array of atoms

AtomicIntegerArray, AtomicLongArray, AtomicReferenceArray are similar, so if I take AtomicIntegerArray for example, We can think of the following AtomicIntegerArray as an array of type AtomicInteger.

AtomicIntegerArray  array = new AtomicIntegerArray(10); array.getAndIncrement(0); // Increments the 0th element atomically by 1Copy the code

AtomicInteger[] array = new AtomicInteger[10]; array[0].getAndIncrement(); // Increments the 0th element atomically by 1Copy the code

The field updater and atom accumulator are relatively simple and will not be mentioned here.

Atomic advances the division of the text

LongAdder source code analysis

LongAdder use

LongAdder was added after JDk1.8, so why was it added? We’re going to answer that question, and we’re going to look at how to use it.

public class testLongAdder {
    public static void main(String[] args) throws Exception {
        LongAdder number = new LongAdder();
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                for(int j = 0; j < 10000; j++) { number.add(1L); }}}; Thread t1 = new Thread(runnable); Thread t2 = new Thread(runnable); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println("number:"+ number); }}Copy the code

We can see that LongAdder is used in much the same way as AtomicLong, with two threads Thread1,Thread2 incrementing the number value 10,000 times each, resulting in the correct number of 20,000.



Comparative advantage over Atomic

The question is, why LongAdder if AtomicLong can perform thread-safe operations on number in multiple threads? Let’s start with a code comparison, the difference in performance between the two, given that both results are correct.

 public class testLongAdder {public static void main(String[] args) {// /1 thread, 1 million autoincrement operationtest1 (1100000); //10 threads, 1 million increment operationstest1 (10100000); //100 threads, 1 million increment operationstest1 (100100000); } static voidtest1(int threadCount,int times){
        long startTime=System.currentTimeMillis();
        AtomicLong number1=new AtomicLong();
        List<Thread> threads1=new ArrayList<>();
        for(int i=0; i<threadCount; i++) { threads1.add(new Thread(newRunnable() {
                @Override
                public void run() {
                    for (int j = 0; j < times; j++) { number1.incrementAndGet(); }}})); } threads1.forEach(thread -> thread.start()); threads1.forEach(thread ->{ try { thread.join(); } catch (InterruptedException e) { e.printStackTrace(); }}); long endTime=System.currentTimeMillis(); System.out.println("AtomicLong:"+number1+",time:"+(endTime-startTime));

        LongAdder number2=new LongAdder();
        List<Thread> threads2=new ArrayList<>();
        for(int i=0; i<threadCount; i++) { threads2.add(new Thread(newRunnable() {
                @Override
                public void run() {
                    for (int j = 0; j < times; j++) { number2.add(1); }}})); } threads2.forEach(thread -> thread.start()); threads2.forEach(thread ->{ try { thread.join(); } catch (InterruptedException e) { e.printStackTrace(); }}); System.out.println("LongAdder:"+number2+",time:"+(System.currentTimeMillis()-endTime)); }}Copy the code

The code above compares the time taken by AtomicLong and LongAdder for 1 thread, 10 thread, and 100 thread after 100 hundred increment operations. By printing the statement, we see that LongAdder takes an order of magnitude less time than AtomicLong on the basis that the final number1 and number2 are correct.



Source code analysis

So why does this happen? We have to analyze it from the source level. Why is AtomicLong inefficient? Because if you have a large number of threads, especially if you have high concurrency, like 100 threads trying to manipulate an object at the same time, only one thread is going to get the lock, and the other 99 threads are going to be idling around until the thread releases the lock. If the thread finishes and releases the lock, the other 99 threads compete again, with only one thread acquiring the lock and the other 98 threads idling until the lock is released. This can waste a lot of resources on idling, making the AtomicLong slower and slower as the number of threads increases.

AtomicLong is when multiple threads operate on the same value, resulting in too many spins and poor performance. LongAdder, like AtomicLong, operates on the same base in the case of no competition. When competition occurs, it divides the whole into parts, replacing time from space, using an array of cells, and splitting a value into this array of cells. When multiple threads need to operate on a value at the same time, they can hash the thread ID to obtain a hash value, map the hash value to a subscript of the array cells, and increment the value corresponding to the subscript. When all threads are finished, all the values of the array cells and the uncontested value base are added up as the final result.



We first look at the fields in LongAdder and find that there are no fields in LongAdder, mainly in its inherited Stripped64 class, there are the following four main variables.

*/ static final int NCPU = Runtime.getruntime ().availableProcessors(); /** *cells array, power of 2, 2,4,8,16..... */ TRANSIENT volatile Cell[] cells; /** * base value, mainly used in the case of no contention, updated via CAS. */ transient volatile long base; /** * Resize cells (capacity expansion) to create cell locks. */ transient volatile int cellsBusy;Copy the code

Here’s the add method to start.

Public void add(long x) {//as: cells reference //b: base value //v: expected value //m: cells size //a: Cell hit in the current array [] as; long b, v; int m; Cell a; // As is not empty (cells have been initialized, indicating that another thread initialized it before) or CAS operation failed (contention between threads)if((as = cells) ! = null || ! CasBase (b = base, b + x)) {trueBoolean uncontended = indicates that there are two cases that are not contendedtrue; // As = null(cells not initialized) // Or the thread ID hash subscript is null(cell = null), getProbe() &m function is to obtain the subscript, the underlying logic is bit-operation // or update failure isfalseWhen there is competition, take not to be trueif(as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || ! (uncontended = a.value (v = a.value, v + x))ifInside, the instructions are updatedcaseFailed, or updating a cell also failed, or cell is empty, or cells are empty longAccumulate(x, null, uncontended); }}Copy the code

The longAccumulate method of Stripped64 is called from LongAdder, mainly to initialize cells, expand cells, and multiple threads hit a competing operation of a cell at the same time.

Final void longAccumulate(long x, LongBinaryOperator fn, Boolean wasUncontended) {// the value of x: add,fn: null,wasUncontended: Whether there is competition,trueIn order to compete,falseInt h; / / threadhashValue // If the thread is 0, it is the first time it has entered, so ThreadLocalRandom forces the initialization of the thread ID, and then values ithash
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); 
            h = getProbe();
            wasUncontended = true; } // Capacity expansion intention, isfalseDefinitely not expand, fortruePossible expansion Boolean Collide =false; / / death cyclefor(;;) {//as: reference to the array of cells // A :cell hit by the current thread //n: length of cells //v: value of the cell hit by the current thread cell [] as; Cell a; int n; long v; / / cells isn't emptyif((as = cells) ! = null && (n = as.length) > 0) {// The current thread hit the cell is empty, the following logic is new cellif ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if((rs = cells) ! = null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created =true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false; } // Contention occurselse if(! wasUncontended) // CAS already known to fail wasUncontended =true;      // Continue after rehash// No contention, attempt to modify the cell value corresponding to the current thread, successfully exit the loopelse if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break; // If n is greater than the maximum number of cpus, the capacity cannot be expandedelse if(n >= NCPU || cells ! = as) collide =false;            // At max size or stale
                else if(! collide) collide =true; // Get the lock and expand it to a power of 2.else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if(cells == as) { // Expand table unless stale Cell[] rs = new Cell[n << 1]; // Move the operator one bit to the left to double the quantityfor (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue; // Retry with expanded table } h = advanceProbe(h); } //cells = null, lock is acquired, initialization begins, lock is released, and the loop continueselse if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }Copy the code

conclusion

It’s over. Flowers. AtomicInteger, AtomicLong, AtomicBoolean, AtomicReference AtomicStampedReference), and after 1.8 LongAdder advantage, source code analysis. The process also interspersed with some CAS, ABA problem introduction and solution.



If you feel that writing is also good, trouble to give a praise 👍, your recognition is the power of my writing!

If you think something is wrong, please feel free to comment.

All right, bye.

And a focus on



The resources

Java advanced multithreading (xvii) — J.U.C atomic framework: LongAdder

The CAS theory

Java 8 Performance Improvements: LongAdder vs AtomicLong

AtomicInteger goes deep

Atomic operation class AtomicInteger explained in detail

Play with Java concurrency tools, proficient in JUC, become a concurrency generalist