## Volatile does not guarantee atomicity

A brief introduction to shared-variable safety problems in coroutines

If we print Thread.currentThread(), we can see that some coroutines have multiple threads behind them. And as soon as multiple threads are involved, the familiar problem of threads competing for shared variables appears.

```kotlin
@Test
fun test01() = runBlocking<Unit> {
   launch {
      // Thread[main @coroutine#2,5,main]
      println("${Thread.currentThread()} launch1 executing")
   }
   launch {
      // Thread[main @coroutine#3,5,main]
      println("${Thread.currentThread()} launch2 executing")
   }
   withContext(Dispatchers.IO) {
      // Thread[DefaultDispatcher-worker-1 @coroutine#1,5,main]
      println("${Thread.currentThread()} withContext executing")
   }
}
```

You can see that the code above uses two threads: Thread[main] and Thread[DefaultDispatcher-worker-1].

Three coroutines appear: @coroutine#1, @coroutine#2 and @coroutine#3, but @coroutine#1 (the runBlocking coroutine, inside withContext) runs on a different thread.

Now let's run i++ 10,000 times in each of two coroutines on the same thread, to check whether that is thread-safe.

Then we repeat the experiment, running i++ 10,000 times each in two coroutines on different threads.

Between coroutines on the same thread:

```kotlin
@Test
fun test02() = runBlocking<Unit> {
   var i = 0
   val list = mutableListOf<Job>()
   repeat(10000) {
      list.add(launch {
         i++
      })
      list.add(launch {
         i++
      })
   }
   list.forEach {
      it.join()
   }
   println(i) // 20000
}
```

Between coroutines on different threads:

```kotlin
@Test
fun test03() = runBlocking<Unit> {
   var i = 0
   val list = mutableListOf<Job>()
   repeat(10000) {
      list.add(launch {
         i++
      })
      list.add(launch(Dispatchers.IO) {
         i++
      })
   }
   list.forEach {
      it.join()
   }
   println(i) // 19668
}
```

You can see there is a thread-safety problem, and with coroutines it is more unpredictable than with plain threads: with plain threads everyone knows the code must be made thread-safe, but with coroutines you often cannot tell whether two coroutines share a thread. You may have to print the current thread to work out exactly where locking is needed.

At that point, we can look in the coroutines library to see whether it offers a lock.

It does: Mutex. Let's use it.

```kotlin
@Test
fun test04() = runBlocking<Unit> {
   val mutex = Mutex()
   var i = 0
   val list = mutableListOf<Job>()
   repeat(10000) {
      list.add(launch {
         mutex.withLock {
            i++
         }
      })
      list.add(launch(Dispatchers.IO) {
         try {
            mutex.lock()
            i++
         }
         finally {
            mutex.unlock()
         }
      })
   }
   list.forEach {
      it.join()
   }
   println(i)
}
```

Notice the two ways Mutex is used in the code above: withLock { }, and a manual lock()/unlock() in a try/finally.
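withLock is essentially the second pattern packaged up. Roughly (a sketch of the idea, not the library's exact source):

```kotlin
import kotlinx.coroutines.sync.Mutex

// A sketch of what withLock does: acquire, run the action,
// and always release in finally, even if the action throws.
suspend fun <T> Mutex.withLockSketch(action: () -> T): T {
    lock()
    try {
        return action()
    } finally {
        unlock()
    }
}
```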

All right, that's the end of the brief introduction.

The volatile keyword

In Java multithreading, volatile has two roles:

  1. Prevent instruction reordering.
  2. Flush the CPU store buffer (on write) and drain the invalidate queue (on read), ensuring the variable is visible across threads.

Java’s underlying implementation of volatile leverages the CPU’s memory barriers.

Store Buffer and Invalidate queue

Before we get to the store buffer and the invalidate queue, we need some background.

CPU cache

The CPU is simply too fast. Measured in CPU clock ticks, a single main-memory operation costs hundreds of ticks.

So the CPU has to use caching to improve the overall execution efficiency

The clock cycle shown below is the assumed speed ratio

You can see that the closer a cache sits to the CPU core, the faster it is, ending with the registers.

With caches, the CPU can keep frequently used variables close by, and data shared between cores lives in L3. On a cache miss, the core must lock the bus, fetch the variable from main memory, and store it in the cache.

With caches, main memory is no longer the bottleneck, but a new problem appears: how do the caches of different cores stay consistent?

Multi-core cache consistency scheme: MESI

Problems with multiple cores

Suppose variable a is shared by core A and core B, and both cores modify it at the same time: which core's value should win? Worse, if core B has modified the value of a, core A has no idea it changed, so a thread on core A reads the stale value, and the same variable a now holds inconsistent values across the CPU's caches.

But we need a bit more background before looking at the solution.

Cache line

The CPU does not operate on the cache byte by byte; that would be slow and would multiply the number of cache accesses. Instead, the cache line was defined: the core operates line by line[1], and a line is usually 64 bytes (32-byte and 128-byte lines also exist).

[1]: In fact, today's CPUs do not necessarily operate one line at a time; they may operate on several lines at once.

The cache line settles the unit-of-caching question, but it creates a new problem.

False sharing of cache lines

A Java long is 8 bytes, so several variables can end up stored together. Here is what can happen:

Suppose variables a, b, c and d sit on the same cache line. When the CPU receives an invalidate message for a, it cannot mark just that one variable invalid: the smallest unit a cache operates on is the line, so the whole line is marked invalid, and b, c and d become collateral damage.

So in general, we can add placeholder (padding) variables after variable a so that a occupies a line by itself, improving efficiency.

Giving each shared variable touched by multiple threads its own line can be more efficient, even when it is not strictly necessary.

Java 8 provides an annotation for this, @sun.misc.Contended. In later JDKs the annotation moved to another package, @jdk.internal.vm.annotation.Contended, and to use it you have to add the -XX:-RestrictContended JVM flag.
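For illustration (my own sketch, not from the original), manual padding in Kotlin looks roughly like this; the p1..p7 fields are hypothetical placeholders:

```kotlin
// A minimal sketch, assuming 64-byte cache lines. The padding fields exist
// only to keep `value` from sharing a cache line with a neighbouring hot
// variable. Caveat: the JVM is free to reorder fields, so manual padding is
// fragile; that is exactly why the @Contended annotation exists.
class PaddedCounter {
    @Volatile var value: Long = 0L
    // 7 * 8 = 56 bytes of padding; together with `value` (8 bytes) this
    // spans a full 64-byte line.
    private var p1 = 0L; private var p2 = 0L; private var p3 = 0L
    private var p4 = 0L; private var p5 = 0L; private var p6 = 0L
    private var p7 = 0L
}
```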

That's it for the preliminaries.

What is MESI?

MESI (Modified, Exclusive, Shared, Invalid) was proposed to solve cache inconsistency between cores. It works much like a read/write lock: a line is exclusive while being written and shared while being read. MESI's unit of operation is the cache line.

What each letter of MESI means:

M, Modified: the program has modified variable a in core A's cache, and the line is marked M, meaning the value was just modified by core A alone; the other cores either do not know yet or have had their copies invalidated. At this point the cached data differs from memory.

E, Exclusive: after the modification, core A sends invalidate messages to the other cores and they reply with invalidate acks; core A then holds the line exclusively. The data is consistent with memory and exists only in this cache.

S, Shared: when core B tries to read variable a and finds its own copy invalid, it does a remote read from core A's cache; afterwards the cached line is consistent with memory and shared by both cores.

I, Invalid: when a core processes entries from its invalidate queue, it marks the corresponding cache lines invalid.

The transitions between the MESI states can be seen in the following figure.

A core's state messages travel to the other cores over the message bus; the main message types are:

  • Read: a read request, carrying the physical memory address of the wanted data
  • Read Response: the reply to a Read request, containing the requested data
  • Invalidate: carries the physical memory address of the data; any CPU holding the corresponding cache line must invalidate it
  • Invalidate Acknowledge: a CPU that receives an Invalidate replies with this, telling the sender that its copy of the cache line is now invalid
  • Read Invalidate: effectively Read plus Invalidate; the reply is naturally one Read Response plus a series of Invalidate Acknowledges
  • Writeback: carries a physical memory address and data, to be written back to main memory across the bus
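To make the states and messages concrete, here is a toy Kotlin sketch of one core's view of a single cache line. It is my own simplification, and it omits several transitions:

```kotlin
// Toy model: how one core's copy of a cache line moves between MESI states.
enum class MesiState { MODIFIED, EXCLUSIVE, SHARED, INVALID }

// Events as seen by one core: its own reads/writes, and bus messages from others.
enum class BusEvent { LOCAL_READ, LOCAL_WRITE, REMOTE_READ, REMOTE_INVALIDATE }

fun next(state: MesiState, event: BusEvent): MesiState = when (state to event) {
    MesiState.INVALID to BusEvent.LOCAL_READ        -> MesiState.SHARED    // Read + Read Response
    MesiState.INVALID to BusEvent.LOCAL_WRITE       -> MesiState.MODIFIED  // Read Invalidate, then write
    MesiState.SHARED to BusEvent.LOCAL_WRITE        -> MesiState.MODIFIED  // Invalidate, wait for acks
    MesiState.SHARED to BusEvent.REMOTE_INVALIDATE  -> MesiState.INVALID
    MesiState.EXCLUSIVE to BusEvent.LOCAL_WRITE     -> MesiState.MODIFIED  // no bus traffic needed
    MesiState.EXCLUSIVE to BusEvent.REMOTE_READ     -> MesiState.SHARED
    MesiState.MODIFIED to BusEvent.REMOTE_READ      -> MesiState.SHARED    // supply data via Read Response
    else -> state  // remaining transitions omitted from this sketch
}
```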

New problems

Say core A sets a = 2. Core A's cache line for a becomes Modified, and core A sends invalidate messages for a to the other cores over the message bus; their copies become invalid and each must reply with an invalidate ack. Only then does core A continue. Do you see the new problem hiding in the middle????

Core A sends the invalidate messages and then just sits there waiting!!! It cannot execute the next instruction until every other core's invalidate ack has arrived.

So the store buffer was born: as so often, adding a layer in the middle solves the problem.

Store buffer

With a store buffer, core A no longer has to wait: it drops the modification into the store buffer, sends the invalidate messages to the other cores, and goes on to other work instead of blocking on the acks. Once the acks arrive, core A moves the data from the store buffer into the cache line.

A synchronous wait has become an asynchronous event.

New problems

The store buffer did increase core utilization, but there is a catch:

after core A's modification of a has been parked in the store buffer, if core A reads a again before the acks arrive, it finds that a is still the old value.

```
a = 1
funA() {
    a = 2
}

funB() {
    if (a == 2) {
        // xxxxxx
    }
}
```

Core A runs funA, setting a to 2, then immediately runs funB; the test a == 2 comes out false, which is obviously wrong.

Notice this is a single-core scenario: even one core blows up.

Store forwarding: read the store buffer first

To fix this, engineers introduced a new concept called store forwarding, which simply means a read checks the store buffer first, and only then the cache.
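As a rough mental model (my own sketch, not real hardware code), store forwarding looks like this:

```kotlin
// Toy model of a core with a store buffer and store forwarding.
class ToyCore {
    private val storeBuffer = LinkedHashMap<String, Int>()  // pending writes, oldest first
    private val cache = HashMap<String, Int>()

    fun write(addr: String, value: Int) {
        // Park the write; the invalidate message goes out asynchronously.
        storeBuffer[addr] = value
    }

    // Store forwarding: check the store buffer before the cache,
    // so the core always sees its own latest write.
    fun read(addr: String): Int? = storeBuffer[addr] ?: cache[addr]

    // Runs once the invalidate acks have arrived.
    fun drainStoreBuffer() {
        storeBuffer.forEach { (addr, value) -> cache[addr] = value }
        storeBuffer.clear()
    }
}
```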

That fixes the single-core problem, but multi-core blows up again:

```
a = b = 0
funA() {
    a = 1
    b = 1
}

funB() {
    while (b == 0) continue
    assert(a == 1)
}
```

The scenario: a is shared by cores A and B, b is owned by core A alone; core A runs funA while core B runs funB.

  1. Core A executes a = 1, drops the modification into its store buffer, and sends an invalidate message for a.
  2. Core A executes b = 1; since b is exclusively owned, it is written straight into the cache line.
  3. Core B evaluates the while condition; b is not in its cache, so it issues a remote read, finds b == 1, and breaks out of the loop.
  4. Core B asserts a == 1, and the assertion fails: core A has not yet received all the invalidate acks, a = 1 is still sitting in core A's store buffer, and core B still reads the old a == 0.

The solution is to add a memory barrier

The memory barrier

A memory barrier is a kind of synchronization instruction: the code before and after the barrier is not reordered across it, but executes in a strictly defined order. That is, the instructions on either side of the barrier cannot be shuffled past it by compiler or CPU optimizations.

We’ll just change the code to something like this:

```
a = b = 0
funA() {
    a = 1
    smp_wmb()
    b = 1
}

funB() {
    while (b == 0) continue
    assert(a == 1)
}
```

smp_wmb() is a write memory barrier. When core A executes it, the store buffer is flushed: every variable waiting in it is written out to the cache (either by stalling until the acks arrive, or by marking the current entries and only applying later writes after the marked ones have drained). Once the values are in the cache, other cores can fetch them with a remote read.

Note that store forwarding only handles a single core reading its own pending stores; it does nothing for the multi-core case, which is what the barrier addresses.

But… do you see the next problem?

invalidate queues

New problem: the store buffer is not big enough.

The store buffer is not very big. A run of cache misses can have the core stuffing variable after variable into it; when the store buffer fills up, the core has to stall and wait for it to drain before executing any further stores. The fix is simply to shorten how long each variable stays in the store buffer.

Let's re-examine the previous flow and find which step keeps a variable in the store buffer longest.

The core writes the change into the store buffer and sends the invalidate messages; it then has to wait for the acks before it can move the entry from the store buffer into the cache.

Since our problem is the store buffer filling up, clearly it is the waiting for acks that dominates how long a variable stays in the store buffer.

The chosen fix is to add invalidate queues, which store the invalidate messages arriving from other cores. Wait, doesn't that leave the problem unresolved?

Look at it from the point of view of the core receiving an invalidate message: it has to find the cache line in its cache, mark it invalid, and only then send the ack.

Eh? That's synchronous again, isn't it? On every invalidate message it must mark the cache line before it can reply with an ack.

So instead: when the core receives an invalidate message, it simply appends it to its invalidate queue and sends the ack immediately; it processes the queue later, entry by entry, marking the variables invalid in the cache whenever it gets around to it. A win-win?

This replaces the expensive find-the-line-and-mark-it step with a simple queue.add(message), and the sender gets its ack sooner, so entries leave its store buffer faster.
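Again as a toy model of the idea (my own sketch):

```kotlin
// Toy model: ack an Invalidate immediately, defer the actual invalidation.
class ToyCacheController {
    private val invalidateQueue = ArrayDeque<String>()  // invalidate messages parked for later
    private val validLines = hashSetOf("a", "b")        // addresses currently cached as valid

    // On receiving an Invalidate message: enqueue it and ack at once,
    // instead of searching the cache and marking the line first.
    fun onInvalidate(addr: String): String {
        invalidateQueue.addLast(addr)                   // just queue.add(message), cheap
        return "Invalidate Acknowledge"                 // sender's store buffer drains sooner
    }

    // Processed lazily, e.g. when idle or at a read memory barrier (smp_rmb).
    fun drainInvalidateQueue() {
        while (invalidateQueue.isNotEmpty()) {
            validLines.remove(invalidateQueue.removeFirst())
        }
    }
}
```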

Don’t get too excited. A new problem has arisen

Another problem

The invalidate queue delays the moment at which lines are actually marked invalid in the cache, so stale reads are back.

What do we do??

Take a look at the code below

```
a = b = 0
funA() {
    a = 1
    smp_wmb()
    b = 1
}

funB() {
    while (b == 0) continue
    assert(a == 1)
}
```

Once again, a is shared by cores A and B, and b is owned by core A alone.

  1. Core A runs funA: a = 1 goes into the store buffer and an invalidate message for a goes out to the other cores.
  2. Core B receives the invalidate message, stores it in its invalidate queue, and immediately sends back the ack.
  3. Core A hits the write memory barrier and writes variable a out to its cache.
  4. Core A executes b = 1; b is exclusive, so it is written straight to the cache.
  5. Core B finds b == 0 is false and breaks out of the while loop.
  6. Core B reads variable a, but the invalidate message is still sitting in its queue, so core B still trusts its cached a == 0.

So what's the fix? Must every read first scan the invalidate queue for a matching invalidate?

The engineers chose not to do that: the invalidate queue has to be traversed entry by entry, which is slow, and the queue can be very long. So, just as store forwarding solved the single-core case, how do we solve the multi-core case?

(Hang on, wasn't this an article about Kotlin coroutines? Is this what "going deeper into coroutines" means?? I've gone completely off the rails.)

The solution is to add a read memory barrier

```
a = b = 0
funA() {
    a = 1
    smp_wmb()
    b = 1
}

funB() {
    while (b == 0) continue
    smp_rmb()
    assert(a == 1)
}
```

The added read memory barrier, smp_rmb(), drains the invalidate queue before the reads that follow it. By the time funB actually reads a, core B's cached copy of a is no longer in the Shared state but Invalid, so reading a forces a remote read and fetches the up-to-date value.

I'm not going to analyze the kernel directly (lazy), but we can analyze the source code behind volatile.

JVM source analysis of volatile (mainly x86)

talk is cheap, show me the code

In the bytecode interpreter you'll see that if the field is volatile and its type is int, this is called:

```cpp
obj->release_int_field_put(field_offset, STACK_INT(-1));
```

```cpp
inline void oopDesc::release_int_field_put(int offset, jint contents) {
    OrderAccess::release_store(int_field_addr(offset), contents);
}
```

```cpp
inline void OrderAccess::release_store(volatile jint* p, jint v) {
    release();
    *p = v;
}
```

storestore –> release

```cpp
inline void OrderAccess::release() {
  WRITE_MEM_BARRIER;
}
```

```cpp
#define WRITE_MEM_BARRIER __asm __volatile ("": : :"memory")
```

This is GCC inline assembly: an empty statement with a "memory" clobber, i.e. a compiler barrier that stops the compiler from reordering memory accesses across it.

That is the barrier before the write. If the field is volatile, this is also called after the assignment:

storeload –> fence

```cpp
inline void OrderAccess::storeload() {
    fence();
}
```

```cpp
inline void OrderAccess::fence() {
#ifdef AMD64
  StubRoutines_fence();
#else
  // Only needed on multiprocessor machines
  if (os::is_MP()) {
    __asm {
      lock add dword ptr [esp], 0;
    }
  }
#endif // AMD64
}
```

Here we see that a volatile write involves two memory barriers: OrderAccess::release and OrderAccess::storeload.

We also find the full set of memory barriers:

Java's four memory barriers

```cpp
inline void OrderAccess::loadload()   { acquire(); }
inline void OrderAccess::storestore() { release(); }
inline void OrderAccess::loadstore()  { acquire(); }
inline void OrderAccess::storeload()  { fence();   }
```

And you can see that OrderAccess::release is in fact OrderAccess::storestore.

So the memory barriers used before and after a volatile write are storestore and storeload.

storestore is implemented with C's own volatile keyword. What does volatile do in C/C++? Here is a summary:

The volatile keyword stops the compiler from optimizing variables or objects that it would otherwise assume "cannot be changed by the code itself". In C, for example, volatile alerts the compiler that the variable defined after it may change at any time, so the compiled program must read the value directly from the variable's address every time it is used.

Don't ascribe lots of powers to C's volatile; it really does only one thing: prevent compiler optimization.

storeload maps to an x86 primitive, but an expensive one: it executes a lock-prefixed instruction, locking the cache line (or the CPU bus).

Finally, both loadload and loadstore map to acquire:

loadload loadstore –> acquire

```cpp
inline void OrderAccess::acquire() {
// Only needed on 32-bit x86, not on AMD64
#ifndef AMD64
  __asm {
    mov eax, dword ptr [esp];
  }
#endif // !AMD64
}
```

The interpreter's volatile handling is in the bytecodeInterpreter.cpp file, and the four Java memory barriers are in orderAccess_windows_x86.inline.hpp; I picked the Windows x86 implementations of the four barriers.

Volatile does not guarantee atomicity

Volatile does little more than ensure visibility and prevent code reordering.

Many people will object: doesn't volatile guarantee atomicity?

The first thing that comes to mind is something like this:

```kotlin
@Volatile
var flag = false

var a = 0

fun funA() {
   TimeUnit.MILLISECONDS.sleep(1555)
   a = 1
   /**
    * storestore: the write memory barrier before the volatile write flushes
    * the store buffer, so every earlier write (including a = 1) reaches the
    * cache and is visible to other cores before flag is
    */
   flag = true
   // storeload follows the volatile write
}

fun funB() {
   // loadload
   while (!flag) {
      continue
   }
   // loadstore
   /**
    * The read memory barrier drains the invalidate queue, so the stale
    * cached copy of a is marked invalid and the read below fetches fresh data
    */
   assert(a == 1)
   log("funB running...")
}

@Test
fun test01() = runBlocking {
   val job1 = launch(Dispatchers.IO) {
      funA()
   }
   val job2 = launch(Dispatchers.Unconfined) {
      funB()
   }
   joinAll(job1, job2)
}
```

This code shows Kotlin's spelling of volatile: @Volatile.

Doesn't that look like an atomic operation? In reality it is only visibility plus reordering prevention.

Change the single write flag = true into something like flag++, and the difference appears.

Hey, we’ve written code like this before

```kotlin
@Test
fun test03() = runBlocking<Unit> {
   var i = 0
   val list = mutableListOf<Job>()
   repeat(10000) {
      list.add(launch {
         i++
      })
      list.add(launch(Dispatchers.IO) {
         i++
      })
   }
   list.forEach {
      it.join()
   }
   println(i) // 19668
}
```

Let's change it to use @Volatile:

```kotlin
@Volatile
var i = 0

@Test
fun test01() = runBlocking<Unit> {
   val list = mutableListOf<Job>()
   repeat(10000) {
      list.add(launch {
         i++
      })
      list.add(launch(Dispatchers.IO) {
         i++
      })
   }
   list.forEach {
      it.join()
   }
   println(i) // 19904
}
```

The result is 19904, still not 20000.

Why is that? flag = true compiles to a single bytecode store, but i++ is really i = i + 1, which takes three steps:

  1. read i
  2. compute i + 1
  3. assign the result back to i

Three steps, obviously not thread-safe.
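Written out explicitly (an equivalent sketch of what the compiled code does):

```kotlin
@Volatile
var i = 0

// i++ written out as its three underlying steps. @Volatile makes each single
// step visible to other threads, but nothing stops two threads from both
// executing step 1 before either reaches step 3; then one increment is lost.
fun unsafeIncrement() {
    val tmp = i          // 1. read i
    val next = tmp + 1   // 2. compute i + 1
    i = next             // 3. assign the result back to i
}
```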

```kotlin
@Volatile
var i = 0
val mutex = Mutex()

@Test
fun test01() = runBlocking<Unit> {
   val list = mutableListOf<Job>()
   repeat(10000) {
      list.add(launch {
         mutex.withLock {
            i++
         }
      })
      list.add(launch(Dispatchers.IO) {
         mutex.withLock {
            i++
         }
      })
   }
   list.forEach {
      it.join()
   }
   println(i) // 20000
}
```

Of course a lock is not the only solution; we can also use AtomicInteger, which relies on lock-free CAS.

```kotlin
// AtomicInteger handles visibility itself; no @Volatile needed on the reference
val i = AtomicInteger(0)

@Test
fun test01() = runBlocking<Unit> {
   val list = mutableListOf<Job>()
   repeat(10000) {
      list.add(launch {
         i.getAndIncrement()
      })
      list.add(launch(Dispatchers.IO) {
         i.getAndIncrement()
      })
   }
   list.forEach {
      it.join()
   }
   println(i) // 20000
}
```

With CAS there are three values in play: the old value, the new value, and the actual value.

  1. Old value (the expected value, an estimate): the value we just read.
  2. New value: the value we want to write in.
  3. Actual value: the value currently in main memory (usually a volatile field).

To set a new value, CAS checks whether the old value still equals the actual value.

If they are equal, the new value is written directly.

If not, the value was modified in the meantime: read the actual value again as the new old value, and loop back to the comparison, until the write succeeds.

There is a time window between reading the old value and comparing it with the actual value; CAS exploits exactly this window. As long as the actual value still equals the old value inside that window, the new value can be installed immediately.
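Here is a minimal sketch of that retry loop, using AtomicInteger's public compareAndSet API (the function name casIncrement is mine):

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// A minimal sketch of the CAS retry loop described above.
fun casIncrement(counter: AtomicInteger): Int {
    while (true) {
        val old = counter.get()        // 1. read the old value (the estimate)
        val updated = old + 1          // 2. compute the new value
        // 3. compareAndSet installs `updated` only if the actual value still
        //    equals `old`; if another thread slipped in between, retry
        if (counter.compareAndSet(old, updated)) return updated
    }
}
```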

Let's look at the source of AtomicInteger and find these three values.

The field value is declared volatile, so it is the actual value.

Now let’s find the old value

```java
public final int getAndIncrement() {
    return U.getAndAddInt(this, VALUE, 1);
}
```

You can't see it here, so step into the getAndAddInt function:

```java
@HotSpotIntrinsicCandidate
// o: the object
// offset: the offset of the value field inside the object,
//         used to reach the actual value
// delta: the increment to add on top of the old value
public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    do {
        // read the old value
        v = getIntVolatile(o, offset);
        // if the actual value at o + offset still equals the old value v,
        // write v + delta as the new value; otherwise retry
    } while (!weakCompareAndSetInt(o, offset, v, v + delta));
    return v;
}
```

Old value: v; actual value: the int at o + offset; new value: v + delta.

Do we need to read the JVM source for CAS too? Let's at least take a brief look.

Analyzing the JVM's CAS source, starting from AtomicInteger

In the Unsafe.java file there is this function:

```java
public final native boolean compareAndSetInt(Object o, long offset, int expected, int x);
```

That is OpenJDK 11; in OpenJDK 1.8 the same native method has a different name. Sure enough, after switching JDK versions:

```java
public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);
```

Then I found the source code:

unsafe.cpp

```cpp
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
  UnsafeWrapper("Unsafe_CompareAndSwapInt");
  // Resolve our Java object handle into an oop* pointer
  oop p = JNIHandles::resolve(obj);
  // addr = p + offset, the address of the actual value
  jint *addr = (jint *)index_oop_from_field_offset_long(p, offset);
  // This is the point: compare and swap.
  // x is the new value, addr points at the actual value, e is the old (expected) value
  return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
```

Look at the parameters; they match the Java side:

jobject obj, jlong offset, jint e, jint x

Object o, long offset, int expected, int x

Then we dig into Atomic::cmpxchg.

We want the Windows x86 file.

You'll find two cmpxchg functions with the same signature shape; don't worry, one is the AMD64 version, and we're looking at the 32-bit x86 one:

```cpp
inline jint Atomic::cmpxchg(jint exchange_value, volatile jint* dest, jint compare_value) {
  int mp = os::is_MP();
  __asm {
    mov edx, dest
    mov ecx, exchange_value
    mov eax, compare_value
    LOCK_IF_MP(mp)
    cmpxchg dword ptr [edx], ecx
  }
}
```

At the bottom it is the cmpxchg assembly instruction, with a lock prefix added on multi-core machines (LOCK_IF_MP(mp)).

exchange_value is the value to swap in, dest is the pointer to the destination, and compare_value is the value to compare against.

I can roughly follow the rest of the assembly, but I won't dig into the exact semantics of cmpxchg here; explore further on your own if you're curious.
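Still, for reference, here is a toy Kotlin model of what the instruction does (my own sketch; the hardware atomicity from the lock prefix is modeled here with a lock):

```kotlin
// Toy model of `lock cmpxchg dword ptr [edx], ecx`: eax plays `expected`,
// ecx plays `newValue`, [edx] is the destination. The lock prefix is what
// makes the whole read-compare-write step atomic in hardware.
fun cmpxchg(memory: IntArray, index: Int, expected: Int, newValue: Int): Int =
    synchronized(memory) {
        val actual = memory[index]
        if (actual == expected) memory[index] = newValue
        actual  // caller checks: the swap happened iff the result == expected
    }
```

This matches the C++ line above: return (jint)(Atomic::cmpxchg(x, addr, e)) == e;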

Another 6,000 words. OCD acting up again: publish first, tidy up later when I have time (mistakes possible). Does this chapter even have anything to do with Kotlin?!