A few days ago, brush moments of friends, see a paragraph: if NOW I am XX, so I now no matter how hard, also XX, because I now XX is decided by the past, now hard, is to let the future of their own no longer XX. Rough reason is not rough, if the delusion now try hard, immediately no longer XX, it is impossible, need to accumulate, need precipitation, can slowly no longer XX.
All right, finish your chicken soup.
Today our content is CAS and atomic operation class application and source analysis, will also use CAS to complete a singleton pattern, but also involves pseudo-sharing. Because CAS is the cornerstone of the concurrency framework, it is quite important, and this blog post is a long one, so be prepared.
Speaking of CAS, we have to mention two technical terms: pessimistic lock and optimistic lock. Let’s look at what a pessimistic lock is and what an optimistic lock is.
Pessimistic lock, optimistic lock
The first time you see the pessimistic lock, the optimistic lock, is when you’re dealing with an interview, reading an interview question. Here is an example of how to prevent multiple threads from modifying the same record in a database.
Pessimistic locking
If you are using a mysql database, use the for update keyword + transactions. The effect is that when thread A reaches for update, it locks the specified record, and then thread B comes and waits. When thread A finishes modifying the data, it commits the transaction, the lock is released, and thread B can finally resume its work. Pessimistic locks tend to be mutually exclusive: only I can come in, and everyone else waits. Doing so has a significant impact on performance.
Optimistic locking
Add a version number field in the data table: version. This field does not need manual maintenance by programmers, but is actively maintained by the database. Every time data is modified, version will be changed.
When version is now 1:
- Thread A comes in and reads version is 1.
- Thread B comes in and reads version is 1.
- Update stu set name=’ codeBear ‘where id=1 and version=1 Success. The database voluntarily changed version to 2.
- Update stu set name=’hello’ where id=1 and version=1 Failure. Because the version field is no longer 1.
Optimistic lock is not actually a lock, it has no concept of lock.
In Java, there is also the concept of pessimistic lock, optimistic lock, the typical representative of pessimistic lock is Synchronized, and the typical representative of optimistic lock is today to talk about CAS. Before we talk about CAS, let’s talk about the atomic manipulation class, because CAS is the cornerstone of atomic manipulation class, we should first see the power of atomic manipulation class, so as to generate interest in CAS.
Application of atomic operation classes
Let’s first look at the use of the atomic operation class. Many atomic operation classes are provided in Java, such as AtomicInteger, which has a self-increment method.
public class Main {
public static void main(String[] args) {
Thread[] threads = new Thread[20];
AtomicInteger atomicInteger = new AtomicInteger();
for (int i = 0; i < 20; i++) {
threads[i] = new Thread(() -> {
for(int j = 0; j < 1000; j++) { atomicInteger.incrementAndGet(); }}); threads[i].start(); } join(threads); System.out.println("x=" + atomicInteger.get());
}
private static void join(Thread[] threads) {
for(int i = 0; i < 20; i++) { try { threads[i].join(); } catch (InterruptedException e) { e.printStackTrace(); }}}}Copy the code
Running results:
This is where the magic of the atomic manipulation class comes in. In high-concurrency situations, this approach has an advantage over Synchronized, since the Synchronized keyword serializes code and loses its multithreading advantage.
Let’s look at another example:
If there is a requirement for a field with an initial value of 0, open three threads:
- One thread executes: when x=0, x changes to 100
- One thread execution: when x=100, x is changed to 50
- One thread execution: when x=50, x is changed to 60
public static void main(String[] args) {
AtomicInteger atomicInteger=new AtomicInteger();
new Thread(() -> {
if(! AtomicInteger.com pareAndSet (0100)) {System. Out. Println ("0-100: failure"); } }).start(); new Thread(() -> { try { Thread.sleep(500); //// Note that the purpose is to let the third thread perform the judgment operation first, so that the third thread fails to modify} catch (InterruptedException e) {e.printstackTrace (); }if(! AtomicInteger.com pareAndSet (100, 50)) {System. Out. Println ("100-50: failure");
}
}).start();
new Thread(() -> {
if(! AtomicInteger.com pareAndSet (50, 60)) {System. Out. Println ("50 to 60: failure"); } }).start(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }}Copy the code
The result is the same:
This example seems boring, even a little boring, because in this case, the method I call compareAndSet starts with CAS and passes two parameters that are required to be passed in the native CAS operation closer to the native CAS operation.
Since atomic manipulation classes are so awesome, it’s worth exploring the cornerstone of atomic manipulation classes: CAS.
CAS
Compare And Set, the native CAS operation requires three values:
- The field to update
- The expected value
- The new value
The field (variable) to be updated is sometimes split into two parameters: 1. Instance 2. Offset address.
Perhaps you see here, will feel the clouds in the fog, do not know what I am saying, it does not matter, continue to brave to see.
Let’s take a look at the source code of compareAndSet.
CompareAndSet source code analysis
First, calling this method requires passing two arguments, one expected value and one new value, which is the old value (a value, not a field) and the new value is the value we want to change (a value, not a field). Let’s look at the internal implementation of this method:
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
Copy the code
Unbroadening calls the compareAndSwapInt method, passing the instance and offset address in addition to the two parameters we passed to the unsafe method. This represents the instance of the current class, the AtomicInteger class. What is this offset address? In simple terms, it is to determine where in the instance we need to modify the field. Now that we know the instance, and we know where the field we need to change is in the instance, we can determine the field. However, this deterministic process is not done in Java, but at a much lower level.
The offset address is obtained in the static code block of this class:
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); }}Copy the code
Unsafe. objectFieldOffset receives the offset address of the Field Field, which is the offset address of the AtomicInteger Field.
Let’s look at the definition of the value field:
private volatile int value;
Copy the code
Volatile is used to ensure visibility of memory.
I’m sure you want to find out what compareAndSwapInt and objectFieldOffset do. Unfortunately, my personal skills are limited, so I don’t have the ability to find out. What I know is that this method is JNI, which will call C or C++, and eventually send the corresponding instructions to the CPU. This is guaranteed to be atomic.
We can look at the definitions of these two methods:
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public native long objectFieldOffset(Field var1);
Copy the code
These two methods are tagged by Native.
Let’s visualize the compareAndSwapInt method:
When we execute the compareAndSwapInt method, passing in 10 and 100, Java communicates with the lower level: I have given you the instance and offset address of the field, please help me to check whether the value of this field is 10, if 10, you can change it to 100, and return true, if not, do not change it, return false.
The process of comparing is compare, and the process of changing the value is swap, because it is to replace the old value with the new value, so we call this operation CAS.
Let’s take a look at the source code for incrementAndGet.
IncrementAndGet source code analysis
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
Copy the code
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(! this.compareAndSwapInt(var1, var2, var5, var5 + var4));return var5;
}
Copy the code
The incrementAndGet method is called to use the getAndAddInt method with three parameters:
- Var1: instance.
- Var2: indicates the offset address.
- Var4: the value to be incremented, in this case 1.
GetAndAddInt (); getAndAddInt (); getAndAddInt (); getAndAddInt (); Hey, I’ll give you the instance and offset address, and you can help me check if the value is A. If so, change it to A+1 for me and return true. If not, return false.
Here’s a question to consider: Why do we need a while loop?
For example, when two threads execute getIntVolatile, they both get 10. Thread A executes native, and the modification succeeds, but thread B fails. Because CAS is atomic, thread B has to loop again, and this time it gets 11. Go to execute native method again, modify successfully.
A while loop like this has a fancy name: CAS spin.
Let’s imagine what would happen if concurrency were really, really high today. A lot of threads are doing CAS spin, which is a waste of CPU. So after Java8, there was some refinement of the atomic operation class, which we’ll talk about later.
Probably everyone atomic operation is the underlying implementation of a class, or is lost, still don’t know what on earth is unsafe the following method, after all, just simply read the code, as the saying goes, “the paper will sleep shallow, never know the matter want to practice”, so we need to call up the unsafe the following method, to deepen our understanding.
Unsafe
Even though Unsafe, Java officials don’t recommend manipulating the Unsafe class directly, but we’re just working on a demo, as long as it’s not being released to a production environment.
Looking Unsafe, there are quite a few methods to look at, and we’ll eventually use them to make a demo.
ObjectFieldOffset: Receives a Field and returns the offset address. CompareAndSwapInt: A comparison exchange that takes four parameters: instance, offset address, expected value, and new value. GetIntVolatile: Gets the value, supports Volatile, and takes two parameters: the instance and the offset address.
These three methods in the above source analysis, has appeared, has also been a certain explanation, here to explain, is to deepen the impression, WHEN I learn CAS, also repeatedly look at the blog, look at the source code, suddenly suddenly realized. We need to use these three methods to complete a demo: write an atomic operation increment method, the increment value can be customized, yes, this method has been analyzed above. Here’s the code:
public class MyAtomicInteger { private volatile int value; private static long offset; // address private static Unsafe; static { try { Field theUnsafeField = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafeField.setAccessible(true);
unsafe = (Unsafe) theUnsafeField.get(null);
Field field = MyAtomicInteger.class.getDeclaredField("value"); offset = unsafe.objectFieldOffset(field); } catch (Exception e) {e.printStackTrace(); } } public void increment(int num) { int tempValue;do{ tempValue = unsafe.getIntVolatile(this, offset); // get the value}while(! unsafe.compareAndSwapInt(this, offset, tempValue, value + num)); //CAS spin} public intget() {
returnvalue; }}Copy the code
public class Main {
public static void main(String[] args) {
Thread[] threads = new Thread[20];
MyAtomicInteger atomicInteger = new MyAtomicInteger();
for (int i = 0; i < 20; i++) {
threads[i] = new Thread(() -> {
for(int j = 0; j < 1000; j++) { atomicInteger.increment(1); }}); threads[i].start(); }for (int i = 0; i < threads.length; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("x="+ atomicInteger.get()); }}Copy the code
Running results:
You may be wondering why reflection is needed to retrieve theUnsafe, but the JDK is protecting us from getting it, and if we get it the same way the JDK does, we’ll get an error:
@CallerSensitive
public static Unsafe getUnsafe() {
Class var0 = Reflection.getCallerClass();
if(! VM.isSystemDomainLoader(var0.getClassLoader())) { throw new SecurityException("Unsafe"); // If we also get theUnsafe with getUnsafe, an exception will be thrown}else {
returntheUnsafe; }}Copy the code
CAS and singleton mode
You can also use CAS to complete the singleton pattern, although in normal development, no one will use CAS to complete the singleton pattern, but it is a good topic to check whether you can learn CAS.
public class Singleton {
private Singleton() {
}
private static AtomicReference<Singleton> singletonAtomicReference = new AtomicReference<>();
public static Singleton getInstance() {
while (true) { Singleton singleton = singletonAtomicReference.get(); / / get the singletonif(singleton ! = null) {// If singleton is not null, return singletonreturnsingleton; } // If singleton is empty, create a singleton singleton = new Singleton(); / / CAS operation, expected value is NULL, the new value is a singleton / /, if successful, returns the singleton / / if it fails, to enter the second cycle, singletonAtomicReference. The get () is not emptyif (singletonAtomicReference.compareAndSet(null, singleton)) {
returnsingleton; }}}}Copy the code
Notes to write has been more clear, can be on the notes, and then a good understanding.
ABA
CompareAndSet, I’ve already written a demo, so you can try to analyze the source code as well, but I’m not going to analyze it, but the reason I’m going to mention compareAndSet again is to raise a question.
Suppose there are three steps:
- Change 150 to 50
- Change 50 to 150
- Change 150 to 90
If you look closely, what these three steps do is, one variable started at 150, changed to 50, and then changed to 150 again! And finally, if this variable is 150, change it to 90. This is the problem with ABA in CAS.
The third step, to determine whether the value is 150, has two different requirements:
- Yes, the value has been changed, but now it has been changed back, so step 3 is true.
- No, the value is 150, but the value has been changed, so step 3 is not true.
For the second requirement, we can solve this problem with an AtomicStampedReference, which supports generics and has the concept of stamp. Post the code directly below:
public static void main(String[] args) {
try {
AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<Integer>(150, 0);
Thread thread1 = new Thread(() -> {
Integer oldValue = atomicStampedReference.getReference();
int stamp = atomicStampedReference.getStamp();
if (atomicStampedReference.compareAndSet(oldValue, 50, 0, stamp + 1)) {
System.out.println("150 - > 50 success."+ (stamp + 1)); }}); thread1.start(); Thread thread2 = new Thread(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) {e.printStackTrace(); } Integer oldValue = atomicStampedReference.getReference(); int stamp = atomicStampedReference.getStamp();if (atomicStampedReference.compareAndSet(oldValue, 150, stamp, stamp + 1)) {
System.out.println("50 - > 150 success."+ (stamp + 1)); }}); thread2.start(); Thread thread3 = new Thread(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) {e.printStackTrace(); } Integer oldValue = atomicStampedReference.getReference(); int stamp = atomicStampedReference.getStamp();if (atomicStampedReference.compareAndSet(oldValue, 90, 0, stamp + 1)) {
System.out.println("150 - > 90 success."+ (stamp + 1)); }}); thread3.start(); thread1.join(); thread2.join(); thread3.join(); System.out.println("The current value is." + atomicStampedReference.getReference() + "; Stamp is"+ atomicStampedReference.getStamp()); } catch (Exception e) { e.printStackTrace(); }}Copy the code
Java8 optimization for atomic operation classes
In Java8, a more complete atomic manipulation class, LongAdder, was introduced. In addition, the LongAdder atomic manipulation class was also introduced.
It maintains an array of Cell[] and base, and a value inside the Cell. In case of contention, the JDK will select a Cell according to the algorithm and operate on the value in the Cell. If there is still a contention, it will try another Cell again. Finally, add the value and base in the Cell[] to get the final result.
Because the code is more complex, I will choose a few more important issues, with the problem to look at the source:
- When Cell[] is initialized.
- If there’s no competition, you’re only going to operate on base, so that’s how you see it.
- What are the rules for initializing Cell[]?
- Cell[] What is the expansion time?
- How does the initialization Cell[] and expansion Cell[] ensure thread safety?
Here is a UML diagram of the LongAdder class:
The add method:
public void add(long x) {
Cell[] cs; long b, v; int m; Cell c;
if((cs = cells) ! = null || ! CasBase (b = base, b + x)) {// The first line Boolean uncontended =true;
if(cs = = null | | (m = cs. Length - 1) < 0 | | / / the second line (c = cs [getProbe () & m]) = = null | | / / third line! (uncontended = c.value (v = c.value, v + x)))// accumulate (x, null, uncontended); // Line 5}}Copy the code
The first line: | | judgment, the former is to determine whether a cs = cells is not null, 】 the latter is to judge whether the CAS “unsuccessful”. What did casBase do?
final boolean casBase(long cmp, long val) {
return BASE.compareAndSet(this, cmp, val);
}
Copy the code
Call the compareAndSet method and check whether it succeeded:
- Returns true if there is currently no contest.
- A thread returns false if it is currently competing.
Go back to the first line and explain the judgment as a whole: if cell[] has already been initialized, or if there is a race, the second line of code is entered. If there is no race and no initialization, you don’t get to the second line of code.
This answers the second question: if there is no competition, only base will be operated on, from here.
The second line of code: | | judgment, the former judge whether cs “NULL”, which determine whether the length of the (cs – 1) [greater than 0]. These two judgments should be used to determine whether Cell[] is initialized. If it is not initialized, line 5 is entered.
If the cell is initialized, the getProbe() &m algorithm will get a number to determine whether cs[number] is NULL and assign cs[number] to C. If it is NULL, the fifth line of code will be entered. We need to take a quick look at what getProbe() does:
static final int getProbe() {
return (int) THREAD_PROBE.get(Thread.currentThread());
}
private static final VarHandle THREAD_PROBE;
Copy the code
We just need to know that the algorithm is based on THREAD_PROBE.
If there is no contention, it will succeed. If there is a contention, it will fail. There is one outside! (), so CAS fails and goes to line 5. Note that you are already operating on the Cell element.
Line 5: This method is very complicated internally, so let’s look at the method as a whole:
There are three ifs: 1. Determine if cells are initialized, and if so, enter the if.
There are six more ifs in there, which is terrible, but we don’t have to pay attention to all of them here, because our goal is to solve the problem that we have mentioned above.
Let’s take a look at the big picture:
First judgment: according to the algorithm, take an element of cs[] and assign it to C, then check whether [NULL], if [NULL], enter the if.
if(cellsBusy ==0) {// If cellsBusy==0, it means "busy" now, enter thisifCell r = new Cell(x); // Create a Cellif(cellsBusy ==0 && casCellsBusy()) {// Check cellsBusy ==0 again and lock so that only one thread can enter thisif// Add the created Cell element to the Cell[] try {Cell[] rs; int m, j;if((rs = cells) ! = null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r;break done; } } finally { cellsBusy = 0; // means "not busy"}continue; // Slot is now non-empty
}
}
collide = false;
Copy the code
This is a complement to the first problem. When Cell[] is initialized, one of the elements is NULL. This element is initialized only when it is used.
Sixth judgment: check whether cellsBusy is 0 and lock, if successful, enter if to expand Cell[].
try {
if (cells == cs) // Expand table unless stale
cells = Arrays.copyOf(cs, n << 1);
} finally {
cellsBusy = 0;
}
collide = false;
continue;
Copy the code
This answers half of the fifth question: when Cell[] is expanded, CAS is used to lock, so thread security is guaranteed.
What about the fourth question? The first thing you should notice is that the outside is a for (;;). An infinite loop is terminated only when it breaks.
If [] is set to collide with the number of cores in the CPU, if [] is set to collide with the number of cores in the CPU, if [] is set to collide with the number of cores in the CPU, if [] is set to collide with the number of cores in the CPU, if [] is set to collide with the number of cores in the CPU, if [] is set to collide with the number of cores in the CPU. So if I put put put on false, I’m going to put on if, put put on true, say there’s a collision, and then I’m going to go to advanceProbe, generate a new THREAD_PROBE, and I’m going to loop again. If the CAS fails on the third if, put the Cell[] in a place where it is larger than the number of cores and collide with the number of cores, put it in a place where the CAS is set to collide with the number of cores and collide with the number of cores. Isn’t it complicated?
To put it simply, the capacity expansion time of Cell[] is as follows: The length of Cell[] is smaller than the number of CPU cores, and the Cell CAS has failed twice.
2. The first two statements are easy to understand, focusing on the third one:
final boolean casCellsBusy() {
return CELLSBUSY.compareAndSet(this, 0, 1);
}
Copy the code
Cas sets CELLSBUSY to 1, which can be interpreted as adding a lock, since initialization is imminent.
try { // Initialize table
if (cells == cs) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
break done;
}
} finally {
cellsBusy = 0;
}
Copy the code
Cell[] is 2 in length, but there is still a NULL element in it. Now only one of the elements is initialized. Finally, cellsBusy is changed to 0, indicating that now “busy”.
This answers the first question: Cell[] is initialized when a race occurs and Cell[] has not yet been initialized. Fourth problem: The initialization rule is to create an array of length 2, but only one element is initialized and the other element is NULL. Half of the fifth problem: CAS is used to lock the Cell[] when it is initialized, so it is thread safe.
3. If all the preceding parameters fail, perform CAS operations on base.
If you follow me through the source code, you’ll find a comment you’ve probably never seen before:
What is this note for? Contended is used to resolve bogus shares.
Okay, there’s another knowledge blind spot, what is fake sharing.
False sharing
We know the relationship between CPU and memory: when the CPU needs a piece of data, it will first look for it in the cache; if it does not find it in the cache, it will look for it in the memory; if it finds it, it will copy the data to the cache and fetch it directly from the cache next time.
But that’s not a perfect statement. The data in the cache is stored as cached rows. What does that mean? A cache row may have more than one data. If the size of a cache row is 64 bytes and the CPU fetches data from memory, it fetches all the adjacent 64 bytes and copies them to the cache.
This is an optimization for single threads. Imagine that if the CPU needs A data, it pulls the adjacent BCDE data out of memory and puts it in the cache. If the CPU needs more BCDE data, it can fetch it directly from the cache.
This is a disadvantage in multi-threading, because the same cache row can only be read by one thread at a time, which is called pseudo-sharing.
Is there a way to solve this problem? The clever developer came up with a solution: if the cache row size is 64 bytes, I can add some redundant fields to fill it up to 64 bytes.
For example, I only need one long field, and now I add six more long fields for padding, one long is 8 bytes, now 7 long fields are 56 bytes, and the object header is 8 bytes, which is 64 bytes, just enough for one cache line.
But this way is not enough grace, so in the coming Java8 launched @ JDK. Internal. Vm. The annotation. Contended annotations, to solve the problem of false sharing. However, if developers want to use this annotation, they need to add JVM parameters, which I won’t mention here because I haven’t tested them personally.
This chapter is quite lengthy and covers most of the common problems in CAS.
Concurrency frameworks are very difficult to learn because they are rarely used in development, but concurrency is a very effective way to improve performance and throughput, so concurrency is worth learning and studying.