preface

Java multithreading I personally feel is the most difficult part of javaSe, I used to feel learned, but really have multithreading needs but do not know how to start, in fact, or the knowledge of multithreading is not deep, do not know the application scenario of multithreading API, do not know the running process of multithreading and so on, This article will use examples + diagrams + source code to parse Java multithreading.

Article length is long, we can also choose to see the specific chapter, it is recommended that multi-threaded code all hand knock, never believe you see the conclusion, their own coding after running out, is their own.

What is Java multithreading?

Processes and threads

process

  • When a program is run, a process is started, such as qq, Word
  • The program consists of instructions and data, instructions to run, data to load, instructions are loaded by THE CPU to run, data is loaded into memory, instructions run by the CPU scheduling hard disk, network and other devices

thread

  • A process can be divided into multiple threads
  • A thread is a stream of instructions, the smallest unit of CPU scheduling, executed by the CPU one instruction at a time

Parallelism and concurrency

Concurrency: When a single-core CPU runs multiple threads, time slices switch quickly. Threads take turns executing the CPU

Parallelism: When a multi-core CPU runs multiple threads, it really runs at the same time

Java provides a rich API to support multithreading.

Why multithreading?

Multithreading can be achieved with a single thread to complete, that single thread runs well, why Java to introduce the concept of multithreading?

Benefits of multithreading:

  1. The program runs faster! Come on! Come on!

  2. Make full use of CPU resources. At present, almost no online CPU is single-core, giving full play to the powerful ability of multi-core CPU

What’s so hard about multithreading?

A single thread has only one line of execution, so the process is easy to understand and the code execution process can be clearly outlined in the brain

Multithreading is multiple lines, and generally there is interaction between multiple lines, communication between multiple lines, general difficulties have the following points

  1. The execution result of multithreading is uncertain and affected by CPU scheduling
  2. Multithreading security issues
  3. Thread resource is precious, thread pool operation depends on thread, thread pool parameter setting problem
  4. Multithreaded execution is dynamic, simultaneous, and difficult to track
  5. Multithreading is at the bottom of the operating system level, the source code is difficult

Sometimes I wish I were a byte shuttled around the server, figuring out what was going on, like Wreck-It Ralph (for those of you who haven’t seen the movie).

Java multithreading basic use

Define tasks, create and run threads

Task: The execution body of a thread. That’s our core code logic

Define the task

  1. Inheriting the Thread class (so to speak, merging tasks and threads together)
  2. Implement the Runnable interface (separate tasks from threads, so to speak)
  3. Implement Callable interface (perform tasks using FutureTask)

Thread implements task limitations

  1. The task logic is written in the Run method of Thread class and has the limitation of single inheritance
  2. When creating multiple threads, each task that has member variables does not share them. Static is required to share them

Runnable and Callable address the limitations of threads

But Runbale has the following limitations compared to Callable

  1. The task has no return value
  2. Tasks cannot throw exceptions to callers

The following code defines threads in several ways

@Slf4j
class T extends Thread {
    @Override
    public void run(a) {
        log.info("I inherited Thread's task."); }}@Slf4j
class R implements Runnable {

    @Override
    public void run(a) {
        log.info("I am the task of implementing Runnable."); }}@Slf4j
class C implements Callable<String> {

    @Override
    public String call(a) throws Exception {
        log.info("I'm doing the Callable task.");
        return "success"; }}Copy the code

How a thread is created

  1. Threads are created directly through the Thread class
  2. Create threads from within the thread pool

How to start a thread

  • Call the thread’s start() method
// Start a task that inherits Thread
new T().start();

// Start tasks that inherit Thread anonymous inner classes using lambda optimization
Thread t = new Thread(){
  @Override
  public void run(a) {
    log.info("I'm a task for Thread anonymous inner class."); }};// Start the task implementing the Runnable interface
new Thread(new R()).start();

// Start the task implementing the Runnable anonymous implementation class
new Thread(new Runnable() {
    @Override
    public void run(a) {
        log.info("I'm a task of the Runnable anonymous inner class.");
    }
}).start();

// Start the simplified lambda task that implements Runnable
new Thread(() -> log.info("I am Runnable lambda after simplified task")).start();

// Start a task implementing the Callable interface in conjunction with FutureTask to obtain the results of thread execution
FutureTask<String> target = new FutureTask<>(new C());
new Thread(target).start();
log.info(target.get());

Copy the code

The class diagram for each of the above thread related classes is shown below

Context switch

Multi-core CPU, multi-thread is parallel work, if the number of threads, a single core and concurrent scheduling threads, runtime with the concept of context switch

Context switches occur when the CPU executes a thread’s task and allocates time slices to the thread.

  1. The CPU slice of the thread used up. Procedure
  2. The garbage collection
  3. The thread itself calls methods like sleep, yield, wait, Join, Park, synchronized, lock, and so on

When a context switch occurs, the operating system saves the state of the current thread and restores the state of another thread. The JVM has a memory address called a program counter that keeps track of which line of code the thread has executed. It is thread private.

Thread mode can be set when IDEA breaks, and the debug mode of IDEA can see the change of stack frames

Thread comity -yield()& priority of the thread

The yield() method causes the running thread to switch to the ready state and rescramble for a CPU slice, depending on the CPU allocation.

The following code

// Method definition
public static native void yield(a);

Runnable r1 = () -> {
    int count = 0;
    for(;;) { log.info("-- > 1"+ count++); }}; Runnable r2 = () -> {int count = 0;
    for(;;) { Thread.yield(); log.info("-- > 2"+ count++); }}; Thread t1 =new Thread(r1,"t1");
Thread t2 = new Thread(r2,"t2");
t1.start();
t2.start();

// Run the result
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129504
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129505
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129506
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129507
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129508
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129509
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129510
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129511
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129512
11:49:15.798 [t2] INFO thread.TestYield -             ---- 2>293
11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129513
11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129514
11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129515
11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129516
11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129517
11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129518
Copy the code

As the above results show, the T2 thread performs yield() each time it executes, and thread 1 executes significantly more often than thread 2.

Priority of the thread

A thread’s internal priority is adjusted by a number from 1 to 10. The default thread priority is NORM_PRIORITY:5

When the CPU is busy, higher priority threads get more time slices

Priority Settings are useless when the CPU is idle

 public final static int MIN_PRIORITY = 1;

 public final static int NORM_PRIORITY = 5;

 public final static int MAX_PRIORITY = 10;
 
 // Method definition
 public final void setPriority(int newPriority) {}Copy the code

The CPU is busy

Runnable r1 = () -> {
    int count = 0;
    for(;;) { log.info("-- > 1"+ count++); }}; Runnable r2 = () -> {int count = 0;
    for(;;) { log.info("-- > 2"+ count++); }}; Thread t1 =new Thread(r1,"t1");
Thread t2 = new Thread(r2,"t2");
t1.setPriority(Thread.NORM_PRIORITY);
t2.setPriority(Thread.MAX_PRIORITY);
t1.start();
t2.start();

// Possible run results
11:59:00.696 [t1] INFO thread.TestYieldPriority - ---- 1>44102
11:59:00.696 [t2] INFO thread.TestYieldPriority -             ---- 2>135903
11:59:00.696 [t2] INFO thread.TestYieldPriority -             ---- 2>135904
11:59:00.696 [t2] INFO thread.TestYieldPriority -             ---- 2>135905
11:59:00.696 [t2] INFO thread.TestYieldPriority -             ---- 2>135906
Copy the code

The CPU is idle

Runnable r1 = () -> {
    int count = 0;
    for (int i = 0; i < 10; i++) {
        log.info("-- > 1"+ count++); }}; Runnable r2 = () -> {int count = 0;
    for (int i = 0; i < 10; i++) {
        log.info("-- > 2"+ count++); }}; Thread t1 =new Thread(r1,"t1");
Thread t2 = new Thread(r2,"t2");
t1.setPriority(Thread.MIN_PRIORITY);
t2.setPriority(Thread.MAX_PRIORITY);
t1.start();
t2.start();

Thread 1 has low priority but finishes first
12:01:09.916 [t1] INFO thread.TestYieldPriority - ---- 1>7
12:01:09.916 [t1] INFO thread.TestYieldPriority - ---- 1>8
12:01:09.916 [t1] INFO thread.TestYieldPriority - ---- 1>9
12:01:09.916 [t2] INFO thread.TestYieldPriority -             ---- 2>2
12:01:09.916 [t2] INFO thread.TestYieldPriority -             ---- 2>3
12:01:09.916 [t2] INFO thread.TestYieldPriority -             ---- 2>4
12:01:09.916 [t2] INFO thread.TestYieldPriority -             ---- 2>5
12:01:09.916 [t2] INFO thread.TestYieldPriority -             ---- 2>6
12:01:09.916 [t2] INFO thread.TestYieldPriority -             ---- 2>7
12:01:09.916 [t2] INFO thread.TestYieldPriority -             ---- 2>8
12:01:09.916 [t2] INFO thread.TestYieldPriority -             ---- 2>9

Copy the code

Daemon thread

By default, a Java process must wait for all threads to finish before terminating. There is a special thread called a daemon thread that forces termination even if it has not finished executing when all non-daemons have finished.

The default threads are non-daemons.

A garbage collector thread is a typical daemon thread

// Method definition
public final void setDaemon(boolean on) {
}

Thread thread = new Thread(() -> {
    while (true) {}});// Specify the API. Set to true to indicate that the daemon thread is not daemon. When the main thread ends, the daemon thread terminates.
// The default is false. When the main thread ends, thread continues running without stopping
thread.setDaemon(true);
thread.start();
log.info("The end");
Copy the code

Thread blocking

Thread blocking can be divided into several types. The definition of blocking may vary from the operating system level to the Java level, but there are several ways that threads can block broadly

  1. BIO blocks, which uses a blocking IO stream
  2. Sleep (long time) allows the thread to sleep and enter the blocking state
  3. The thread calling this method blocks and waits for thread A to resume running
  4. Sychronized or ReentrantLock causes a thread to block without acquiring the lock (discussed in the section on synchronous locking)
  5. Calling wait() after acquiring the lock also blocks the thread (described in the section on synchronous locking).
  6. Locksupport.park () blocks the thread (described in the synchronization lock section)

sleep()

Hibernating a thread blocks a running thread. When the sleep period ends, the CPU time slice resumes

// Method definition native method
public static native void sleep(long millis) throws InterruptedException; 

try {
   // Sleep for 2 seconds
   // This method throws InterruptedException, which is InterruptedException that can be interrupted during sleep
   Thread.sleep(2000);
 } catch(InterruptedException e) {}try {
   // Use TimeUnit's API instead of thread.sleep
   TimeUnit.SECONDS.sleep(1);
 } catch (InterruptedException e) {
 }
Copy the code

join()

A JOIN is when the calling thread blocks and waits for a thread to complete

// Method definitions are overloaded
// Wait for the thread to finish before resuming
public final void join(a) throws InterruptedException {}// Specify the join time. The caller thread resumes running without waiting until the thread completes execution within the specified time
public final synchronized void join(long millis)
    throws InterruptedException{}

Copy the code
Thread t = new Thread(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    r = 10;
});

t.start();
// let the main thread block and wait for t thread to finish executing
// Remove the line, the result is 0, plus the result of the line is 10
t.join();
log.info("r:{}", r);

// Run the result
1309:13.892 [main] INFO thread.TestJoin - r:10
Copy the code

– Interrupt ()

// The definition of the related method
public void interrupt(a) {}public boolean isInterrupted(a) {}public static boolean interrupted(a) {}Copy the code

Interrupt flag: Whether the thread is interrupted, true indicates that it is interrupted, false indicates that it is not interrupted

IsInterrupted () gets the interrupt flag of the thread and does not modify the interrupt flag after the call

The interrupt() method is used to interrupt a thread

  1. Threads that explicitly throw InterruptedException, such as Sleep, Wait, or Join, can be interrupted, but the interrupt flag remains false
  2. When a normal thread is interrupted, the thread is not actually interrupted, but the thread is marked true for interruption

Interrupted () gets the interrupt flag of the thread and clears the interrupt flag after the call that is false if true (not used often)

Interrupt example: A background monitoring thread is constantly monitoring and terminates when the outside world interrupts it. The following code

@Slf4j
class TwoPhaseTerminal{
    // Monitor threads
    private Thread monitor;

    public void start(a){
        monitor = new Thread(() ->{
           // Constant monitoring
            while (true){
                Thread thread = Thread.currentThread();
             	// Determine whether the current thread is interrupted
                if (thread.isInterrupted()){
                    log.info("Current thread interrupted, terminated");
                    break;
                }
                try {
                    Thread.sleep(1000);
                	// The interrupt is marked true when it is interrupted in the monitoring logic
                    log.info("Monitor");
                } catch (InterruptedException e) {
                    // The interrupt flag is false
                    // Make the interrupt flag true when calling an interruptthread.interrupt(); }}}); monitor.start(); }public void stop(a){ monitor.interrupt(); }}Copy the code

Thread state

There are some basic APIS that call the above methods to give the thread its state.

Thread states can be divided into five states at the operating system level and six states at the Java API level.

Five kinds of state

  1. Initial state: State when the thread object is created
  2. Runnable state (ready state) : Ready to be scheduled by the CPU after calling the start() method
  3. Running state: The thread retrieves the CPU time slice and executes the logic of the run() method
  4. Blocked: the thread is blocked, abandons the CPU time slice, waits for the unblocked state to return to the ready state to grab the time slice
  5. Termination state: The state of a thread after it has completed execution or thrown an exception

Six state

Internal enumeration State in the Thread class

public enum State {
	NEW,
	RUNNABLE,
	BLOCKED,
	WAITING,
	TIMED_WAITING,
	TERMINATED;
}
Copy the code
  1. The NEW thread object is created
  2. The Runnable thread enters this state after calling the start() method, and there are three cases of this state
    1. Ready: Waiting for the CPU to allocate time slice
    2. Running status: The task is executed using the Runnable method
    3. Blocking state: The state in which the BIO executes a blocking IO stream
  3. The Blocked state when the lock was not acquired (discussed in the section on synchronous locking)
  4. WAITING The state after calling wait(), join(), etc
  5. TIMED_WAITING State after calling sleep(time), wait(time), join(time), etc
  6. The state of the TERMINATED thread after execution is complete or an exception is thrown

Six thread states and methods

Summary of methods related to threads

The core methods in the Thread class are summarized

Method names Whether the static Method statement
start() no Let the thread start, enter the ready state, and wait for the CPU to allocate the time slice
run() no Override the methods of the Runnable interface to execute specific logic when the thread gets the CPU time slice
yield() is The comity of the thread causes the thread that has obtained the CPU time slice to enter the ready state and scramble for the time slice again
sleep(time) is The thread will sleep for a fixed time and enter the blocking state. After the sleep time is completed, the thread will scramble for the time slice again and the sleep can be interrupted
join()/join(time) no Call the join method of the thread object, and the caller thread enters the block and waits for the thread object to complete execution or reach the specified time before resuming, so as to regain the time slice
isInterrupted() no Gets the interrupt flag of the thread, true: interrupted, false: not interrupted. The interrupt flag is not modified after the call
interrupt() no Methods that interrupt a thread and throw InterruptedException are interrupted, but the interrupt flag is not changed after the interruption, as is the case with threads that normally execute
interrupted() no Gets the interrupt flag for the thread. The interrupt flag is cleared after the call
stop() no Stopping a thread from running is not recommended
suspend() no Suspending threads is not recommended
resume() no It is not recommended to resume running threads
currentThread() is Get current thread

Thread-specific methods in Object

Method names Method statement
wait()/wait(long timeout) The thread that acquired the lock is blocked
notify() Randomly wake up one of the threads being waited ()
notifyAll(); Wake up all threads being waited () to regain the time slice

Synchronization lock

Thread safety

  • There is nothing inherently wrong with a program running multiple threads
  • Problems can occur when multiple threads access a shared resource
    • There is no problem with multiple threads reading the shared resource
    • When multiple threads read and write to a shared resource, the problem occurs if instructions are interleaved

Critical section: a piece of code is called a critical section if it is a multithreaded read/write operation on a shared resource.

Note that instruction interleaving refers to the fact that when Java code is parsed into a bytecode file, one line of Java code may have multiple lines in the bytecode, which may be interlaced when thread context switches.

Thread safety refers to that when multiple threads call the method of the same object’s critical area, the object’s attribute value must not be wrong, which is to ensure thread safety.

Unsafe code as follows

// A member variable of the object
private static int count = 0;

public static void main(String[] args) throws InterruptedException {
  // t1 thread to variable +5000 times
    Thread t1 = new Thread(() -> {
        for (int i = 0; i < 5000; i++) { count++; }});// t2 thread to variable -5000 times
    Thread t2 = new Thread(() -> {
        for (int i = 0; i < 5000; i++) { count--; }}); t1.start(); t2.start();// execute t1 t2
    t1.join();
    t2.join();
    System.out.println(count);
}

// Run the result
-1399
Copy the code

The code above has two threads, one +5000 and one -5000. If it is thread safe, count should still be 0.

But it runs many times, and each time it gets a different result, and it’s not zero, so it’s not thread-safe.

Are thread-safe classes necessarily thread-safe for all operations?

Thread-safe classes, such as ConcurrentHashMap, are often mentioned in development. Thread-safe means that each individual method in a class is thread-safe, but not necessarily a combination of methods.

Are member and static variables thread-safe?

  • If no multiple threads are shared, thread-safe
  • If there is multi-threaded sharing
    • Multithreading has only read operations, so it is thread safe
    • Multithreading exists write operation, write operation code is critical area, thread is not safe

Are local variables thread-safe?

  • Local variables are thread-safe
  • Objects referenced by local variables are not necessarily thread-safe
    • If the object does not escape the scope of the method, it is thread-safe
    • If the object escapes the scope of the method, such as the return value of the method, consider thread-safety

synchronized

A synchronous lock, also called an object lock, is a lock on an object. Different objects are different locks.

This keyword is used to ensure thread-safety and is a blocking solution.

So that at most one thread can hold the object lock at a time, other threads trying to acquire the object lock will be blocked, without worrying about context switching issues.

Note: Do not assume that a thread that locks into a synchronized block will execute forever. If the time slice is switched, other threads will execute, and when switched back, it will execute immediately, but not to the resource that has a competing lock because the current thread has not released the lock.

Synchronized wakes up the waiting thread when a thread has finished executing a synchronized block

Synchronized actually uses object locking to ensure that the atomicity of a critical region is indivisible and not broken by thread switching

The basic use

// The method is actually locked on this
private synchronized void a(a) {}The lock object can be arbitrary and is applied to this in the same way as the a() method
private void b(a){
    synchronized (this) {}}// Add a lock to a static method
private synchronized static void c(a) {}// Block synchronization actually locks class objects in the same way the c() method does
private void d(a){
    synchronized (TestSynchronized.class){
        
    }
}

// Monitorenter is where the lock is placed
 0 aload_0
 1 dup
 2 astore_1
 3 monitorenter
 4 aload_1
 5 monitorexit
 6 goto 14 (+8)
 9 astore_2
10 aload_1
11 monitorexit
12 aload_2
13 athrow
14 return
Copy the code

Thread-safe code

private static int count = 0;

private static Object lock = new Object();

private static Object lock2 = new Object();

 // Both t1 threads and T2 objects lock the same object. This ensures thread safety. No matter how many times this code is executed, the result is 0
public static void main(String[] args) throws InterruptedException {
    Thread t1 = new Thread(() -> {
        for (int i = 0; i < 5000; i++) {
            synchronized(lock) { count++; }}}); Thread t2 =new Thread(() -> {
        for (int i = 0; i < 5000; i++) {
            synchronized(lock) { count--; }}}); t1.start(); t2.start();// execute t1 t2
    t1.join();
    t2.join();
    System.out.println(count);
}
Copy the code

Key: lock is added to the object, must ensure that it is the same object, lock can take effect

Thread communication

wait+notify

Communication between threads can be achieved by sharing variables +wait()&notify()

Wait () blocks the thread, and notify() wakes it up

The lock object is associated with an underlying Monitor object (an implementation of the heavyweight lock) when multiple threads compete for access to the object’s synchronous methods

As shown in the following figure, Thread0,1 compete for the lock. After executing the code, threads 2,3,4, and 5 simultaneously execute the code of the critical section, and compete for the lock

  1. Thread0 first obtains the lock of the object and associates it with the owner of monitor. After calling wait() of the lock object, the wait() method will enter waitSet and wait, and Thread-1 is also in the same way, and The status of Thread0 is Waitting
  2. Thread2, thread3, thread4, and thread5 compete at the same time. After Thread2 obtains the lock, the owner of the monitor is associated with thread3, thread4, and thread5 can only wait in the EntryList. At this time, Thread2 is in the Runnable state and thread3, thread4, and thread5 are in the Blocked state
  3. 2 Wakes up the entryList threads. 3, 4, and 5 compete for locks. The obtained threads are associated with the owner of monitor
  4. When the notify() or notifyAll() of the lock object is called by threads 3, 4, and 5 during execution, the waitSet thread is woken up, and the awakened thread enters the entryList to wait for the re-contention of the lock

Note:

  1. The status of Blocked and Waitting are both Blocked

  2. The Blocked thread wakes up when the owner thread releases the lock

  3. Wait and notify can be invoked only when synchronization is performed and the lock of the object is obtained. Otherwise, exceptions will be thrown

  • Wait () releases the lock into waitSet, passing in time and waking up automatically if it is not woken up within the specified time
  • Notify () wakes up a random waitSet thread
  • NotifyAll () wakes up all threads in waitSet
static final Object lock = new Object();
new Thread(() -> {
    synchronized (lock) {
        log.info("Commence execution");
        try {
          	// can only be called inside the code
            lock.wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("Continue with the core logic."); }},"t1").start();

new Thread(() -> {
    synchronized (lock) {
        log.info("Commence execution");
        try {
            lock.wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("Continue with the core logic."); }},"t2").start();

try {
    Thread.sleep(2000);
} catch (InterruptedException e) {
    e.printStackTrace();
}
log.info("Begin to awaken.");

synchronized (lock) {
  // can only be called inside the code
    lock.notifyAll();
}
// Execution result
14:29:47.138[T1] INFO TestWaitNotify - Start execution14:29:47.141[T2] INFO TestWaitNotify - Start execution14:29:49.136[main] INFO TestWaitNotify - Start to wake up14:29:49.136[T2] INFO TestWaitNotify - Continue with the core logic14:29:49.136[T1] INFO TestWaitNotify - Continue execution of core logicCopy the code

What’s the difference between ‘wait’ and ‘sleep’?

Both cause the thread to block, with the following differences

  1. Wait is the Object method and sleep is the Thread method
  2. Wait releases the lock immediately. Sleep does not release the lock
  3. The state of a thread after wait is Time_Waiting

park&unpark

LockSupport is a utility class under JUC that provides the park and unpark methods for thread communication

Differences compared to wait and Notity

  1. Wait and notify need to obtain the object lock park unpark Do not
  2. Unpark can specify notify to wake up randomly
  3. The order of park and unpark can be reversed before that of unpark wait and notify

Producer-consumer model

It refers to that there are producers to produce data, and consumers to consume data. When the producers are full, they stop producing and inform the consumers to take out the data.Copy the code

Consumer does not consume did not consume, inform producer to produce, produce to continue to consume again.

  public static void main(String[] args) throws InterruptedException {
        MessageQueue queue = new MessageQueue(2);
		
		// Three producers store values to the queue
        for (int i = 0; i < 3; i++) {
            int id = i;
            new Thread(() -> {
                queue.put(new Message(id, "Value" + id));
            }, "Producer" + i).start();
        }

        Thread.sleep(1000);

		// A consumer keeps fetching values from the queue
        new Thread(() -> {
            while (true) { queue.take(); }},"Consumer").start(); }}// Message queues are held by producers and consumers
class MessageQueue {
    private LinkedList<Message> list = new LinkedList<>();

    / / capacity
    private int capacity;

    public MessageQueue(int capacity) {
        this.capacity = capacity;
    }

    /** ** ** /
    public void put(Message message) {
        synchronized (list) {
            while (list.size() == capacity) {
                log.info("Queue full, producer waiting");
                try {
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            list.addLast(message);
            log.info("Production message :{}", message);
            // Inform consumers after productionlist.notifyAll(); }}public Message take(a) {
        synchronized (list) {
            while (list.isEmpty()) {
                log.info("Queue empty, consumers wait.");
                try {
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            Message message = list.removeFirst();
            log.info("Consume message :{}", message);
            // Notify the producer after consumption
            list.notifyAll();
            returnmessage; }}}/ / message
class Message {

    private int id;

    private Object value;
}
Copy the code

Synchronous Lock Cases

In order to express the concept of synchronization lock more vividly, here is a life example, as far as possible to the concept of the above concrete out.

Here’s an example of something that everyone is really interested in. Money!!!!!! (Except Teacher Ma).

In reality, we go to the ATM at the gate of the bank to withdraw money. The money of the ATM is a shared variable. To ensure security, it is impossible for two strangers to enter the same ATM to withdraw money at the same time.

There are multiple atMs, the money inside is not affected by each other, there are also multiple locks (multiple object locks), the withdrawal of money in multiple ATMs at the same time there is no security problem.

If each stranger who withdraws money is a thread, when the person who withdraws money enters the ATM and locks the door (the thread gets the lock), when the person who withdraws money leaves the ATM (the thread releases the lock), the next person compets for the lock to withdraw money.

Assuming that the staff is also a thread, if the withdrawal finds that the money is insufficient after entering the ATM, the staff is notified to add money to the ATM (notifyAll method is called), and the withdrawal stops and enters the lobby of the bank to block and wait (wait method is called).

The staff in the lobby of the bank and the withdrawal of money are woken up and re-compete for the lock. If the withdrawal of money after entering the bank, because the ATM has no money, they have to wait in the lobby of the bank.

When an employee gains the ATM lock and enters, he/she adds the money and notifies the person in the lobby to withdraw the money (notifyAll). Pause the raise yourself, enter the bank lobby and wait for the wake up raise (call wait method).

At this time, people waiting in the lobby to compete for the lock, who get who enter to continue to withdraw money.

Different from the real world, there is no concept of queuing. Whoever grabs the lock goes in and takes it.

ReentrantLock

Reentrant lock: Once a thread has acquired the lock on an object, the lock is available to the execution method when it needs to acquire it. As follows

private static final ReentrantLock LOCK = new ReentrantLock();

private static void m(a) {
    LOCK.lock();
    try {
        log.info("begin");
      	/ / call (m1)
        m1();
    } finally {
        // Note the lock releaseLOCK.unlock(); }}public static void m1(a) {
    LOCK.lock();
    try {
        log.info("m1");
        m2();
    } finally {
        // Note the lock releaseLOCK.unlock(); }}Copy the code

Synchronized is also a ReentrantLock and has the following advantages

  1. The timeout period for the lock is obtained
  2. Lock acquisition can be interrupted
  3. Can be set to fair lock
  4. You can have different condition variables, that is, multiple Waitsets that you can specify to wake up

api

// The default is unfair lock. If true is passed, the lock is unfair
ReentrantLock lock = new ReentrantLock(false);
// Try to get the lock
lock()
// Release lock must be placed in finally block
unlock()
try {
    // Lock acquisition can be interrupted, blocking threads can be interrupted
    LOCK.lockInterruptibly();
} catch (InterruptedException e) {
    return;
}
// Return false if the lock cannot be obtained
LOCK.tryLock()
// Return false if the timeout period has not been obtained
tryLock(long timeout, TimeUnit unit)
One lock can create multiple lounges
Condition waitSet = ROOM.newCondition();
// The lock is released into the waitSet, and other threads can grab the lock
yanWaitSet.await()
// Wake up the thread of the specific lounge and then overwrite the contention lock
yanWaitSet.signal()

Copy the code

Example: one thread outputs A, one thread outputs B, one thread outputs C, ABC outputs sequentially, five times in a row

Wait ()/notify() and control variables are used to implement this function. ReentrantLock is used to implement this function.

  public static void main(String[] args) {
        AwaitSignal awaitSignal = new AwaitSignal(5);
        // Construct three condition variables
        Condition a = awaitSignal.newCondition();
        Condition b = awaitSignal.newCondition();
        Condition c = awaitSignal.newCondition();
        // Start three threads
        new Thread(() -> {
            awaitSignal.print("a", a, b);
        }).start();

        new Thread(() -> {
            awaitSignal.print("b", b, c);
        }).start();

        new Thread(() -> {
            awaitSignal.print("c", c, a);
        }).start();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        awaitSignal.lock();
        try {
            // Wake up a first
            a.signal();
        } finally{ awaitSignal.unlock(); }}}class AwaitSignal extends ReentrantLock {

    // The number of cycles
    private int loopNumber;

    public AwaitSignal(int loopNumber) {
        this.loopNumber = loopNumber;
    }

    / * * *@paramPrint the character *@paramCurrent Current condition variable *@paramNext The next condition variable */
    public void print(String print, Condition current, Condition next) {

        for (int i = 0; i < loopNumber; i++) {
            lock();
            try {
                try {
                    // Get the lock and wait
                    current.await();
                    System.out.print(print);
                } catch (InterruptedException e) {
                }
                next.signal();
            } finally{ unlock(); }}}Copy the code

A deadlock

Speaking of deadlocks, for example,

Here is the code implementation

static Beer beer = new Beer();
static Story story = new Story();

public static void main(String[] args) {
    new Thread(() ->{
        synchronized (beer){
            log.info("I got a drink. Give me a story.");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            synchronized (story){
                log.info("Xiao Wang starts drinking and telling stories."); }}},"Wang").start();

    new Thread(() ->{
        synchronized (story){
            log.info("I got a story. Give me a drink.");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            synchronized (beer){
                log.info("Lao Wang began to drink and tell stories."); }}},"Wang").start();
}
class Beer {}class Story{}Copy the code

Deadlocks cause the program to fail

The detection tool can check for deadlock information

Java Memory Model (JMM)

The JMM is embodied in the following three aspects

  1. Atomicity ensures that instructions are not affected by context switching
  2. Visibility ensures that instructions are not affected by the CPU cache
  3. Orderliness ensures that instructions are not affected by parallel optimization

visibility

A program that can’t be stopped

static boolean run = true;

public static void main(String[] args) throws InterruptedException {
    Thread t = new Thread(() -> {
        while (run) {
            / /...}}); t.start(); Thread.sleep(1000);
   // Thread T does not stop as expected
    run = false; 
}
Copy the code

As shown above, threads have their own working cache, and when the main thread modifies variables and synchronizes them to main memory, thread T doesn’t read them, so the program can’t stop

order

The JVM may adjust the order in which statements are executed without affecting program correctness, a condition also known as instruction reordering

  static int i;
  static int j;
// Perform the following assignment in a threadi = ... ; j = ... ; It's possible to assign j firstCopy the code

atomic

You should be familiar with the above synchronized code blocks to ensure atomicity, that is, a piece of code is a whole, atomicity to ensure thread safety, will not be affected by context switch.

volatile

This keyword addresses both visibility and orderliness, and volatile is implemented through memory barriers

  • Write barriers

Write barriers are added after object write operations, data before the write barriers are synchronized to main memory, and write barriers are executed before write barriers

  • Read barrier

A read barrier is added before an object read operation, statements behind the read barrier are read from main memory, and code behind the read barrier is guaranteed to execute behind the read barrier

Note: Volatile does not address atomicity, that is, thread-safety cannot be achieved with this keyword.

Volatile is used when one thread reads a variable and another thread operates on it. This keyword ensures that the thread that reads the variable is aware of the write.

No lock – cas

Compare and swap (CAS) compares and swaps

When assigning a value to a variable, the value v read from the memory gets the new value n to swap. When executing compareAndSwap(), the compareAndSwap() method compares whether v and the current memory values are consistent. If they are consistent, the n and V are swapped.

The cas layer is cpu-level, that is, the atomicity of operations can be guaranteed without the use of synchronous locks.

private AtomicInteger balance;

// Simulate the cas operation
@Override
public void withdraw(Integer amount) {
    while (true) {
        // Get the current value
        int pre = balance.get();
        // Get the new value after the operation
        int next = pre - amount;
        // Compare and set successfully break otherwise spin retry
        if (balance.compareAndSet(pre, next)) {
            break; }}}Copy the code

Locking is more efficient than previous locking because it does not involve context switching by threads

Cas is the optimistic lock idea, sychronized is the pessimistic lock idea

Cas is suitable for scenarios where there are few contention threads, and if contention is strong, retries occur frequently, reducing efficiency

Juc contains atomic classes that implement CAS

  1. AtomicInteger/AtomicBoolean/AtomicLong
  2. AtomicIntegerArray/AtomicLongArray/AtomicReferenceArray
  3. AtomicReference/AtomicStampedReference/AtomicMarkableReference

AtomicInteger

Commonly used API

new AtomicInteger(balance)
get()
compareAndSet(pre, next)
// i.incrementAndGet() ++i
// i.decrementAndGet() --i
// i.getAndIncrement() i++
// i.getAndDecrement() ++i
 i.addAndGet()
  // Pass in the functional interface to modify I
  int getAndUpdate(IntUnaryOperator updateFunction)
  // Cas core method
  compareAndSet(int expect, int update)
Copy the code

ABA problem

Cas has an ABA problem, that is, when comparing and swapping, if the original value is A, another thread changes it to B, and another thread changes it to A.

The swap actually took place, but the comparison and swap succeeded because the values did not change

The solution

AtomicStampedReference/AtomicMarkableReference

The above two classes solve THE ABA problem by increasing the version number of the object, and by increasing the version number each time you change it, the ABA problem can be avoided

ABA problems can also be avoided by adding a Boolean variable identifier and adjusting the Boolean variable value after modification

The thread pool

Introduction to thread pools

Thread pool is one of the most important and difficult points in Java concurrency, and it is the most widely used in practice.

Thread resources are very precious, it is impossible to create infinite, there must be a tool to manage threads, thread pool is a tool to manage threads, Java development often pooling ideas, such as database connection pool, Redis connection pool, etc..

Create some threads in advance and execute them directly when the task is submitted, which can save the time to create threads and control the number of threads.

Benefits of thread pools

  1. Reduce resource consumption and control resources by reducing the cost of creating and destroying threads through pooling
  2. Improved response time, tasks can be run without creating threads when they arrive
  3. Provide more powerful functions, high scalability

Thread pool constructor

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {}Copy the code

Meaning of constructor parameters

Parameter names Parameter meaning
corePoolSize Core threads
maximumPoolSize Maximum number of threads
keepAliveTime Idle time of emergency thread
unit Unit of idle time for emergency threads
workQueue Blocking queue
threadFactory Factory for creating threads, mainly defining thread names
handler Rejection policies

Thread pool example

Let’s use an example to understand the thread pool parameters and the process of receiving tasks from the thread pool

The bank handles the business as shown above.

  1. When customers go to the bank, they open counters for handling. Counters are equivalent to threads and customers are equivalent to tasks. There are two regular counters and three temporary counters. 2 is the number of core threads and 5 is the maximum number of threads. There are two core threads
  2. When the counter opened to the second, they were still doing business. When customers come again, they line up in the queuing hall. There are only three seats in the queuing hall.
  3. When the queuing hall is full, customers will continue to open counters. At present, there are three temporary counters, namely, three emergency threads
  4. At this time, customers can not normally provide business for them, using the rejection strategy to deal with them
  5. When the counter finishes processing the business, it will fetch the task from the queuing hall. When the counter cannot fetch the task after a period of idle time, if the number of current threads is greater than the number of core threads, the thread will be reclaimed. That is to cancel the counter.

Status of the thread pool

The state of the thread pool is represented by the high three bits of an int variable, and the low 29 bits store the number of thread pools

The name of the state Three a Receive new tasks Process blocking queue tasks instructions
Running 111 Y Y Receive and process tasks normally
Shutdown 000 N Y Does not receive tasks, executes ongoing tasks, and processes tasks in a blocking queue
stop 001 N N Tasks will not be received, ongoing tasks will be interrupted, and tasks in the blocking queue will be abandoned
Tidying 010 N N The current active thread is 0, and the task is about to end
Termitted 011 N N Put an end to the state
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
Copy the code

The main flow of thread pools

Steps for a thread pool to create, receive, execute, and reclaim threads

  1. The following steps can be performed only when the thread pool status is Running after it is created
  2. When a task is submitted, the thread pool creates threads to process the task
  3. When the number of worker threads in the thread pool reaches corePoolSize, continuing to submit tasks will enter the blocking queue
  4. When the blocking queue is full, the task continues to be submitted and an emergency thread is created to process it
  5. When the number of worker threads in the thread pool reaches maximumPoolSize, the reject policy is executed
  6. When a thread retrieves a task after the keepAliveTime expires and the number of worker threads exceeds corePoolSize, the thread is reclaimed

Note: It is not the newly created thread that is the core thread, but the later threads that are created are non-core threads. There is no such thing as a core non-core thread. This is a long-standing misconception of mine.

Rejection policies

  1. The caller throws RejectedExecutionException (the default policy)
  2. Let the caller run the task
  3. Abort the mission
  4. Discards the earliest task in the blocking queue and joins the task

Methods for submitting tasks

/ / execution Runnable
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null.false);
    }
    else if(! addWorker(command,false))
        reject(command);
}
/ / submit Callable
public <T> Future<T> submit(Callable<T> task) {
  if (task == null) throw new NullPointerException();
   // Build FutureTask internally
  RunnableFuture<T> ftask = newTaskFor(task);
  execute(ftask);
  return ftask;
}
// Submit Runnable, specifying the return value
publicFuture<? > submit(Runnable task) {if (task == null) throw new NullPointerException();
  // Build FutureTask internally
  RunnableFuture<Void> ftask = newTaskFor(task, null);
  execute(ftask);
  return ftask;
} 
// Submit Runnable, specifying the return value
public <T> Future<T> submit(Runnable task, T result) {
  if (task == null) throw new NullPointerException();
   // Build FutureTask internally
  RunnableFuture<T> ftask = newTaskFor(task, result);
  execute(ftask);
  return ftask;
}

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
}
Copy the code

Execetors creates a thread pool

Note: None of the following is recommended

1.newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
Copy the code
  • Core threads = Maximum number of threads No emergency threads
  • Blocking queue unbounded may result in OOM

2.newCachedThreadPool

public static ExecutorService newCachedThreadPool(a) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
Copy the code
  • The number of core threads is 0, and the maximum number of threads is unlimited. Emergency threads are recycled in 60 seconds
  • The SynchronousQueue implementation has no capacity, that is, once queued, no thread can fetch it
  • The number of threads may be excessive and the CPU may be overloaded

3.newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor(a) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1.1.0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
Copy the code
  • The number of core threads and the maximum number of threads are both 1, there is no emergency thread, and the unbounded queue can continuously receive tasks
  • Tasks are serialized to be executed one by one, and wrapper classes are used to shield thread pool parameters such as corePoolSize from modification
  • If a thread throws an exception, a new thread is created to continue execution
  • May cause OOM

4.newScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
Copy the code
  • The thread pool for task scheduling can be invoked at a deferred time or at intervals

Closing the thread pool

shutdown()

Causes the thread pool to be shutdown and unable to receive tasks, but completes the worker thread and tasks in the blocking queue, which is a graceful shutdown

public void shutdown(a) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}
Copy the code

shutdownNow()

Causes the thread pool to stop, cannot receive tasks, immediately interrupts the executing worker thread, and does not execute tasks in the blocking queue, but returns a list of tasks in the blocking queue

public List<Runnable> shutdownNow(a) {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
Copy the code

Proper usage posture for thread pools

The difficulty of thread pool is the configuration of parameters, a set of theoretical configuration parameters

CPU intensive: This refers to the fact that the program is CPU intensive

Number of core threads: Number of CPU cores +1

IO intensive: Remote RPC calls, database operations, etc., do not need to use the CPU for a large number of operations. Most application scenarios

Number of core threads = Number of cores * EXPECTED CPU utilization * Total time/CPU runtime

However, it is difficult to configure based on the above theory, because CPU computation time is difficult to estimate

For the actual size, see the following table

cpu-intensive IO intensive
Number of threads Kernel number <=x<= kernel number *2 Cores *50<=x<= cores *100
The queue length y>=100 1<=y<=10

1. The thread pool parameters are configured in distributed mode, and the application does not need to restart when the configuration is modified

The thread pool parameters vary according to the number of requests on line, and the best way is that the number of core threads, the maximum number of threads, and the queue size are configurable

CorePoolSize maxPoolSize queueSize is configured

Java provides method-override parameters that are handled internally by the thread pool for smooth modification

public void setCorePoolSize(int corePoolSize) {}Copy the code

2. Add thread pool monitoring

3. The IO intensive task can be added to the maximum number of threads before the task is placed in the blocking queue

The code mainly overrides methods that block queues to join tasks

public boolean offer(Runnable runnable) {
    if (executor == null) {
        throw new RejectedExecutionException("The task queue does not have executor!");
    }

    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int currentPoolThreadSize = executor.getPoolSize();
       
        // If the number of submitted tasks is smaller than the number of threads currently created, there are still idle threads.
        if (executor.getTaskCount() < currentPoolThreadSize) {
            // Put the task in a queue and let the thread process it
            return super.offer(runnable);
        }
		// Core changes
        // If the current number of threads is less than the maximum number of threads, return false to let the thread pool create new threads
        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
            return false;
        }

        // Otherwise, the task is queued
        return super.offer(runnable);
    } finally{ lock.unlock(); }}Copy the code

3. Rejection Policy It is recommended to use Tomcat’s rejection policy (give it a chance)

// Tomcat source code
@Override
public void execute(Runnable command) {
    if( executor ! =null ) {
        try {
            executor.execute(command);
        } catch (RejectedExecutionException rx) {
            1 If the task cannot be obtained, the task is rejected
            if ( !( (TaskQueue) executor.getQueue()).force(command) ) throw new RejectedExecutionException("Work queue full."); }}else throw new IllegalStateException("StandardThreadPool not started.");
}
Copy the code

You are advised to change the method for retrieving tasks from a queue: Increase the timeout period. If a task cannot be fetched after 1 minute, the task is returned

public boolean offer(E e, long timeout, TimeUnit unit){}
Copy the code

conclusion

Work for three or four years, has not formally written a blog, self-study has been accumulated through the way of notes, recently re-learned the Java multithreading, thinking of the weekend this part of the content seriously write a blog to share out.

The article is long, give a big thumbs up to those who see it! Due to the author’s limited level, and the first time to write a blog, the article will inevitably have mistakes, welcome friends feedback correction.

If you find the article helpful, please like it, comment on it, forward it, and go ahead

Your support is my biggest motivation!!