All sample code can be viewed or downloaded at github.com/Wasabi1234/…

1 Basic Concepts

1.1 Concurrency

Concurrency means that two or more threads exist at the same time. On a single-core processor, the threads take turns being switched in and out of execution, so they “exist” simultaneously while each one is at some stage of its execution. On a multi-core processor, each thread can be assigned to its own core, so the threads can truly run at the same time.

1.2 High Concurrency

High concurrency is one of the factors that must be considered when designing the architecture of distributed systems on the Internet. It usually means designing the system so that it can handle many requests in parallel at the same time.

1.3 Differences and Connections

  • Concurrency: multiple threads operate on the same resource; the focus is on thread safety and correct use of resources
  • High concurrency: the service can process many requests at the same time; the focus is on application performance

2 CPU

2.1 CPU Multi-level Cache

  • Why a CPU cache is needed: the CPU runs much faster than main memory can keep up with, so within a processor clock cycle the CPU often has to wait for main memory, wasting resources. The cache exists to ease this speed mismatch between the CPU and memory.
  • Why a CPU cache is effective
    1. Temporal locality: if a piece of data is accessed, it is likely to be accessed again in the near future
    2. Spatial locality: if a piece of data is accessed, the data adjacent to it is likely to be accessed soon as well
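
Spatial locality can be seen in how a two-dimensional array is traversed. The following is a minimal sketch, not a benchmark: the class name LocalityDemo and its helper methods are made up for illustration, and the timings it prints depend on the hardware, the JVM, and JIT warm-up. Both methods compute the same sum; only the access order differs.

```java
class LocalityDemo {

    /** Sums row by row: consecutive accesses touch neighboring elements of one row array. */
    static long sumRowMajor(int[][] m) {
        long sum = 0;
        for (int r = 0; r < m.length; r++) {
            for (int c = 0; c < m[r].length; c++) {
                sum += m[r][c];
            }
        }
        return sum;
    }

    /** Sums column by column: every access jumps to a different row array. */
    static long sumColumnMajor(int[][] m) {
        long sum = 0;
        for (int c = 0; c < m[0].length; c++) {
            for (int r = 0; r < m.length; r++) {
                sum += m[r][c];
            }
        }
        return sum;
    }

    /** Builds an n x n matrix whose element at (r, c) is r + c. */
    static int[][] filled(int n) {
        int[][] m = new int[n][n];
        for (int r = 0; r < n; r++) {
            for (int c = 0; c < n; c++) {
                m[r][c] = r + c;
            }
        }
        return m;
    }

    public static void main(String[] args) {
        int[][] m = filled(2048);
        long t0 = System.nanoTime();
        long rowSum = sumRowMajor(m);
        long t1 = System.nanoTime();
        long colSum = sumColumnMajor(m);
        long t2 = System.nanoTime();
        System.out.println("row-major:    sum=" + rowSum + ", ms=" + (t1 - t0) / 1_000_000);
        System.out.println("column-major: sum=" + colSum + ", ms=" + (t2 - t1) / 1_000_000);
    }
}
```

On many machines the row-by-row traversal is noticeably faster for large matrices because it makes better use of each cache line, but this is not guaranteed and should be measured rather than assumed.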

2.2 Cache Consistency (MESI)

The MESI protocol keeps shared data consistent across the caches of multiple CPUs. Each cache line is in one of four states:

  • M (Modified): the cache line is cached only in this CPU’s cache and has been modified, so it is inconsistent with the data in main memory. It must be written back to main memory at some point before other CPUs read the corresponding memory. Once the value has been written back, the state of the cache line changes to E
  • E (Exclusive): the cache line is cached only in this CPU’s cache and has not been modified, so it is consistent with the data in main memory. It changes to S as soon as another CPU reads the same memory, and to M when it is modified
  • S (Shared): the cache line may be cached by multiple CPUs and is consistent with the data in main memory
  • I (Invalid): the cache line is invalid and must not be used

Out-of-order execution optimization: to speed up computation, the processor may execute instructions in an order that differs from the order of the code. For concurrent programs this brings both advantages and risks.
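
The four MESI states described above can be pictured as a small state machine. The sketch below is a deliberately simplified toy model, not a description of any real processor: the names MesiDemo, State, Event and next are invented for illustration, and bus messages, write-back timing and the M-to-E transition after a write-back are left out.

```java
class MesiDemo {

    enum State { MODIFIED, EXCLUSIVE, SHARED, INVALID }

    enum Event { LOCAL_READ, LOCAL_WRITE, REMOTE_READ, REMOTE_WRITE }

    /**
     * Next state of one cache line in one CPU's cache (toy model).
     * othersHoldLine only matters when an INVALID line is read locally.
     */
    static State next(State current, Event event, boolean othersHoldLine) {
        switch (event) {
            case LOCAL_WRITE:
                // This CPU modifies the line; copies in other caches become invalid
                return State.MODIFIED;
            case REMOTE_WRITE:
                // Another CPU modifies the line; our copy can no longer be used
                return State.INVALID;
            case REMOTE_READ:
                // Another CPU reads the line; a valid copy ends up shared
                return current == State.INVALID ? State.INVALID : State.SHARED;
            case LOCAL_READ:
                if (current != State.INVALID) {
                    return current; // cache hit, state unchanged
                }
                // Cache miss: shared if another cache holds the line, otherwise exclusive
                return othersHoldLine ? State.SHARED : State.EXCLUSIVE;
            default:
                throw new IllegalArgumentException("unknown event: " + event);
        }
    }

    public static void main(String[] args) {
        State s = State.INVALID;
        s = next(s, Event.LOCAL_READ, false);   // EXCLUSIVE
        System.out.println(s);
        s = next(s, Event.LOCAL_WRITE, false);  // MODIFIED
        System.out.println(s);
        s = next(s, Event.REMOTE_READ, false);  // SHARED
        System.out.println(s);
        s = next(s, Event.REMOTE_WRITE, false); // INVALID
        System.out.println(s);
    }
}
```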

3 Project Preparation

3.1 Project initialization

3.2 Concurrency simulation: JMeter load testing

3.3 Concurrency simulation: code

CountDownLatch

Semaphore

Let’s do the concurrent simulation

package com.mmall.concurrency;

import com.mmall.concurrency.annoations.NotThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * @author shishusheng
 * @date 18/4/1
 */
@Slf4j
@NotThreadSafe
public class ConcurrencyTest {

    /** Total number of requests */
    public static int clientTotal = 5000;

    /** Number of threads allowed to execute at the same time */
    public static int threadTotal = 200;

    public static int count = 0;

    public static void main(String[] args) throws Exception {
        // Define the thread pool
        ExecutorService executorService = Executors.newCachedThreadPool();
        // The semaphore limits how many threads run add() at the same time
        final Semaphore semaphore = new Semaphore(threadTotal);
        // The latch lets main wait until every request has finished
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        // Put the requests into the thread pool
        for (int i = 0; i < clientTotal; i++) {
            executorService.execute(() -> {
                try {
                    // Acquire a permit from the semaphore
                    semaphore.acquire();
                    add();
                    // Release the permit
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count);
    }

    /** Counting method (not atomic) */
    private static void add() {
        count++;
    }
}

Running it shows that the final count differs from run to run, so the class is not thread-safe

4 Thread Safety

4.1 Thread Safety

A class is thread-safe if it behaves correctly when accessed by multiple threads, regardless of how the runtime environment schedules those threads or how their execution is interleaved, and without requiring any additional synchronization or coordination in the calling code

4.2 Atomicity

4.2.1 Atomic package

  • AtomicXXX: built on CAS via Unsafe.compareAndSwapInt. Atomicity provides mutually exclusive access: only one thread can operate on the value at any given time
package com.mmall.concurrency.example.atomic;

import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @author shishusheng
 */
@Slf4j
@ThreadSafe
public class AtomicExample2 {

    /** Total number of requests */
    public static int clientTotal = 5000;

    /** Number of threads allowed to execute at the same time */
    public static int threadTotal = 200;

    public static AtomicLong count = new AtomicLong(0);

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            executorService.execute(() -> {
                try {
                    System.out.println();
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        // Read the value from main memory
        log.info("count:{}", count.get());
    }

    private static void add() {
        count.incrementAndGet();
        // count.getAndIncrement();
    }
}

package com.mmall.concurrency.example.atomic;

import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicReference;

/**
 * @author shishusheng
 * @date 18/4/3
 */
@Slf4j
@ThreadSafe
public class AtomicExample4 {

    private static AtomicReference<Integer> count = new AtomicReference<>(0);

    public static void main(String[] args) {
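        // succeeds: 0 -> 2
        count.compareAndSet(0, 2);
        // fails: the value is 2, not 0
        count.compareAndSet(0, 1);
        // fails: the value is 2, not 1
        count.compareAndSet(1, 3);
        // succeeds: 2 -> 4
        count.compareAndSet(2, 4);
        // fails: the value is 4, not 3
        count.compareAndSet(3, 5);
        // logs count:4
        log.info("count:{}", count.get());
    }
}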
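
To make the CAS idea behind the Atomic classes more concrete, here is a minimal sketch of an increment written by hand as a compare-and-set retry loop. It only illustrates the principle and is not how the JDK implements incrementAndGet internally; the class name CasLoopDemo and the methods casIncrement and run are invented for this example.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasLoopDemo {

    /** Increments the value with an explicit compare-and-set retry loop. */
    static int casIncrement(AtomicInteger value) {
        while (true) {
            int current = value.get();
            int next = current + 1;
            // Succeeds only if nobody changed the value since we read it
            if (value.compareAndSet(current, next)) {
                return next;
            }
            // Another thread won the race; read the new value and try again
        }
    }

    /** Starts the given number of threads; each increments a shared counter perThread times. */
    static int run(int threads, int perThread) {
        AtomicInteger counter = new AtomicInteger(0);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    casIncrement(counter);
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("count: " + run(8, 10_000)); // prints "count: 80000"
    }
}
```

No lock is taken: a thread that loses the race simply retries, which is why no update is lost even under contention.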

  • AtomicReference, AtomicReferenceFieldUpdater

  • AtomicBoolean

  • AtomicStampedReference: addresses the ABA problem of CAS
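
The ABA problem mentioned in the last item can be shown in a few lines. This is a single-threaded sketch in which the interleaving of two threads is simulated by the order of the statements; the class and method names are invented for illustration. A plain CAS cannot notice that the value went from A to B and back to A, while a stamped reference can, because the stamp has moved on.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;

class AbaDemo {

    /** Plain CAS: succeeds even though the value changed A -> B -> A in between. */
    static boolean plainCasAfterAba() {
        AtomicReference<String> ref = new AtomicReference<>("A");
        String seen = ref.get();             // "slow" thread reads A
        ref.compareAndSet("A", "B");         // other thread: A -> B
        ref.compareAndSet("B", "A");         // other thread: B -> A
        return ref.compareAndSet(seen, "C"); // slow thread: CAS still succeeds
    }

    /** Stamped CAS: fails because the stamp no longer matches. */
    static boolean stampedCasAfterAba() {
        AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);
        int[] stampHolder = new int[1];
        String seen = ref.get(stampHolder);  // "slow" thread reads A with stamp 0
        int seenStamp = stampHolder[0];
        ref.compareAndSet("A", "B", 0, 1);   // other thread: A -> B, stamp 0 -> 1
        ref.compareAndSet("B", "A", 1, 2);   // other thread: B -> A, stamp 1 -> 2
        // Value is A again, but the stamp is 2, not 0, so this CAS is rejected
        return ref.compareAndSet(seen, "C", seenStamp, seenStamp + 1);
    }

    public static void main(String[] args) {
        System.out.println("plain CAS succeeded:   " + plainCasAfterAba());   // true
        System.out.println("stamped CAS succeeded: " + stampedCasAfterAba()); // false
    }
}
```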

4.2.2 Locks

synchronized: relies on the JVM

  • Modifying a code block: the code enclosed in curly braces; applies to the object on which it is called
  • Modifying a method: the entire method; applies to the object on which it is called
  • Modifying a static method: the entire static method; applies to all objects of the class
package com.mmall.concurrency.example.count;

import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * @author shishusheng
 */
@Slf4j
@ThreadSafe
public class CountExample3 {

    /** Total number of requests */
    public static int clientTotal = 5000;

    /** Number of threads allowed to execute at the same time */
    public static int threadTotal = 200;

    public static int count = 0;

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count);
    }

    // synchronized on a static method locks the class, so count++ is no longer interleaved
    private synchronized static void add() {
        count++;
    }
}

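The counting method is fixed by declaring it synchronized

  • Modifying a class: the part enclosed in parentheses; applies to all objects of the class
  • Note: the synchronized modifier is not inherited. When a subclass overrides a synchronized method of its parent class, the overriding method is not synchronized unless it declares synchronized itself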
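
The four ways of using synchronized listed above can be put side by side in one class. This is an illustrative sketch only; SynchronizedForms and its method names are made up for this example and do not come from the sample project.

```java
class SynchronizedForms {

    private int instanceCount = 0;
    private static int staticCount = 0;

    // Code block: locks the object the method is called on (this)
    void addInBlock() {
        synchronized (this) {
            instanceCount++;
        }
    }

    // Instance method: the whole method, also locks this
    synchronized void addInMethod() {
        instanceCount++;
    }

    // Static method: the whole method, locks the Class object shared by all instances
    static synchronized void addInStaticMethod() {
        staticCount++;
    }

    // Class: the block in parentheses uses the same lock as the static method
    void addInClassBlock() {
        synchronized (SynchronizedForms.class) {
            staticCount++;
        }
    }

    /** Returns {instanceCount, staticCount} after all threads have finished. */
    static int[] run(int threads, int perThread) {
        synchronized (SynchronizedForms.class) {
            staticCount = 0;
        }
        SynchronizedForms shared = new SynchronizedForms();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    shared.addInBlock();
                    shared.addInMethod();
                    addInStaticMethod();
                    shared.addInClassBlock();
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] {shared.instanceCount, staticCount};
    }

    public static void main(String[] args) {
        int[] result = run(4, 1000);
        // Each counter is incremented twice per iteration: 4 * 1000 * 2 = 8000
        System.out.println("instance: " + result[0] + ", static: " + result[1]);
    }
}
```

The two instance-level forms share the lock on this, and the two class-level forms share the lock on the Class object, which is why each pair can safely update the same counter.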

Lock: implemented in code, relying on special CPU instructions
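
As a minimal sketch of the Lock approach, the counter below is protected with a ReentrantLock. The text above does not name a concrete Lock implementation, so the choice of ReentrantLock is an assumption, and the class name LockCounterDemo is invented for illustration.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class LockCounterDemo {

    private final Lock lock = new ReentrantLock();
    private int count = 0;

    void add() {
        lock.lock();
        try {
            count++;
        } finally {
            // Always release in finally, otherwise an exception would leave the lock held
            lock.unlock();
        }
    }

    int get() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

    /** Starts the given number of threads; each calls add() perThread times. */
    static int run(int threads, int perThread) {
        LockCounterDemo counter = new LockCounterDemo();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    counter.add();
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("count: " + run(8, 10_000)); // prints "count: 80000"
    }
}
```

Unlike synchronized, the lock has to be released explicitly, which is the reason for the try/finally pattern.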

4.2.3 Comparison

  • synchronized: a non-interruptible lock; suitable when contention is low; good readability
  • Lock: an interruptible lock with more varied synchronization options; maintains steady performance when contention is heavy
  • Atomic: maintains steady performance when contention is heavy and performs better than Lock, but can only synchronize a single value

4.3 Visibility

Changes that one thread makes to main memory can be observed by other threads in a timely manner

4.3.1 Reasons why shared variables are not visible between threads

  • Interleaved thread execution
  • Reordering combined with interleaved thread execution
  • The updated value of a shared variable is not synchronized in time between working memory and main memory

4.3.2 Visibility with synchronized

The JMM rules for synchronized

  • Before a thread releases the lock, it must flush the latest values of the shared variables to main memory
  • When a thread acquires the lock, it clears the values of the shared variables in its working memory, so that it has to re-read the latest values from main memory when it uses them (the lock acquired and the lock released must be the same lock)

4.3.3 Visibility with volatile

Visibility is achieved by inserting memory barriers and prohibiting reordering optimizations

  • A write to a volatile variable is followed by a store barrier instruction, which flushes the shared variable values in local memory to main memory
  • A read of a volatile variable is preceded by a load barrier instruction, which reads the shared variable from main memory
  • Typical use of volatile: a status flag
volatile boolean inited = false;

// Thread 1:
context = loadContext();
inited = true;

// Thread 2:
while (!inited) {
    sleep();
}
doSomethingWithConfig(context);
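
The flag pattern above is only a fragment. A runnable version might look like the following sketch, where the class name VolatileFlagDemo, the stand-in loadContext method, and the use of Thread.yield() in place of sleep() are all assumptions made for this example.

```java
class VolatileFlagDemo {

    private static volatile boolean inited = false;
    // Plain field: it becomes visible to the reader through the volatile write to inited
    private static String context;

    /** Stand-in for the real (hypothetical) context loading. */
    static String loadContext() {
        return "config-loaded";
    }

    /** Returns the context value that the reader thread observed. */
    static String run() {
        inited = false;
        context = null;
        final String[] seen = new String[1];

        // Thread 2: waits for the flag, then uses the context
        Thread reader = new Thread(() -> {
            while (!inited) {
                Thread.yield();
            }
            seen[0] = context;
        });
        reader.start();

        // Thread 1 (here: the calling thread): load first, then set the flag
        context = loadContext();
        inited = true;

        try {
            reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "config-loaded"
    }
}
```

Because the write to context happens before the volatile write to inited, a reader that sees inited == true is guaranteed to see the loaded context as well. Without volatile, the loop could spin forever or the reader could observe a stale context.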

4.4 Ordering

When one thread observes the order in which instructions execute in another thread, that order generally appears out of sequence because of instruction reordering

The JMM allows the compiler and the processor to reorder instructions. Reordering does not affect the result of single-threaded execution, but it can affect the correctness of multithreaded concurrent execution

4.4.1 happens-before rules

5 Publishing Objects

5.1 Publishing Objects Safely

package com.mmall.concurrency.example.singleton;

import com.mmall.concurrency.annoations.NotThreadSafe;

/**
 * @author shishusheng
 */
@NotThreadSafe
public class SingletonExample4 {

    /** Private constructor */
    private SingletonExample4() {
    }

    // instance = new SingletonExample4() consists of three steps:
    // 1. memory = allocate()   allocates the memory for the object
    // 2. ctorInstance()        initializes the object
    // 3. instance = memory     sets instance to the newly allocated memory

    // After JVM and CPU optimization, the instructions may be reordered:
    // 1. memory = allocate()   allocates the memory for the object
    // 3. instance = memory     sets instance to the newly allocated memory
    // 2. ctorInstance()        initializes the object

    /** Singleton object */
    private static SingletonExample4 instance = null;

    /**
     * Static factory method
     *
     * @return the singleton instance
     */
    public static SingletonExample4 getInstance() {
        // Double-checked locking: thread B may see a non-null but uninitialized instance here
        if (instance == null) { // B
            // Synchronization lock
            synchronized (SingletonExample4.class) {
                if (instance == null) {
                    instance = new SingletonExample4(); // A - 3
                }
            }
        }
        return instance;
    }
}
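
A commonly used remedy for the reordering problem in the double-checked locking example above is to declare the instance field volatile, which prevents the write to the field from being reordered before the object is initialized. The sketch below shows that variant under a made-up class name, SafeSingleton; it is one possible fix, not necessarily the one used in the sample project.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class SafeSingleton {

    // volatile forbids publishing the reference before the constructor has finished
    private static volatile SafeSingleton instance = null;

    private SafeSingleton() {
    }

    static SafeSingleton getInstance() {
        if (instance == null) {                      // first check, without locking
            synchronized (SafeSingleton.class) {
                if (instance == null) {              // second check, under the lock
                    instance = new SafeSingleton();
                }
            }
        }
        return instance;
    }

    /** Calls getInstance() from several threads and counts the distinct objects seen. */
    static int distinctInstances(int threads) {
        Set<SafeSingleton> seen = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> seen.add(getInstance()));
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct instances: " + distinctInstances(16)); // prints 1
    }
}
```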

7 AQS

7.1 Introduction

  • A FIFO queue, implemented with Node, serves as the basic framework for building locks and other synchronizers
  • An int field represents the state
  • It is used through inheritance
  • A subclass inherits from AQS and manages the state by implementing its methods (acquire and release)
  • Both exclusive and shared locking modes can be implemented (exclusive, shared)
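
To illustrate the points above, here is a minimal sketch of a non-reentrant exclusive lock built by inheriting from AbstractQueuedSynchronizer, along the lines of the mutex example in the AQS Javadoc. The class name SimpleMutex and the convention that state 0 means unlocked and 1 means locked are choices made for this example.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleMutex {

    /** The subclass manages the int state: 0 = unlocked, 1 = locked. */
    private static final class Sync extends AbstractQueuedSynchronizer {

        @Override
        protected boolean tryAcquire(int ignored) {
            // Atomically switch the state from unlocked to locked
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void lock() {
        sync.acquire(1);   // queues the thread in the FIFO queue if tryAcquire fails
    }

    boolean tryLock() {
        return sync.tryAcquire(1);
    }

    void unlock() {
        sync.release(1);   // wakes up the next queued thread, if any
    }

    boolean isLocked() {
        return sync.isLocked();
    }

    /** Starts the given number of threads; each increments a counter under the mutex. */
    static int run(int threads, int perThread) {
        SimpleMutex mutex = new SimpleMutex();
        int[] count = new int[1];
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    mutex.lock();
                    try {
                        count[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return count[0];
    }

    public static void main(String[] args) {
        System.out.println("count: " + run(8, 10_000)); // prints "count: 80000"
    }
}
```

The subclass only decides when the state may change; queuing, blocking and waking up threads are handled by AQS itself.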

CountDownLatch

package com.mmall.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author shishusheng
 */
@Slf4j
public class CountDownLatchExample1 {

    private final static int threadCount = 200;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    test(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();
        log.info("finish");
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        Thread.sleep(100);
        log.info("{}", threadNum);
        Thread.sleep(100);
    }
}

package com.mmall.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author shishusheng
 */
@Slf4j
public class CountDownLatchExample2 {

    private final static int threadCount = 200;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    test(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        // Wait at most 10 ms, then continue even if the count has not reached zero
        countDownLatch.await(10, TimeUnit.MILLISECONDS);
        log.info("finish");
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        Thread.sleep(100);
        log.info("{}", threadNum);
    }
}

Semaphore usage

CyclicBarrier

package com.mmall.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author shishusheng
 */
@Slf4j
public class CyclicBarrierExample1 {

    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        barrier.await();
        log.info("{} continue", threadNum);
    }
}

package com.mmall.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author shishusheng
 */
@Slf4j
public class CyclicBarrierExample2 {

    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        try {
            barrier.await(2000, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            log.warn("BarrierException", e);
        }
        log.info("{} continue", threadNum);
    }
}

package com.mmall.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
 * @author shishusheng
 */
@Slf4j
public class SemaphoreExample3 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    // Try to acquire a permit without blocking
                    if (semaphore.tryAcquire()) {
                        test(threadNum);
                        // Release the permit
                        semaphore.release();
                    }
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}

9 Thread Pools

9.1 newCachedThreadPool

9.2 newFixedThreadPool

9.3 newSingleThreadExecutor

You can see that the tasks are executed sequentially
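
The sequential behavior of newSingleThreadExecutor can be reproduced with a small sketch like the one below. The class name SingleThreadExecutorDemo and the idea of recording the task indices in a list are assumptions made for this example; the original output is not shown in the text.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SingleThreadExecutorDemo {

    /** Submits the given number of tasks and returns the order in which they ran. */
    static List<Integer> run(int tasks) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < tasks; i++) {
            final int index = i;
            // Each task records its own index when it runs
            executor.execute(() -> order.add(index));
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(run(10)); // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

With a single worker thread and a FIFO task queue, the tasks run one at a time in submission order. With newCachedThreadPool or newFixedThreadPool the recorded order would generally not be predictable.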

9.4 newScheduledThreadPool

10 Deadlock