Preface
In multithreaded concurrent programming, modifying shared variables is a common problem. ConcurrentHashMap and ConcurrentLinkedQueue can be used to store shared data safely. However, when only a state change or the execution order of threads needs to be controlled, atomic components or synchronization components such as ReentrantLock and CyclicBarrier are a better fit. The principles and usage of these components are described below:
- Implementation principle of atomic components: CAS
- Usage of atomic components such as AtomicBoolean and AtomicIntegerArray
- Implementation principle of synchronization components
- Usage of synchronization components such as ReentrantLock and CyclicBarrier
Implementation principle of atomic components: CAS
- The basic implementation of CAS is covered in a previous article: detailed explanation of the principle of locking, synchronized, volatile + CAS low-level implementation
Application scenarios
- Atomic operations on variables and state in multithreaded code
- Implementing reentrant locks
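To make the second scenario concrete, here is a minimal sketch of a reentrant spin lock built on CAS. The class SpinReentrantLock and its demo method are hypothetical names made up for this illustration; this is not how ReentrantLock is implemented in the JDK (it queues and parks waiting threads instead of spinning).

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical class for illustration only; not part of the JDK.
final class SpinReentrantLock {
    private final AtomicReference<Thread> owner = new AtomicReference<>();
    private int holds; // only read or written by the thread that owns the lock

    void lock() {
        Thread current = Thread.currentThread();
        if (owner.get() == current) { // re-entry by the current owner
            holds++;
            return;
        }
        // Spin until the CAS from "no owner" to "current thread" succeeds.
        while (!owner.compareAndSet(null, current)) {
            Thread.onSpinWait();
        }
        holds = 1;
    }

    void unlock() {
        if (owner.get() != Thread.currentThread()) {
            throw new IllegalMonitorStateException("current thread is not the owner");
        }
        if (--holds == 0) {
            owner.set(null); // volatile write releases the lock
        }
    }

    // Two threads increment a plain int, re-entering the lock on every iteration.
    static int demo() {
        SpinReentrantLock lock = new SpinReentrantLock();
        int[] counter = {0};
        Runnable work = () -> {
            for (int i = 0; i < 10_000; i++) {
                lock.lock();
                lock.lock(); // re-enter
                try {
                    counter[0]++;
                } finally {
                    lock.unlock();
                    lock.unlock();
                }
            }
        };
        Thread a = new Thread(work), b = new Thread(work);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return counter[0];
    }
}
```

Spinning only pays off when the lock is held for a very short time; for longer critical sections a blocking lock is usually the better choice.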
Atomic components
- Atomic components implement their atomic operations by spinning on a volatile variable with CAS
- Declaring the variable volatile ensures that when it is changed, other threads see the latest value
- CAS guarantees that the modification of the value is atomic and cannot be interleaved with another thread's update
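The "CAS spin on a volatile value" pattern described above can be sketched with AtomicInteger as follows. CasLoopDemo, casIncrement and demo are hypothetical names for this illustration; the loop is conceptually what an atomic increment does, although the JVM may compile the real incrementAndGet to a single hardware instruction.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not taken from the JDK.
final class CasLoopDemo {
    // Increment with an explicit CAS retry loop.
    static int casIncrement(AtomicInteger value) {
        while (true) {
            int current = value.get();              // volatile read of the latest value
            int next = current + 1;
            if (value.compareAndSet(current, next)) {
                return next;                        // CAS succeeded: nobody interfered
            }
            // CAS failed: another thread changed the value, so read again and retry
        }
    }

    // Four threads each perform 10,000 increments.
    static int demo() {
        AtomicInteger counter = new AtomicInteger();
        Thread[] threads = new Thread[4];
        for (int t = 0; t < threads.length; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < 10_000; i++) {
                    casIncrement(counter);
                }
            });
            threads[t].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 40000
    }
}
```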
Atomic classes for basic types
AtomicBoolean // boolean type
AtomicInteger // int type
AtomicLong    // long type
- Usage example
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class Main {
    public static void main(String[] args) throws Exception {
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        // An asynchronous thread modifies atomicBoolean
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                // Sleep so that this write happens after the main thread has set the value to true
                Thread.sleep(1000);
                atomicBoolean.set(false);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        atomicBoolean.set(true);
        future.join();
        System.out.println("boolean value is:" + atomicBoolean.get());
    }
}
--------------- Output ---------------
boolean value is:false
Atomic classes for reference types
AtomicReference
// Reference atomic class that carries an int version stamp
AtomicStampedReference
// Similar to AtomicStampedReference, but pairs the reference with a boolean mark instead of an int stamp:
// it records whether the reference has been marked, not how many times it has changed
AtomicMarkableReference
- The source code of AtomicReference is shown below. It defines a volatile V value field internally and implements atomic operations through a VarHandle (FieldInstanceReadWrite). MethodHandles computes the offset of value within the class, and the VarHandle then calls the Unsafe method public final native boolean compareAndSetReference(Object o, long offset, Object expected, Object x) to atomically modify the object's field
public class AtomicReference<V> implements java.io.Serializable {
    private static final long serialVersionUID = -1848883965231344442L;
    private static final VarHandle VALUE;
    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            VALUE = l.findVarHandle(AtomicReference.class, "value", Object.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
    private volatile V value;
    ...
ABA problem
- Thread X is about to change the value of the variable from A to B. Meanwhile, thread Y changes the value from A to C and then back to A. When thread X finally checks the variable, it still sees A and replaces it with B, even though this A is no longer the A that X originally read
- The solution is to make each value of the variable unique, for example by attaching a version number or a timestamp. With a version number, thread Y's modifications become A1 -> C2 -> A3, so thread X's update can detect that A1 is not equal to A3
- The implementation of AtomicStampedReference is similar to that of AtomicReference, except that the variable it modifies atomically is volatile Pair<V> pair, where Pair is an inner class. AtomicStampedReference can be used to solve the ABA problem
public class AtomicStampedReference<V> {
    private static class Pair<T> {
        final T reference;
        final int stamp;
        private Pair(T reference, int stamp) {
            this.reference = reference;
            this.stamp = stamp;
        }
        static <T> Pair<T> of(T reference, int stamp) {
            return new Pair<T>(reference, stamp);
        }
    }
    private volatile Pair<V> pair;
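The ABA scenario and the stamp-based fix can be replayed deterministically in a single thread. The sketch below is only an illustration: AbaDemo and its method names are hypothetical, and the "threads" X and Y are simulated by the order of the statements rather than by real threads.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;

// Hypothetical demo class; thread X and thread Y are simulated sequentially.
final class AbaDemo {
    // With a plain AtomicReference the A -> C -> A round trip goes unnoticed.
    static boolean plainCasSucceeds() {
        String a = "A", c = "C";
        AtomicReference<String> ref = new AtomicReference<>(a);
        String seenByX = ref.get();              // X reads A
        ref.compareAndSet(a, c);                 // Y: A -> C
        ref.compareAndSet(c, a);                 // Y: C -> A
        return ref.compareAndSet(seenByX, "B");  // X still succeeds
    }

    // With a stamp, X's stale stamp (1) no longer matches the current stamp (3).
    static boolean stampedCasSucceeds() {
        String a = "A", c = "C";
        AtomicStampedReference<String> ref = new AtomicStampedReference<>(a, 1);
        int[] stampHolder = new int[1];
        String seenByX = ref.get(stampHolder);   // X reads A together with stamp 1
        int stampSeenByX = stampHolder[0];
        ref.compareAndSet(a, c, 1, 2);           // Y: A1 -> C2
        ref.compareAndSet(c, a, 2, 3);           // Y: C2 -> A3
        return ref.compareAndSet(seenByX, "B", stampSeenByX, stampSeenByX + 1);
    }

    public static void main(String[] args) {
        System.out.println(plainCasSucceeds());   // true
        System.out.println(stampedCasSucceeds()); // false
    }
}
```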
- If we do not care how many times the variable has been modified in between, but only whether it has been changed (marked) at all, we can use AtomicMarkableReference
- An example of AtomicStampedReference
import java.util.concurrent.atomic.AtomicStampedReference;

public class Main {
    public static void main(String[] args) throws Exception {
        Test old = new Test("hello"), newTest = new Test("world");
        AtomicStampedReference<Test> reference = new AtomicStampedReference<>(old, 1);
        // Expect reference old with stamp 1; set newTest with stamp 2
        reference.compareAndSet(old, newTest, 1, 2);
        System.out.println("Object:" + reference.getReference().name + "; Version number:" + reference.getStamp());
    }
}
class Test {
    Test(String name) { this.name = name; }
    public String name;
}
--------------- Output ---------------
Object:world; Version number:2
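For comparison, here is a small sketch of AtomicMarkableReference, mentioned above as the boolean-mark counterpart of AtomicStampedReference. The class MarkableDemo and the "logically deleted" interpretation of the mark are assumptions chosen for this illustration.

```java
import java.util.concurrent.atomic.AtomicMarkableReference;

// Hypothetical demo class: the mark is used as a "logically deleted" flag.
final class MarkableDemo {
    static boolean[] demo() {
        String node = "node";
        AtomicMarkableReference<String> ref = new AtomicMarkableReference<>(node, false);
        // Succeeds only if the reference is still node AND it is still unmarked.
        boolean first = ref.compareAndSet(node, node, false, true);
        // Fails: the expected mark (false) no longer matches the current mark (true).
        boolean second = ref.compareAndSet(node, node, false, true);
        return new boolean[] {first, second, ref.isMarked()};
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // true false true
    }
}
```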
Atomic classes for arrays
AtomicIntegerArray   // int array
AtomicLongArray      // long array
AtomicReferenceArray // Array of reference types
- An array atomic class holds a final array internally and treats the whole array as one object. It computes each element's offset from its index and then calls UNSAFE.compareAndSetReference to perform the atomic operation. The array elements themselves are not volatile, so to make element updates visible across threads, elements are read with the UNSAFE method public native Object getReferenceVolatile(Object o, long offset), which returns the up-to-date element value
- Usage example
// Elements are initialized to 0 by default
AtomicIntegerArray array = new AtomicIntegerArray(2);
// For the element at index 0: expect 0 and update it to 1
array.compareAndSet(0, 0, 1);
System.out.println(array.get(0));
--------------- Output ---------------
1
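A slightly larger sketch shows per-element atomicity under contention: two threads update both slots of the same AtomicIntegerArray without any locking. ArraySlotsDemo is a hypothetical name used only for this example.

```java
import java.util.concurrent.atomic.AtomicIntegerArray;

// Hypothetical demo class: two threads increment alternating slots.
final class ArraySlotsDemo {
    static int[] demo() {
        AtomicIntegerArray hits = new AtomicIntegerArray(2);
        Runnable work = () -> {
            for (int i = 0; i < 10_000; i++) {
                hits.incrementAndGet(i % 2); // atomic update of one element
            }
        };
        Thread a = new Thread(work), b = new Thread(work);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return new int[] {hits.get(0), hits.get(1)};
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0] + " " + r[1]); // 10000 10000
    }
}
```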
Atomic field updater classes
AtomicIntegerFieldUpdater
AtomicLongFieldUpdater
AtomicReferenceFieldUpdater
- If the target of the operation is a field of some class, it can be updated atomically with AtomicIntegerFieldUpdater (or the Long and Reference variants). The field must be declared volatile so that it is visible to every thread; otherwise an exception is thrown
- Usage example
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

public class Main {
    public static void main(String[] args) {
        AtomicReferenceFieldUpdater<Test, String> fieldUpdater =
                AtomicReferenceFieldUpdater.newUpdater(Test.class, String.class, "name");
        Test test = new Test("hello world");
        fieldUpdater.compareAndSet(test, "hello world", "siting");
        System.out.println(fieldUpdater.get(test));
        System.out.println(test.name);
    }
}
class Test {
    Test(String name) { this.name = name; }
    public volatile String name; // Must be volatile for the field updater
}
--------------- Output ---------------
siting
siting
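The volatile requirement can be checked directly. The sketch below uses AtomicIntegerFieldUpdater on a hypothetical Counter class (both the class and the field names hits and plain are made up for this example) and shows that creating an updater for a non-volatile field is rejected with an IllegalArgumentException.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Hypothetical demo class for the field updater rules.
final class FieldUpdaterDemo {
    static class Counter {
        volatile int hits; // must be a volatile int (not Integer)
        int plain;         // non-volatile: cannot be used with an updater
    }

    private static final AtomicIntegerFieldUpdater<Counter> HITS =
            AtomicIntegerFieldUpdater.newUpdater(Counter.class, "hits");

    static int increment(Counter counter) {
        return HITS.incrementAndGet(counter);
    }

    // Returns true if the JDK refuses to create an updater for the non-volatile field.
    static boolean nonVolatileRejected() {
        try {
            AtomicIntegerFieldUpdater.newUpdater(Counter.class, "plain");
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Counter counter = new Counter();
        System.out.println(increment(counter));    // 1
        System.out.println(nonVolatileRejected()); // true
    }
}
```

One updater can serve many instances, which saves memory compared with giving each object its own AtomicInteger.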
Accumulators
Striped64
LongAccumulator
LongAdder
// accumulatorFunction: the accumulation rule; identity: the initial value
public LongAccumulator(LongBinaryOperator accumulatorFunction, long identity)
- LongAccumulator and LongAdder both extend Striped64. The main idea of Striped64 is similar to that of ConcurrentHashMap: segmentation. When concurrent updates to a single variable become a bottleneck, the updates are spread across multiple variables, and those variables are added up when the total is needed
- LongAdder is a special case of LongAccumulator: it behaves like a LongAccumulator whose function is addition and whose identity is 0
- LongAccumulator example
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.LongAccumulator;

public class Main {
    public static void main(String[] args) throws Exception {
        LongAccumulator accumulator = new LongAccumulator(Long::sum, 0);
        for (int i = 0; i < 100000; i++) {
            CompletableFuture.runAsync(() -> accumulator.accumulate(1));
        }
        Thread.sleep(1000); // Wait for the CompletableFuture tasks to complete before reading the value
        System.out.println(accumulator.get());
    }
}
--------------- Output ---------------
100000
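The segmentation idea behind Striped64 can be illustrated with a deliberately simplified counter. This is only a sketch under stated assumptions: StripedCounterSketch is a hypothetical class, it uses a fixed number of cells chosen by the thread's hash code, and it is not the actual Striped64 algorithm (which, among other things, starts with a single base value and only adds cells under contention).

```java
import java.util.concurrent.atomic.AtomicLongArray;

// Hypothetical, simplified illustration of striping; NOT the real Striped64.
final class StripedCounterSketch {
    private final AtomicLongArray cells;

    StripedCounterSketch(int stripes) {
        cells = new AtomicLongArray(stripes);
    }

    // Each thread updates "its" cell, so threads rarely contend on the same slot.
    void add(long x) {
        int index = Math.floorMod(Thread.currentThread().hashCode(), cells.length());
        cells.addAndGet(index, x);
    }

    // The total is computed on demand by summing all cells.
    long sum() {
        long total = 0;
        for (int i = 0; i < cells.length(); i++) {
            total += cells.get(i);
        }
        return total;
    }

    static long demo() {
        StripedCounterSketch counter = new StripedCounterSketch(8);
        Thread[] threads = new Thread[4];
        for (int t = 0; t < threads.length; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < 10_000; i++) {
                    counter.add(1);
                }
            });
            threads[t].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return counter.sum();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 40000
    }
}
```

As with LongAdder, a sum taken while other threads are still adding is only a snapshot; it is exact here because all writers have been joined first.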
The implementation principle of synchronization components
- Most synchronization components in Java maintain a state value internally and, like atomic components, change it through CAS. Doug Lea abstracted this maintenance and modification of state into AbstractQueuedSynchronizer (AQS)
- The principle of AQS is covered in a previous article: detailed explanation of the principle of locking, synchronized, volatile + CAS underlying implementation
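As an illustration of "a state value changed through CAS", here is a minimal non-reentrant mutex built on AQS. SimpleMutex is a hypothetical class written for this example, in the spirit of the mutex shown in the AbstractQueuedSynchronizer documentation; state 0 means unlocked and 1 means locked.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical non-reentrant mutex; state 0 = unlocked, 1 = locked.
final class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            // CAS the state from 0 to 1; AQS queues the thread if this fails.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // volatile write; AQS then wakes the next queued thread
            return true;
        }
    }

    private final Sync sync = new Sync();

    void lock() { sync.acquire(1); }
    void unlock() { sync.release(1); }

    // Two threads increment a plain int under the mutex.
    static int demo() {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = {0};
        Runnable work = () -> {
            for (int i = 0; i < 10_000; i++) {
                mutex.lock();
                try {
                    counter[0]++;
                } finally {
                    mutex.unlock();
                }
            }
        };
        Thread a = new Thread(work), b = new Thread(work);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return counter[0];
    }
}
```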
Synchronization components
ReentrantLock, ReentrantReadWriteLock
- ReentrantLock and ReentrantReadWriteLock are both based on AQS (AbstractQueuedSynchronizer). Because they distinguish between fair and unfair locks, they do not extend AQS directly. Instead, they use inner classes that extend AQS, with separate implementations for the fair and unfair variants
- An example of ReentrantLock
ReentrantLock lock = new ReentrantLock();
if (lock.tryLock()) {
    try {
        // Business logic
    } finally {
        lock.unlock(); // Always release the lock in finally
    }
}
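The fair variant and reentrancy can be observed through the lock's own accessors. ReentrantLockDemo below is a hypothetical class for this illustration; passing true to the constructor selects the fair policy, and getHoldCount reports how many times the current thread has acquired the lock.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: fairness flag and hold counts of a ReentrantLock.
final class ReentrantLockDemo {
    static int[] holdCounts() {
        ReentrantLock fairLock = new ReentrantLock(true); // true selects the fair policy
        int[] counts = new int[3];
        fairLock.lock();
        try {
            counts[0] = fairLock.getHoldCount();     // 1: first acquisition
            fairLock.lock();                         // same thread acquires again
            try {
                counts[1] = fairLock.getHoldCount(); // 2: re-entered
            } finally {
                fairLock.unlock();
            }
        } finally {
            fairLock.unlock();
        }
        counts[2] = fairLock.getHoldCount();         // 0: fully released
        return counts;
    }

    static boolean isFair() {
        return new ReentrantLock(true).isFair();
    }

    public static void main(String[] args) {
        int[] c = holdCounts();
        System.out.println(c[0] + " " + c[1] + " " + c[2]); // 1 2 0
    }
}
```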
- An example of ReentrantReadWriteLock
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Main {
    public static void main(String[] args) throws Exception {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        if (lock.readLock().tryLock()) { // Read lock
            try {
                // Business logic
            } finally {
                lock.readLock().unlock();
            }
        }
        if (lock.writeLock().tryLock()) { // Write lock
            try {
                // Business logic
            } finally {
                lock.writeLock().unlock();
            }
        }
    }
}
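Two properties of ReentrantReadWriteLock are worth seeing in code: a thread holding the read lock cannot upgrade to the write lock, while a thread holding the write lock may also take the read lock (downgrading). The sketch below uses a hypothetical class RwLockRulesDemo and checks both from a single thread with tryLock.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical demo class: upgrade is refused, downgrade is allowed.
final class RwLockRulesDemo {
    static boolean[] demo() {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        boolean[] result = new boolean[2];

        lock.readLock().lock();
        try {
            // Read -> write upgrade: tryLock fails while any read lock is held.
            result[0] = lock.writeLock().tryLock();
            if (result[0]) {
                lock.writeLock().unlock();
            }
        } finally {
            lock.readLock().unlock();
        }

        lock.writeLock().lock();
        try {
            // Write -> read downgrade: the writing thread may also take the read lock.
            result[1] = lock.readLock().tryLock();
            if (result[1]) {
                lock.readLock().unlock();
            }
        } finally {
            lock.writeLock().unlock();
        }
        return result;
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1]); // false true
    }
}
```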
Semaphore implementation principles and application scenarios
- Like ReentrantLock, Semaphore supports both fair and unfair acquisition strategies, and also implements synchronization through inner classes that extend AQS
- Plain explanation: suppose a well has room for at most three people to draw water at the same time, and each person drawing water occupies one spot. When all three spots are taken, a fourth person who wants to draw water must wait until one of the first three leaves and frees a spot
- Usage example
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

public class Main {
    public static void main(String[] args) throws Exception {
        Semaphore semaphore = new Semaphore(2);
        for (int i = 0; i < 3; i++) {
            CompletableFuture.runAsync(() -> {
                try {
                    System.out.println(Thread.currentThread().toString() + " start ");
                    if (semaphore.tryAcquire(1)) {
                        Thread.sleep(1000);
                        semaphore.release(1);
                        System.out.println(Thread.currentThread().toString() + "End without blocking");
                    } else {
                        System.out.println(Thread.currentThread().toString() + "Blocked end");
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        }
        // Make sure the CompletableFuture tasks finish before the main thread terminates
        Thread.sleep(2000);
    }
}
--------------- Output ---------------
Thread[ForkJoinPool.commonPool-worker-19,5,main] start
Thread[ForkJoinPool.commonPool-worker-5,5,main] start
Thread[ForkJoinPool.commonPool-worker-23,5,main] start
Thread[ForkJoinPool.commonPool-worker-23,5,main]Blocked end
Thread[ForkJoinPool.commonPool-worker-5,5,main]End without blocking
Thread[ForkJoinPool.commonPool-worker-19,5,main]End without blocking
- Three threads run, but because the semaphore has only 2 permits, the third thread fails to acquire one (tryAcquire returns false immediately instead of waiting) and prints "Blocked end"
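When every task should eventually run and only the degree of concurrency needs to be capped, the blocking acquire method is the usual choice instead of tryAcquire. The following sketch (SemaphoreLimitDemo and maxConcurrent are hypothetical names) records the peak number of threads that were inside the guarded section at the same time.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: measures peak concurrency inside a semaphore-guarded section.
final class SemaphoreLimitDemo {
    static int maxConcurrent(int permits, int workers) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    semaphore.acquire(); // blocks until a permit is free
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                try {
                    int now = active.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    Thread.sleep(50); // simulated work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    active.decrementAndGet();
                    semaphore.release(); // return the permit
                }
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println(maxConcurrent(2, 5) <= 2); // true
    }
}
```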
Implementation principles and usage scenarios of CountDownLatch
- CountDownLatch is also implemented on top of AQS
- Plain explanation: in a game, the main quest can continue only after five side quests have been completed. CountDownLatch fits this case: the main task blocks and waits, each side quest counts down by one when it finishes, and the main task resumes once all five have been completed
- Usage example
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

public class Main {
    public static void main(String[] args) throws Exception {
        CountDownLatch count = new CountDownLatch(2);
        for (int i = 0; i < 2; i++) {
            CompletableFuture.runAsync(() -> {
                try {
                    Thread.sleep(1000);
                    System.out.println(" CompletableFuture over ");
                    count.countDown();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        }
        // Wait for the CompletableFuture tasks to complete
        count.await();
        System.out.println(" main over ");
    }
}
--------------- Output ---------------
 CompletableFuture over
 CompletableFuture over
 main over
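A common variation uses two latches: one as a start gate that releases all workers at once, and one that the coordinating thread waits on, here with a timeout so it cannot hang forever. LatchGateDemo is a hypothetical class for this sketch, and the 5-second timeout is an arbitrary value chosen for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: start gate plus completion latch.
final class LatchGateDemo {
    static int demo(int workers) {
        CountDownLatch startGate = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(workers);
        AtomicInteger finished = new AtomicInteger();
        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                try {
                    startGate.await();          // every worker waits for the gate
                    finished.incrementAndGet(); // simulated work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();           // always count down, even on failure
                }
            }).start();
        }
        startGate.countDown();                  // open the gate for all workers
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(demo(5)); // 5
    }
}
```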
CyclicBarrier implementation principle and application scenario
- CyclicBarrier implements synchronization with its ReentrantLock lock and Condition trip fields
- A CyclicBarrier blocks every thread in await until all parties have arrived, and then wakes them all up to continue. Imagine a railing that holds back five sheep: it is lifted only when all five sheep are standing at it together, and then all of them can run out of the pen
- Usage example
import java.time.Clock;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;

public class Main {
    public static void main(String[] args) throws Exception {
        CyclicBarrier barrier = new CyclicBarrier(2);
        CompletableFuture.runAsync(() -> {
            try {
                System.out.println("CompletableFuture run start-" + Clock.systemUTC().millis());
                barrier.await(); // Wait until the main thread has also called await
                System.out.println("CompletableFuture run over-" + Clock.systemUTC().millis());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        Thread.sleep(1000);
        // The main thread and the CompletableFuture thread wait for each other
        barrier.await();
        System.out.println("main run over!");
    }
}
--------------- Output ---------------
CompletableFuture run start-1609822588881
main run over!
CompletableFuture run over-1609822589880
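Unlike CountDownLatch, a CyclicBarrier resets itself after each trip and can run an optional barrier action once per trip. The sketch below (BarrierRoundsDemo is a hypothetical name) lets several threads meet at the same barrier for a number of rounds and counts how often the barrier action ran.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: the same barrier is reused for several rounds.
final class BarrierRoundsDemo {
    static int demo(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once each time all parties have arrived.
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);
        Thread[] threads = new Thread[parties];
        for (int t = 0; t < parties; t++) {
            threads[t] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // wait for the other parties, then start the next round
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
            });
            threads[t].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println(demo(3, 4)); // 4
    }
}
```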
StampedLock
- StampedLock is not built on AQS. Instead, it maintains its state values internally and updates them with CAS
- StampedLock has three modes: write, read, and optimistic read
- A StampedLock read lock and write lock can be converted into each other
// Acquire the read lock, spinning or waiting until it is available; returns a stamp
public long readLock()
// Try to acquire the read lock; returns 0 on failure
public long tryReadLock()
// Release the read lock
public void unlockRead(long stamp)
// Acquire the write lock, spinning or waiting until it is available; returns a stamp
public long writeLock()
// Try to acquire the write lock; returns 0 on failure
public long tryWriteLock()
// Release the write lock
public void unlockWrite(long stamp)
// Try an optimistic read: returns a stamp that is checked later with validate
public long tryOptimisticRead()
// Check whether the stamp is still valid
public boolean validate(long stamp)
- Usage example
import java.util.concurrent.locks.StampedLock;

public class Main {
    public static void main(String[] args) throws Exception {
        StampedLock stampedLock = new StampedLock();
        long stamp = stampedLock.tryOptimisticRead();
        // Check whether the stamp is still valid
        if (!stampedLock.validate(stamp)) {
            // The optimistic read failed: acquire a real read lock
            stamp = stampedLock.readLock();
            long writeStamp = stampedLock.tryConvertToWriteLock(stamp);
            if (writeStamp != 0) { // Successfully converted to a write lock
                // Modification logic
                stampedLock.unlockWrite(writeStamp);
            } else {
                stampedLock.unlockRead(stamp);
                // Try to acquire the write lock
                stamp = stampedLock.tryWriteLock();
                if (stamp != 0) {
                    // Modification logic
                    stampedLock.unlockWrite(stamp); // Release with the stamp returned by tryWriteLock
                }
            }
        }
    }
}
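The typical optimistic-read pattern reads the fields first, validates the stamp afterwards, and only falls back to a real read lock if a write happened in between. The sketch below follows the well-known point example from the StampedLock documentation; the class name StampedPoint is hypothetical.

```java
import java.util.concurrent.locks.StampedLock;

// Hypothetical class illustrating write mode and optimistic read mode.
final class StampedPoint {
    private final StampedLock lock = new StampedLock();
    private double x, y;

    void move(double dx, double dy) {
        long stamp = lock.writeLock(); // exclusive write mode
        try {
            x += dx;
            y += dy;
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    double distanceFromOrigin() {
        long stamp = lock.tryOptimisticRead(); // no lock is actually held
        double currentX = x, currentY = y;     // read the fields optimistically
        if (!lock.validate(stamp)) {           // a write happened in between
            stamp = lock.readLock();           // fall back to a real read lock
            try {
                currentX = x;
                currentY = y;
            } finally {
                lock.unlockRead(stamp);
            }
        }
        return Math.hypot(currentX, currentY);
    }

    public static void main(String[] args) {
        StampedPoint point = new StampedPoint();
        point.move(3, 4);
        System.out.println(point.distanceFromOrigin()); // 5.0
    }
}
```

Optimistic reads suit short, read-mostly sections: the values read must be copied to locals and used only after validate has confirmed them.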
Corrections are welcome