Week1

Multiple threads read and write to the same shared variable

CPU memory model

When a computer executes a program, every instruction runs on the CPU, and executing an instruction involves reading and writing data. While the computer is running, all of that data lives in main memory (on an ordinary 4-core/8 GB machine, for example, the 8 GB refers to main-memory capacity). The CPU reads data from main memory to perform its calculations.

Each CPU has its own cache. At run time, the CPU copies the data it needs from the computer's main memory into its cache, performs its operations on the cached data, and later flushes the results back to main memory. This greatly improves the CPU's execution speed.

Java memory model

The Java memory model is the JVM's memory model. It shields the memory-access differences between operating systems and underlying hardware so that memory access behaves consistently across platforms.

After the JVM is started, the operating system allocates a certain amount of memory for the JVM process, which is called “main memory”.

All work in a Java program is done by threads, and every thread has a small piece of memory called its "working memory". During execution, a thread first copies data from main memory into its working memory, performs its calculations there, and then flushes the results back to main memory.
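A hedged sketch of why this matters (the class and names below are our own illustration, not from the source): if one thread updates a shared flag while another loops on it, marking the flag volatile forces reads and writes to go through main memory, so the update becomes visible to the looping thread.

```java
public class VolatileFlagDemo {
    private volatile boolean running = true;

    // Starts a worker that spins on the shared flag; returns true if the
    // worker observed the writer's update and terminated within the timeout.
    public static boolean demo() throws InterruptedException {
        VolatileFlagDemo d = new VolatileFlagDemo();
        Thread worker = new Thread(() -> {
            while (d.running) { /* busy-wait on the shared flag */ }
        });
        worker.start();
        Thread.sleep(100);
        d.running = false;   // volatile write: flushed to main memory
        worker.join(1000);   // without volatile, the loop might never see it
        return !worker.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("worker stopped: " + demo());
    }
}
```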

Atomicity of concurrent programming
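A plain `i++` is not atomic: it is a read, an increment, and a write, so concurrent increments can be lost. A small sketch (the demo class is ours) using AtomicInteger, whose incrementAndGet() performs the whole update atomically, so no increments are lost:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicityDemo {
    // Increments a shared counter `perThread` times from each of `threads` threads.
    static int incrementAll(int threads, int perThread) throws InterruptedException {
        AtomicInteger counter = new AtomicInteger();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) counter.incrementAndGet();
            });
            ts[i].start();
        }
        for (Thread t : ts) t.join();
        return counter.get(); // always threads * perThread
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(incrementAll(4, 1000)); // prints 4000
    }
}
```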

Solutions to concurrency problems

Without locks

Local variables

Immutable object

An immutable object is one whose external state does not change once it is created.

If the state of an object is immutable, then naturally there is no concurrency problem. Because an object is immutable, it remains constant no matter how many threads do anything to it.
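A minimal sketch of an immutable class (our own illustration): the class and fields are final, there are no setters, and any "modification" returns a new object, so the original can be shared freely between threads.

```java
public final class ImmutablePoint {
    private final int x;
    private final int y;

    public ImmutablePoint(int x, int y) {
        this.x = x;
        this.y = y;
    }

    public int x() { return x; }
    public int y() { return y; }

    // "Mutation" produces a new object; the original never changes.
    public ImmutablePoint translate(int dx, int dy) {
        return new ImmutablePoint(x + dx, y + dy);
    }
}
```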

ThreadLocal

Each thread has its own local variable.
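A brief sketch (names are ours): each thread that touches a ThreadLocal gets its own independent copy of the value, so there is no sharing and no contention.

```java
public class ThreadLocalDemo {
    private static final ThreadLocal<Integer> COUNTER =
            ThreadLocal.withInitial(() -> 0);

    // Increments this thread's private copy of the counter.
    static int bump() {
        COUNTER.set(COUNTER.get() + 1);
        return COUNTER.get();
    }

    public static void main(String[] args) throws InterruptedException {
        bump();
        bump(); // main thread's copy is now 2
        Thread t = new Thread(() ->
                // a new thread starts from its own initial value
                System.out.println("other thread sees: " + bump()));
        t.start();
        t.join();
        System.out.println("main thread sees: " + COUNTER.get());
    }
}
```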

CAS atomic operation

(Optimistic lock operation)

CAS stands for Compare And Swap. The CAS mechanism uses three basic operands: the memory address V, the expected old value A, and the new value B to write. The value at memory address V is updated to the new value B only if it currently equals the expected old value A.

CAS is an atomic operation.
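This can be observed directly with java.util.concurrent.atomic.AtomicInteger, whose compareAndSet is a CAS operation (the demo class and method below are our own):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CasDemo {
    // Returns the final value after two CAS attempts; 11 means the first
    // swap succeeded and the second (with a stale expected value) failed.
    static int demoResult() {
        AtomicInteger v = new AtomicInteger(10); // "memory address V" holds 10
        boolean ok1 = v.compareAndSet(10, 11);   // expected A = 10 matches: swap to B = 11
        boolean ok2 = v.compareAndSet(10, 12);   // expected A = 10 no longer matches: fails
        return (ok1 && !ok2) ? v.get() : -1;
    }

    public static void main(String[] args) {
        System.out.println(demoResult()); // prints 11
    }
}
```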

With locks

Pessimistic locking

synchronized

ReentrantLock

Source code analysis: CopyOnWriteArrayList

Copy-on-write, volatile, ReentrantLock

Equivalent immutable object

Such objects largely share the characteristics of immutable objects, but their internal state may still change under certain circumstances.

public class CopyOnWriteArrayList<E>
    implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
    private static final long serialVersionUID = 8673264195747942595L;

    /** The lock protecting all mutators */
    final transient ReentrantLock lock = new ReentrantLock();

    /** The array, accessed only via getArray/setArray. */
    private transient volatile Object[] array;

    /**
     * Gets the array. Non-private so as to also be accessible
     * from CopyOnWriteArraySet class.
     */
    final Object[] getArray() {
        return array;
    }

    /** Sets the array. */
    final void setArray(Object[] a) {
        array = a;
    }
    ...
}

private transient volatile Object[] array;

The array field stores the collection's elements and may only be accessed through getArray and setArray.

The iterator

    public Iterator<E> iterator() {
        return new COWIterator<E>(getArray(), 0);
    }

The iterator gets a copy of the array through getArray(), and iterates based on the array currently retrieved.

add

    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }

Get the current array, copy it into a new array one element longer, add the new element to the new array, and finally replace the old array with the new one.

This process requires locking.

Copy-on-write mechanism

The iteration of a collection is essentially a read operation. Adding an element to a collection is essentially a write operation.

This applies to scenarios where you read a lot and write a little.

  • The read operation takes a “snapshot” of the data and is not guaranteed to get the latest data. (Weak consistency)

  • Read and write operations do not operate on the same array.
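A short sketch of the snapshot behavior (the demo class is ours): an iterator obtained before a write keeps iterating over the old array and never sees the later addition.

```java
import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;

public class SnapshotIterationDemo {
    // Returns how many elements an iterator sees if an element is
    // added to the list after the iterator was created.
    static int snapshotSize() {
        CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
        list.add("a");
        list.add("b");
        Iterator<String> it = list.iterator(); // captures the current array
        list.add("c");                         // the write goes to a fresh copy
        int seen = 0;
        while (it.hasNext()) {
            it.next();
            seen++;
        }
        return seen; // the iterator still sees only the 2 original elements
    }

    public static void main(String[] args) {
        System.out.println("iterator saw " + snapshotSize() + " elements");
    }
}
```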

Usage scenarios

CopyOnWriteArrayList is suitable for scenarios with many reads and few writes.

JDBC

The JDBC driver loading process is mainly based on the Java 6 ServiceLoader API (SPI).

This is the MySQL JDBC Driver

public class Driver extends NonRegisteringDriver implements java.sql.Driver {
    public Driver() throws SQLException {
    }

    static {
        try {
            DriverManager.registerDriver(new Driver());
        } catch (SQLException var1) {
            throw new RuntimeException("Can't register driver!");
        }
    }
}

The static initializer calls DriverManager.registerDriver to register the driver.

DriverManager#registerDriver

    public static synchronized void registerDriver(java.sql.Driver driver, DriverAction da)
        throws SQLException {

        /* Register the driver if it has not already been added to our list */
        if (driver != null) {
            registeredDrivers.addIfAbsent(new DriverInfo(driver, da));
        } else {
            ...
        }
        ...
    }

registeredDrivers:

// List of registered JDBC drivers
private final static CopyOnWriteArrayList<DriverInfo> registeredDrivers = new CopyOnWriteArrayList<>();

CopyOnWriteArrayList#addIfAbsent

    public boolean addIfAbsent(E e) {
        Object[] snapshot = getArray();
        return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
            addIfAbsent(e, snapshot);
    }

    private boolean addIfAbsent(E e, Object[] snapshot) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] current = getArray();
            int len = current.length;
            if (snapshot != current) {
                // Optimize for lost race to another addXXX operation
                int common = Math.min(snapshot.length, len);
                for (int i = 0; i < common; i++)
                    if (current[i] != snapshot[i] && eq(e, current[i]))
                        return false;
                if (indexOf(e, current, common, len) >= 0)
                    return false;
            }
            Object[] newElements = Arrays.copyOf(current, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }

Analysis of reading more than writing less scenario:

  • Write less:

    registeredDrivers stores the drivers for different databases. Generally speaking, a project uses only one database type; in more complex scenarios, a project may use several, such as MySQL, Oracle, or others.

    But however many database types there are, drivers are generally loaded at program startup; that is, registerDriver is normally invoked only when the program starts and rarely afterwards. This fits the definition of "few writes" perfectly: while the program is running, there are essentially no write operations (add/remove, etc.).

  • Read more:

    When we call JDBC in our application to get a database connection, we iterate through all the drivers, find one, and get the connection from that particular driver.

    DriverManager#getConnection()

            for (DriverInfo aDriver : registeredDrivers) {
                // If the caller does not have permission to load the driver then
                // skip it.
                if (isDriverAllowed(aDriver.driver, callerCL)) {
                    try {
                        println("    trying " + aDriver.driver.getClass().getName());
                        Connection con = aDriver.driver.connect(url, info);
                        if (con != null) {
                            // Success!
                            println("getConnection returning " + aDriver.driver.getClass().getName());
                            return (con);
                        }
                    } catch (SQLException ex) {
                        if (reason == null) {
                            reason = ex;
                        }
                    }
                } else {
                    println("    skipping: " + aDriver.getClass().getName());
                }
            }

    The loop traverses registeredDrivers, checks whether each driver may be loaded, and if so obtains a connection from it. getConnection is called frequently: any project that interacts with a database must repeatedly obtain connections, so the "many reads" side of the pattern is satisfied.

The essence of CopyOnWriteArrayList is to trade weak consistency for higher read concurrency. It is suitable for scenarios with many reads and few writes.

Week2

Thread state flow

Thread state

(Figure: thread states)

Thread State enumeration type

NEW(initialization state)

When a new thread is created, it is in the NEW state.

Thread state for a thread which has not yet started.

RUNNABLE (ready, running state)

Thread state for a runnable thread. A thread in the runnable state is executing in the Java virtual machine but it may be waiting for other resources from the operating system such as processor

There are two main states: READY and RUNNING

  • READY: the thread scheduler has not yet selected this thread to run; it is waiting to be scheduled

    • After the thread's start() method is called, the thread enters the ready state
    • The thread's sleep() ends; another thread's join() that it was waiting on ends; the thread acquires an object lock
    • The thread's time slice is used up, or the thread calls yield() (voluntarily ceding CPU resources)
    • A thread in the lock pool acquires the object lock
  • RUNNING:

    The state a thread is in when the thread scheduler selects it from the runnable pool as the current thread. This is also the only way a thread can enter the running state.

BLOCKED (blocked state)

The state a thread is in while it blocks waiting to acquire a lock before entering a method or code block guarded by the synchronized keyword.

WAITING (waiting state)

After calling wait, the thread is in the WAITING state, waiting to be woken up.

TIMED_WAITING (wait timeout state)

The thread is in TIMED_WAITING state after the sleep or wait method is called. The thread is automatically woken up after the timeout.

TERMINATED state

  1. When a thread's run() method completes (or, for the main thread, when main() completes), we consider it terminated. The thread object may still be alive, but it is no longer an executing thread. Once a thread terminates, it cannot be resurrected.
  2. Calling start() on a terminated thread throws java.lang.IllegalThreadStateException.

State switching between threads

NEW -> RUNNABLE

For a newly created (NEW) thread, when the thread wants to execute, it must call the object’s start() method to transition the NEW state to the RUNNABLE state.

(Figure: NEW -> RUNNABLE state transition)

RUNNABLE -> BLOCKED

(Figure: RUNNABLE -> BLOCKED state transition)

  1. A thread transitions from RUNNABLE to BLOCKED while waiting for a synchronized implicit lock

    Only a running thread can block: it blocks precisely because it tries to enter a synchronized method or block and must wait for the implicit lock

  2. When a waiting thread obtains the synchronized implicit lock, it switches from BLOCKED back to RUNNABLE and waits for the CPU to schedule it.

RUNNABLE -> WAITING

(Figure: RUNNABLE -> WAITING state transition)


  1. A thread that holds the synchronized implicit lock calls Object.wait()

  2. Call a thread’s join()

    join(): when thread B calls a.join(), thread B waits for thread A to finish executing. Thread B's state transitions from RUNNABLE to WAITING. When thread A finishes executing, thread B transitions from WAITING back to RUNNABLE.

  3. Call the LockSupport.park() method

    The locks in the java.util.concurrent.locks package are implemented on top of LockSupport. Calling LockSupport.park() blocks the current thread and changes its state from RUNNABLE to WAITING. Calling LockSupport.unpark(Thread thread) wakes up the target thread, whose state changes from WAITING back to RUNNABLE.

RUNNABLE -> TIMED_WAITING

(Figure: RUNNABLE -> TIMED_WAITING state transition)

  1. java.lang.Thread#sleep(long)
  2. A thread that holds the synchronized implicit lock calls java.lang.Object#wait(long)
  3. java.lang.Thread#join(long)
  4. java.util.concurrent.locks.LockSupport#parkNanos(long)
  5. java.util.concurrent.locks.LockSupport#parkUntil(long)

The only difference between a TIMED_WAITING state and a WAITING state is that the trigger condition has a timeout parameter.
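These states can be observed with Thread.getState(). A rough sketch (our own demo; the 100 ms sleep is a timing assumption that should normally give the worker time to enter sleep()):

```java
public class StateDemo {
    // Observes a worker thread's state before start, during sleep(), and after join().
    static Thread.State stateWhileSleeping() throws InterruptedException {
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(500);
            } catch (InterruptedException ignored) {
            }
        });
        Thread.State created = t.getState();  // NEW
        t.start();
        Thread.sleep(100);                    // let the worker reach sleep()
        Thread.State sleeping = t.getState(); // TIMED_WAITING
        t.join();
        Thread.State done = t.getState();     // TERMINATED
        System.out.println(created + " -> " + sleeping + " -> " + done);
        return sleeping;
    }

    public static void main(String[] args) throws InterruptedException {
        stateWhileSleeping();
    }
}
```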

RUNNABLE -> TERMINATED

A thread enters TERMINATED automatically after its run() method finishes, and also terminates if an uncaught exception is thrown during run().

If execution of the run() method needs to be forcibly interrupted, the interrupt() method is called.

The interrupt() method merely notifies the thread; the thread then has a chance to react to the notification, or it may ignore it.

(Figure: RUNNABLE -> TERMINATED state transition)

Summary mind map

Deadlock problem

A deadlock occurs when two or more threads hold resources needed by each other during execution. As a result, these threads are in a waiting state and cannot continue execution. Without an external force, they would not be able to move forward, entering a state of “permanent” blockage.

Deadlock example

package com.ruyuan2020.java.concurrence.week2;

/**
 * Deadlock demo
 *
 * @author ajin
 */
public class DeadLockDemo {
    public static String obj1 = "obj1";
    public static String obj2 = "obj2";

    public static void main(String[] args) {
        Thread a = new Thread(new Lock1());
        Thread b = new Thread(new Lock2());
        a.start();
        b.start();
    }
}

class Lock1 implements Runnable {

    @Override
    public void run() {
        try {
            System.out.println("lock1 is running");
            while (true) {
                synchronized (DeadLockDemo.obj1) {
                    System.out.println("Lock1 lock obj1");
                    Thread.sleep(3000);
                    synchronized (DeadLockDemo.obj2) {
                        System.out.println("Lock1 lock obj2");
                    }
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Lock2 implements Runnable {

    @Override
    public void run() {
        try {
            System.out.println("lock2 is running");
            while (true) {
                synchronized (DeadLockDemo.obj2) {
                    System.out.println("Lock2 lock obj2");
                    Thread.sleep(3000);
                    synchronized (DeadLockDemo.obj1) {
                        System.out.println("Lock2 lock obj1");
                    }
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Output (after which both threads deadlock):

    lock2 is running
    Lock2 lock obj2
    lock1 is running
    Lock1 lock obj1

Cause of deadlock

The following four conditions must be met for a deadlock to occur:

  1. Mutual exclusion. Resources X and Y can each be accessed by only one thread at a time
  2. Hold and wait. Thread 01 has acquired resource X and, while waiting for resource Y, does not release X
  3. No preemption. Other threads cannot forcibly take resources occupied by thread 01
  4. Circular wait. Thread 01 waits for a resource held by thread 02, while thread 02 waits for a resource held by thread 01.

Ways to avoid deadlocks

After a deadlock occurs, there is no good solution, usually we have to restart the system.

The best way to solve deadlocks is to avoid them.

Break the “hold and wait” condition

That is, apply for all the needed resources at once.
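One way to sketch this (the Allocator class is our own illustration, not a JDK API): a single allocator hands out all required resources in one critical section, or none at all, so a thread never holds one resource while waiting for another.

```java
import java.util.ArrayList;
import java.util.List;

public class Allocator {
    private final List<Object> held = new ArrayList<>();

    // Acquire both resources atomically, or neither.
    public synchronized boolean apply(Object from, Object to) {
        if (held.contains(from) || held.contains(to)) {
            return false; // some resource is taken: acquire nothing
        }
        held.add(from);
        held.add(to);
        return true;
    }

    // Release both resources together.
    public synchronized void free(Object from, Object to) {
        held.remove(from);
        held.remove(to);
    }

    public static void main(String[] args) {
        Allocator a = new Allocator();
        System.out.println(a.apply("x", "y")); // both granted
        System.out.println(a.apply("y", "z")); // refused: "y" is held
    }
}
```

A caller that gets false simply retries later instead of waiting while holding a partial set of resources.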

Break the “Non-preemption” condition

A thread holding resources should be able to actively release them when it cannot acquire the rest, but synchronized cannot do this: when a thread fails to acquire a synchronized lock it directly enters the blocked state, and while blocked it has no way to release the resources it already holds.

Solution: use the timed tryLock feature of the explicit Lock classes instead of the built-in locking mechanism. A thread using a built-in lock blocks if it cannot acquire the lock, whereas with an explicit lock you can specify a timeout: after the timeout, tryLock returns failure and the thread can release the resources it already holds.
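A hedged sketch of the timed-tryLock approach (class and method names are ours): if the second lock cannot be acquired within the timeout, the first lock is released and the caller can retry, so no thread waits forever while holding a lock.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class TryLockDemo {
    // Attempts to take both locks; backs off (releasing the first) on timeout
    // instead of blocking forever, which breaks the "no preemption" condition.
    static boolean transfer(ReentrantLock from, ReentrantLock to)
            throws InterruptedException {
        if (from.tryLock(50, TimeUnit.MILLISECONDS)) {
            try {
                if (to.tryLock(50, TimeUnit.MILLISECONDS)) {
                    try {
                        return true; // both locks held: do the work here
                    } finally {
                        to.unlock();
                    }
                }
            } finally {
                from.unlock();
            }
        }
        return false; // timed out: the caller can retry, no deadlock
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(transfer(new ReentrantLock(), new ReentrantLock()));
    }
}
```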

Break the “loop wait” condition

“Orderly Allocation of Resources”

Number all resources in the system uniformly, and require threads to apply for resources in increasing order of resource number. This ensures the system does not deadlock.
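A minimal sketch of ordered locking (our own illustration; System.identityHashCode stands in for a global resource number and can in principle collide): both callers always lock the lower-numbered resource first, so no circular wait can form.

```java
public class OrderedLockDemo {
    // Locks the two resources in a globally consistent order, then runs the action.
    static void withBothLocks(Object a, Object b, Runnable action) {
        Object first = System.identityHashCode(a) <= System.identityHashCode(b) ? a : b;
        Object second = (first == a) ? b : a;
        synchronized (first) {       // always the lower-numbered resource first
            synchronized (second) {
                action.run();
            }
        }
    }

    public static void main(String[] args) {
        Object obj1 = new Object(), obj2 = new Object();
        // Even with the arguments swapped, the internal lock order is the same,
        // so two threads doing this concurrently cannot deadlock.
        withBothLocks(obj1, obj2, () -> System.out.println("t1 done"));
        withBothLocks(obj2, obj1, () -> System.out.println("t2 done"));
    }
}
```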

Mind mapping

Guarded Suspension model

The "wait-notify" mechanism of the Guarded Suspension pattern is a popular way for threads to cooperate.

Guarded: protect

Suspension: temporarily suspended

If thread 01 cannot get all the locks it needs, it blocks itself and enters the WAITING state. When all the conditions thread 01 requires are met, the waiting thread is notified and resumes execution. (Wait-notify mechanism)

The thread first obtains the mutex. When the conditions required by the thread are not met, the mutex is released and the thread enters the waiting state. When the required condition is met, the waiting thread is notified to reacquire the mutex.

Code examples to understand:

public class GuardedQueue {

    private final Queue<Integer> sourceList;

    public GuardedQueue() {
        this.sourceList = new LinkedBlockingDeque<>();
    }

    public synchronized Integer get() {
        // If the list is empty, enter the waiting state
        while (sourceList.isEmpty()) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return sourceList.peek();
    }

    public synchronized void put(Integer e) {
        sourceList.add(e);
        notifyAll();
    }
}

public class Application {

    public static void main(String[] args) throws InterruptedException {
        GuardedQueue guardedQueue = new GuardedQueue();
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        executorService.execute(guardedQueue::get);

        Thread.sleep(2000);

        executorService.execute(() -> {
            guardedQueue.put(20);
        });

        executorService.shutdown();

        executorService.awaitTermination(30, TimeUnit.SECONDS);
    }
}

Conclusion:

The current thread first obtains the lock; if the condition is not met, it calls the wait method to put itself into the WAITING state. When the condition is met, the notifyAll() method is called to wake the waiting threads back into the RUNNABLE state.

Comparison of notify() and notifyAll() :

There is a difference between notify() and notifyAll(): notify() wakes up an arbitrary thread in the wait queue, while notifyAll() notifies all threads in the wait queue.

Using notify() is risky, because waking an arbitrary waiting thread can result in some threads never being notified.

notifyAll() is generally recommended.

The use of Guarded Suspension mode in BlockingQueue

“Guarded Suspension” : Protective Suspension mode.

This "protective suspension" pattern can greatly reduce the performance cost of lock contention when multiple threads compete for locks: when a thread accessing an object finds that a required condition is not met, it suspends itself until the condition is met.

If thread 01 can’t get all the locks, it blocks itself and enters a “WAITING” state. When all requirements of thread 01 are met, thread 01 in the waiting state is notified to continue.

BlockingQueue is a blocking queue; accessing it may block under certain circumstances.

  • Enqueueing when the queue is full
  • Dequeueing when the queue is empty
public interface BlockingQueue<E> extends Queue<E> {

    // Inserts the specified element at the tail of the queue (if it can be done
    // immediately without exceeding the queue's capacity).
    // Returns true on success; throws IllegalStateException if the queue is full.
    boolean add(E e);

    boolean offer(E e);

    // Inserts the specified element at the tail of the queue; if the queue is
    // full, waits (blocks) until space becomes available
    void put(E e) throws InterruptedException;

    // Inserts the specified element at the tail of the queue; if the queue is
    // full, waits for space to become available, up to the specified wait time
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    // Retrieves and removes the head of the queue; if there is no element,
    // waits (blocks) until an element arrives and the waiting thread is woken
    E take() throws InterruptedException;

    // Retrieves and removes the head of the queue, waiting up to the specified
    // time for an element to become available; after that, the method returns
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    // Removes a single instance of the specified element (if present) from this queue.
    boolean remove(Object o);

}

ArrayBlockingQueue

The constructor
    
    /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;

	public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
Non-blocking methods

offer(E)

    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // When the number of queue elements equals the length of the array,
            // no more elements can be added
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

items: final Object[] items;

offer(E, long, java.util.concurrent.TimeUnit)

    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // Wait for a while if the queue is full
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            // Enqueue the element
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

enqueue(E): the enqueue operation

    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        // Get the current array
        final Object[] items = this.items;
        // Store the element at putIndex
        items[putIndex] = x;
        // If putIndex has reached the end of the array, wrap it around to 0
        if (++putIndex == items.length)
            putIndex = 0;
        // The number of elements in the queue increases by 1
        count++;
        // Wake up a thread waiting in take() to perform the element-fetch operation
        notEmpty.signal();
    }

    // items index for next put, offer, or add
    int putIndex;
    // Condition for waiting takes
    private final Condition notEmpty;
Blocking methods

Blocking insert: put(E e)

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // When the number of queue elements equals the length of the array,
            // no more elements can be added
            while (count == items.length)
                // Suspend the current thread and add it to the notFull
                // condition queue to wait to be woken up
                notFull.await();
            // If the queue is not full, add the element directly
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

put is a blocking method: if the queue is full, the current thread is suspended on the notFull condition object and added to its wait queue until the queue satisfies the notFull condition again, at which point the put can proceed.

If the queue is not full, the enqueue(e) method is called directly to add the element to the array-backed queue.

There are two cases for the thread performing put:

  • If the queue is full, the thread performing the put operation is added to the notFull condition queue to wait;
  • When an element is later removed from the queue, a waiting put thread is woken up to complete its put.
Blocking removal: take()

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // The queue length is 0: block
            while (count == 0)
                notEmpty.await();
            // If there are elements in the queue, remove and return the head
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    /** Condition for waiting takes */
    private final Condition notEmpty;

dequeue(): removes the queue head element and returns it

    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        // Get the current array
        final Object[] items = this.items;
        // Get the element to remove
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        // Clear the slot at takeIndex (the removal)
        items[takeIndex] = null;
        // Advance takeIndex; if it has reached the end of the array, wrap around to 0
        if (++takeIndex == items.length)
            takeIndex = 0;
        // The number of elements in the queue decreases by 1
        count--;
        if (itrs != null)
            // Update the element data in the iterators
            itrs.elementDequeued();
        // Wake up a thread waiting on the notFull condition to perform its add operation
        notFull.signal();
        return x;
    }

When take is executed, the element is removed if the queue is non-empty; if the queue is empty, the thread blocks (note: this wait can be interrupted) and is added to the notEmpty condition wait queue.

If another thread adds data via put, a waiting take thread is woken up: in enqueue, a thread waiting on the notEmpty condition is signalled so it can remove the element.

If another thread removes data via take, a thread in the notFull condition queue is woken up in dequeue so it can insert its element.

Summary
  1. When a put operation finds the queue full, the thread is added to the notFull wait queue. When another task removes an element via take, a thread in the notFull condition queue is woken up to perform its put.

  2. When a take operation finds the queue empty, the thread is added to the notEmpty wait queue. When another task inserts an element via put, a thread in the notEmpty condition wait queue is woken up to perform its take.

Week 3

Two-phase Termination mode

  • Definition:

    1. Signals that a running thread will be terminated.
    2. The thread that receives this signal finishes cleaning up and stops running.
  • Core idea: ensure the thread can change its running state flexibly, while also ensuring it completes its current task gracefully

Thread.interrupt()

If this thread is blocked in an invocation of the wait(), wait(long), or wait(long, int) methods of the Object class, or of the join(), join(long), join(long, int), sleep(long), or sleep(long, int), methods of this class, then its interrupt status will be cleared and it will receive an InterruptedException.

The interrupt() method wakes a thread in the waiting state by causing it to throw InterruptedException; this can end the thread, but the thread can also continue running as long as it catches and handles the exception.
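A hedged sketch of two-phase termination (the class and its names are our own): phase one delivers the interrupt signal; phase two lets the worker observe it, finish its current unit of work, and run its cleanup.

```java
public class TwoPhaseWorker {
    private final Thread worker = new Thread(this::runLoop);
    private volatile boolean cleanedUp = false;

    private void runLoop() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Thread.sleep(10); // simulated unit of work; throws when interrupted
            }
        } catch (InterruptedException e) {
            // signal received while blocked: fall through to cleanup
        } finally {
            cleanedUp = true;     // graceful shutdown work goes here
        }
    }

    public void start() {
        worker.start();
    }

    public boolean terminate() throws InterruptedException {
        worker.interrupt();       // phase 1: deliver the termination signal
        worker.join(1000);        // phase 2: wait for the worker to clean up and exit
        return cleanedUp;
    }

    public static void main(String[] args) throws InterruptedException {
        TwoPhaseWorker w = new TwoPhaseWorker();
        w.start();
        Thread.sleep(50);
        System.out.println("terminated cleanly: " + w.terminate());
    }
}
```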

Thread pool shutdown

ThreadPoolExecutor

ThreadPoolExecutor#shutdownNow

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // Change the thread pool state to STOP
            advanceRunState(STOP);
            // Interrupt all threads in the pool
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }
interruptWorkers

    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

    // Worker
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
Worker thread

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in. Null if factory fails. */
        final Thread thread;
        /** Initial task to run. Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
        ...
    }
Worker.run()
public void run(a) {
      runWorker(this);
}

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while(task ! =null|| (task = getTask()) ! =null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted. This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt();try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
ThreadPoolExecutor#getTask
private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // This step returns null (rs = STOP)
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
// The logical exit is normal
completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
ThreadPoolExecutor#processWorkerExit
 private void processWorkerExit(Worker w, boolean completedAbruptly) {
     	// false
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            // Remove the current Worker from workers
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && !workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

ThreadPoolExecutor#shutdown

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // Set the thread pool state to SHUTDOWN
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
ThreadPoolExecutor#interruptIdleWorkers(boolean)
private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

conclusion

  1. After the shutdown method is executed, the thread pool refuses to accept new tasks, but waits until all tasks in the thread pool and those already in the blocking queue are complete before closing the thread pool.

  2. The shutdownNow method is more abrupt: it stops the thread pool from executing any more tasks, including those waiting in the blocking queue, and returns the queued tasks that were never executed.
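The returned-task behavior can be checked with a small sketch (a minimal demo, not from the source; class and method names are illustrative): a single-threaded pool is blocked by one long task, three more tasks pile up in the queue, and shutdownNow() hands them back.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownNowDemo {

    // Returns how many queued tasks shutdownNow() handed back.
    public static int queuedTasksReturned() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        // The first task occupies the single worker so the rest stay queued.
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException ignored) {
                // shutdownNow() interrupts us here
            }
        });
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> { });
        }
        try {
            started.await();
            List<Runnable> notRun = pool.shutdownNow(); // drains the queue
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return notRun.size();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(queuedTasksReturned()); // 3
    }
}
```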

Week 4

Promise model

The Promise pattern is an asynchronous programming pattern. At its core: when task A starts (boiling water, say), a Promise object through which its eventual result can be obtained is returned immediately. There is therefore no need to wait for task A to finish before executing task B; task B runs at the same time as task A, and when it needs task A's result it gets it through the Promise object (hearing the kettle whistle, to continue the example).

In a program, this means that the program will run faster because promises can avoid unnecessary waits and improve concurrency support.

The Promise mode consists of four parts: Promisor, Executor, Promise, and Result.

  1. The Executor runs the task inside the Promisor's create() method.
  2. The return value of create() is the Promise.
  3. The Result is the return value of the Promise's get() method.

Auxiliary class:

/**
 * Make tea
 *
 * @author ajin
 */
public class BoilWater {

    boolean status = false;

    public boolean isStatus() {
        return status;
    }

    public void setStatus(boolean status) {
        this.status = status;
    }
}

/**
 * Prepare cups and tea
 *
 * @author ajin
 */
public class TeaAndCup {

    boolean status = false;

    public boolean isStatus() {
        return status;
    }

    public void setStatus(boolean status) {
        this.status = status;
    }
}

Promisor class

public class Promisor {

    public static Future<Object> create(long startTime) {
        // 1. Define tasks
        FutureTask<Object> futureTask = new FutureTask<>(() -> {
            System.out.println("Start boiling water, current time" + (System.currentTimeMillis() - startTime) + "ms");
            BoilWater boilWater = new BoilWater();
            Thread.sleep(15000);
            boilWater.setStatus(true);
            return boilWater;
        });
        // 2. Executor execution
        new Thread(futureTask).start();
        // 3. Return Promise
        return futureTask;
    }
}

This matches the responsibility of the Promisor.create() method as depicted in the diagram.

public static void main(String[] args) throws InterruptedException, ExecutionException {
        long startTime = System.currentTimeMillis();
        / / get the Promise
        Future<Object> promise = Promisor.create(startTime);

        System.out.println("Start preparing tea leaves. It takes 3 minutes." + (System.currentTimeMillis() - startTime) + "ms");
        TeaAndCup teaAndCup = new TeaAndCup();
        Thread.sleep(3000);
        teaAndCup.setStatus(true);
        System.out.println("Finish preparing tea leaves." + (System.currentTimeMillis() - startTime) + "ms");

        if (!promise.isDone()) {
            System.out.println("Tea and cups are ready, waiting for the boiling water to finish.");
        }

        // Get execution results (may block)
        BoilWater boilWater = (BoilWater) promise.get();
        System.out.println("Obtain the signal of boiling water completion, current time:" + (System.currentTimeMillis() - startTime) + "ms");

        System.out.println("Preparation done. Let's make the tea.");

        System.out.println("Total time:" + (System.currentTimeMillis() - startTime) + "ms");


    }

Start preparing tea leaves. It takes 3 minutes. Current time: 87ms
Start boiling water, current time: 88ms
Finish preparing tea leaves, current time: 3099ms
Tea and cups are ready, waiting for the boiling water to finish
Obtain the signal of boiling water completion, current time: 15104ms
Preparation done. Let's make the tea.
Total time: 15104ms

Application scenario of the Promise pattern in FutureTask source code

FutureTask inheritance relationship

  • Parent: RunnableFuture
    • Runnable
    • Future

The constructor

Member variables:

    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;
    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Callable}.
     *
     * @param  callable the callable task
     * @throws NullPointerException if the callable is null
     */
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Runnable}, and arrange that {@code get} will return the
     * given result on successful completion.
     *
     * @param runnable the runnable task
     * @param result the result to return on successful completion. If
     * you don't need a particular result, consider using
     * constructions of the form:
     * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
     * @throws NullPointerException if the runnable is null
     */
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

Runnable vs. Callable:

Although Callable is similar to Runnable, Callable can throw exceptions. The more important difference is that the call() method in Callable has a return value compared to the run() method in Runnable, which does not.

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}
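The difference can be seen in a tiny side-by-side sketch (class and method names are illustrative, not from the source):

```java
import java.util.concurrent.Callable;

public class RunnableVsCallable {

    // Callable.call() returns a value and may throw a checked exception
    public static Integer callSquare(int n) {
        Callable<Integer> square = () -> n * n;
        try {
            return square.call();
        } catch (Exception e) { // call() declares throws Exception
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // Runnable.run() returns nothing and cannot throw checked exceptions
        Runnable print = () -> System.out.println("run() returns nothing");
        print.run();
        System.out.println(callSquare(6)); // 36
    }
}
```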

A Runnable can also be adapted to a Callable: the adapter wraps the Runnable and calls runnable.run() inside callable.call().

    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

FutureTask#run()

    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            // Focus on Callable
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    // core
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    // core
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
FutureTask#set(V)

The return value of the incoming callable is assigned to the member variable outcome of FutureTask in a thread-safe manner.

This means that running the FutureTask computes a result via the Callable and stores it in the member variable outcome, where it waits to be retrieved.

    protected void set(V v) {
        // Change the state from NEW to COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

Now we understand why a Callable object is created in the constructor:

Because of the Callable object, you can get the return value and store it in the outcome variable.

private Object outcome; // non-volatile, protected by state reads/writes

A diagram illustrates the execution logic of the FutureTask.run() method:

(Figure: the execution logic of FutureTask.run())

FutureTask.get()

Get results.

    public V get(a) throws InterruptedException, ExecutionException {
        int s = state;
        // If not completed, the current thread waits
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
FutureTask#awaitDone

Wait for the thread to complete execution.

 private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            // The thread completes and returns directly
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                // wait
                LockSupport.park(this);
        }
    }
FutureTask.report(int)
   private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

(Figure: how FutureTask works)

conclusion

  1. The Promise pattern applies to scenarios where asynchronous threads need to be started to improve performance and results from asynchronous threads need to be returned.

  2. The Promise pattern consists of four parts: Promisor, Executor, Promise, and Result.

  3. In Java, the Promise pattern shows up in the construction of FutureTask and the invocation of its get() method.

Week 5

Producer-consumer model

There are three important roles in the producer and consumer pattern: producer, task queue, and consumer. The producer submits the task to the queue, and the consumer retrieves the task from the queue for processing.

MQ middleware, widely used when developing microservice systems, is also an application of the producer-consumer pattern.

Too full problem

Producers produce faster than consumers consume per unit of time, causing tasks to pile up in the blocking queue, and it’s only a matter of time before the queue fills up.

Necessary: we need to set the queue size properly, so that the blocking queue does not fill up because the producer is working too fast!

Scenario 1

1. Consumers process less per day than producers produce; for example, producers produce 10,000 items per day, while consumers can only process 5,000.

2. Solution: add more consumer machines.

3. Reason: the producers cannot be throttled, because everything must be processed within the day; the only option is to scale out the consumers.

Scenario 2

1. Consumers process more per day than producers produce.

2. At peak times the producers are too fast and the queue backs up.

3. Solution: enlarge the queue appropriately.

4. Reason: the consumers' daily capacity exceeds the producers' output, so everything can certainly be processed within the day; the goal is just to keep the queue from filling up at peak times.

Scenario 3

1. Consumers process more per day than producers produce.

2. Due to resource limits or other constraints, the queue cannot be made particularly large.

3. At peak times the producers are too fast and the queue backs up.

4. Solution: throttle the producers.

5. Reason: the consumers' daily capacity exceeds the producers' output, so everything can be processed within the day; since the queue is small, the only option is to throttle the producers so that the queue fills more slowly at peak times.
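The flow control in all three scenarios rests on a bounded blocking queue: once it is full, a producer's offer() fails (or its put() blocks), and that is exactly the throttling point. A minimal sketch (illustrative names, not from the source):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedQueueDemo {

    // A producer tries to enqueue `produced` tasks into a queue of the given
    // capacity; offer() fails once the queue is full, so the producer can
    // react (drop, block with put(), or rate-limit) instead of piling up work.
    public static int fillQueue(int capacity, int produced) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < produced; i++) {
            if (queue.offer(i)) { // non-blocking attempt
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Producer submits 10 tasks, the queue holds 4: only 4 are accepted.
        System.out.println(fillQueue(4, 10)); // 4
    }
}
```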

Producer and consumer patterns in JDK native thread pools

Basic concepts of thread pools

  1. Executor: the root interface representing a thread pool. It has an execute() method: you throw in a Runnable object and the pool assigns a thread to execute it.
  2. ExecutorService: a sub-interface of Executor that looks much more like a full thread-pool interface, adding methods such as shutting down the pool.
  3. Executors: a utility class whose static factory methods create the thread pool you need.
  4. ThreadPoolExecutor: the core implementation class of ExecutorService; the thread pool we actually use.
Understanding the core parameters of ThreadPoolExecutor
  • corePoolSize: the number of core threads in the thread pool
  • maximumPoolSize: the maximum number of threads allowed in the thread pool
  • keepAliveTime: if the number of threads is greater than corePoolSize, the extra threads are released after this amount of idle time; it sets the wait time for idle threads
  • unit: the TimeUnit of the keepAliveTime above
  • workQueue: when a Runnable task is thrown in via ThreadPoolExecutor.execute() and all threads are busy, it waits in this queue (a blocking queue)
  • threadFactory: when new threads need to be created and added to the pool, they are created by this thread factory
  • handler: if the workQueue above is bounded, what happens when more tasks are thrown at the queue than it can hold? This handler (the rejection policy) deals with them.
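Putting the seven parameters together, here is a sketch of a hand-built pool (the concrete values are illustrative, not from the source):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolConfigDemo {

    public static ThreadPoolExecutor build() {
        return new ThreadPoolExecutor(
                2,                                 // corePoolSize
                4,                                 // maximumPoolSize
                60L, TimeUnit.SECONDS,             // keepAliveTime + unit
                new ArrayBlockingQueue<>(8),       // workQueue (bounded)
                Executors.defaultThreadFactory(),  // threadFactory
                new ThreadPoolExecutor.CallerRunsPolicy()); // handler
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = build();
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize()); // 2/4
        pool.shutdown();
    }
}
```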

Submit tasks to the thread pool

public void execute(Runnable command) {
    	// If the task is null, an NPE exception is thrown
        if (command == null)
            throw new NullPointerException();
       	// the atomic variable CTL stores the thread state + the number of threads. We use an int to store two numbers, the first three bits representing the state of the thread, and the last 29 bits representing the number of threads
        int c = ctl.get();
    
    	// Check whether the number of current threads is smaller than the number of core threads. If so, start a new thread to execute the task
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
    	// If the thread pool is in the RUNNING state, add tasks to the blocking queue
        if (isRunning(c) && workQueue.offer(command)) { // the task was successfully added
             // Retrieve the value of CTL, because the thread state may have changed when the task was added to the queue
            int recheck = ctl.get();
           	// If the thread is no longer in the RUNNING state, the task is removed
            if (!isRunning(recheck) && remove(command))
                // refuse to execute
                reject(command);
            else if (workerCountOf(recheck) == 0)
                // If there are no threads in the current thread pool, add a thread
                addWorker(null, false);
        }
    	// If the queue is full, a new thread (non-core thread) is added. If the new thread fails, a rejection policy is implemented
        else if (!addWorker(command, false))
            // if the pool is not in the RUNNING state, creating the new Worker thread fails
            reject(command);
    }

Consuming task

Worker threads are used to consume tasks.

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
ThreadPoolExecutor#runWorker(Worker)
  • ThreadPoolExecutor#getTask
  • task.run()

Task sources:

If the creation of a new Worker thread is triggered after the task is submitted, the Worker thread will execute the task as its firstTask.

 Thread wt = Thread.currentThread();
        // Worker's first task
        Runnable task = w.firstTask;
        w.firstTask = null;// Point firstTask to null
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {

If firstTask=null, the getTask() method is called to get the task

getTask core source:

try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }

Week 6

The Active Object model

Active Object mode

An asynchronous programming pattern in which method invocation and method execution are separated and executed in two threads.

Synchronous versus asynchronous invocation

System. The gc () scene:

The System.gc() method is usually called when we want to trigger garbage collection, but we don't know when the collection will actually happen. The call merely sends a garbage-collection request to the JVM; it does not mean a collection is imminent. That depends on the JVM's own scheduling.

The Active Object model

The core idea of the active object pattern is to realize the asynchronous programming pattern by separating the calling and execution processes.

This has two advantages. First, it improves the concurrency and throughput of the system. Second, it reduces the coupling of the system. If the specific execution logic needs to be modified, it will not affect the caller and is more convenient to maintain.

To simplify the complexity of asynchronous invocation, the active object pattern separates method execution from invocation. With this model, an object feels the same when accessed externally by a client, whether or not there are separate threads making asynchronous calls to it.

(Figure: the Active Object pattern)

1) the Future

In active object mode, the Proxy returns a Future object immediately after the client submits the task. The client takes this object and can use the Future object to retrieve task execution results if needed.

The functionality required by this component can be provided by the Future interface in the JDK's java.util.concurrent package.

2) MethodRequest

When a request is sent to the Proxy, the Proxy does not process the request directly, but encapsulates contextual information such as the request parameters into a MethodRequest object.

MethodRequest, to put it bluntly, encapsulates the asynchronous task. You can use the JDK's java.lang.Runnable, or Callable from the concurrent package; whether to use Runnable or Callable depends on whether you need a return value.

Callable has a return value

Runnable/Callable

3) ActivationQueue

The ActivationQueue is a task buffer in the active object mode.

In plain English, a block of memory storage, preferably an ordered queue.

The main purpose is to cache MethodRequest objects that cannot be executed immediately, and then wait for the worker thread to be idle and read the task from them to execute.

There are many options for implementing this task buffer, and the JDK helps us implement some queues that can be used directly, such as LinkedBlockingQueue under the java.util.Concurrent package.

Block queue BlockingQueue

4) the Scheduler

A Scheduler is a Scheduler.

The Scheduler is to the Active Object pattern what the CPU is to a computer: it controls the execution timing and flow of MethodRequest objects.

You can use ThreadPoolExecutor, which is a thread pool, for this component, or you can implement and package the ExecutorService interface yourself if you have other needs.

ThreadPoolExecutor thread pool

5) the Servant:

Servants implement the asynchronous methods exposed by the Proxy. And is responsible for performing tasks corresponding to asynchronous methods exposed by Proxy (task execution)

Application of active object pattern in thread pool source code

Let's look at the diagram again:

(Figure: the Active Object pattern)

The Future component can be implemented with the Future interface in the JDK's concurrent package.

MethodRequest can be implemented with the JDK's java.lang.Runnable or with Callable from the concurrent package;

ActivationQueue can be implemented as a LinkedBlockingQueue from the concurrent package;

The Scheduler component uses ThreadPoolExecutor to implement the scheduling function.

Corresponding understanding in thread pools

Request Context Encapsulation (MethodRequest)

A Runnable or Callable

Task submitted

1) void execute(Runnable command);

2) <T> Future<T> submit(Callable<T> task);

3) <T> Future<T> submit(Runnable task, T result);

4) Future<?> submit(Runnable task);

All of them are ultimately implemented on top of the execute(Runnable command) method.

Return the Future object

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

Submit the task to return the Future object

ActivationQueue buffer

BlockingQueue

Commit and execute are separated

The client does not need to pay attention to the details of the execution process, but only needs to pay attention to the task itself, such as submitting the task, obtaining the result, and canceling the task.
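The whole pattern fits in a small sketch (class and method names are illustrative, not from the JDK): the proxy method turns a call into a task, a single-threaded scheduler buffers and executes it, and the caller immediately receives a Future.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ActiveCounter {

    private int count = 0; // Servant state, only touched by the scheduler thread
    private final ExecutorService scheduler = Executors.newSingleThreadExecutor();

    // Proxy: enqueue a MethodRequest, return a Future (the Promise) right away
    public Future<Integer> increment() {
        return scheduler.submit(() -> ++count);
    }

    public void shutdown() {
        scheduler.shutdown();
    }

    // Convenience wrapper used below: two async calls, return the second result
    public static int incrementTwice() {
        ActiveCounter counter = new ActiveCounter();
        try {
            counter.increment().get(); // returns 1
            return counter.increment().get(); // returns 2
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            counter.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(incrementTwice()); // 2
    }
}
```

Both calls return immediately with a Future; the single scheduler thread guarantees the requests execute one at a time, so the Servant state needs no locking.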

Week 7

The thread pool

Thread creation

The cost of threads: the thread object itself and the thread's call stack occupy memory, and operating-system memory is limited, which means the number of threads we can create is also limited. Creating threads without limit keeps consuming memory until the limit is exceeded, resulting in an error.

There is a formula for how many threads can be created: number of threads = (maximum memory for the process − memory allocated to the JVM − memory reserved by the operating system) / thread stack size
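As a sketch of the arithmetic (all figures below are hypothetical, not from the source):

```java
public class ThreadEstimate {

    // threads ≈ (process max memory − JVM memory − OS reserved memory) / stack size,
    // with every figure in megabytes
    public static long estimateThreads(long processMaxMb, long jvmMb,
                                       long osReservedMb, long stackMb) {
        return (processMaxMb - jvmMb - osReservedMb) / stackMb;
    }

    public static void main(String[] args) {
        // e.g. an 8192 MB process limit, 4096 MB for the JVM, 1024 MB reserved,
        // and a 1 MB default thread stack (-Xss1m)
        System.out.println(estimateThreads(8192, 4096, 1024, 1)); // 3072
    }
}
```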

In the Java language, when we create a Thread, the Java virtual machine creates a Thread object in JVM memory and, at the same time, creates an operating-system thread, ultimately mapping the Java thread to the system's native threads. On Windows this is a 1-to-1 mapping (one Java thread maps to one operating-system thread); on Linux it is an N-to-M mapping (multiple Java threads map to multiple operating-system threads, and N and M are not necessarily equal).

The memory used by the operating system’s threads is not the memory allocated by the JVM, but the memory left on the system.

If you create threads beyond the limit, an exception is thrown: java.lang.OutOfMemoryError: unable to create new native thread

Advantage of thread pool: Thread pool can avoid creating threads repeatedly, realize thread reuse (limited resources)

Creation of a thread pool

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

A thread pool with only one thread. If more than one task is submitted to the thread pool, the task is stored in the queue. When the worker thread is free, other tasks are pulled from the queue and executed. Obtaining tasks follows the first-in, first-out principle of queues

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      // SynchronousQueue is a blocking queue with no capacity;
                                      // in plain English, it has no caching ability at all
                                      new SynchronousQueue<Runnable>());
    }

In theory it can create up to Integer.MAX_VALUE threads (the number of threads is variable).

If there are free threads that can be reused, reuse threads are preferred.

When all threads are currently working, but new tasks are still being submitted, a new thread is created to schedule the new task.

When there are no idle threads, a new thread is created

   public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

Thread pool with a fixed number of threads: The number of threads in a thread pool is fixed from the moment the pool is created

    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }

A thread pool with only one thread that can schedule tasks: ScheduledExecutorService

   public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

A thread pool that can schedule tasks with a specified number of core threads

At the core of a thread pool are worker threads and task queues

ThreadPoolExecutor core parameter

corePoolSize

Meaning: Number of core threads

maximumPoolSize

Meaning: Maximum number of threads

Effect: When the core threads are all busy and the task buffer queue is full, newly submitted tasks can still be executed rather than rejected immediately: the pool creates additional (non-core) worker threads, up to maximumPoolSize, so that it can keep accepting and executing tasks.

Why do you need a maximumPoolSize when you already have corePoolSize?

This is an “elastic design” that keeps the thread pool stable against unexpected traffic.

If only corePoolSize is set and maximumPoolSize is not, that is fine in low-concurrency scenarios where the system normally handles few tasks, say fewer than 10 per second.

However, if traffic surges at the beginning and end of each month to more than 100 requests per second, we cannot simply make corePoolSize 10 times bigger: keeping a large number of worker threads alive when there are normally few tasks to perform is a classic waste of resources. With maximumPoolSize, the pool can grow tenfold only for the brief spike and shrink back afterwards.

maximumPoolSize is thus essentially a "scaling" idea.

keepAliveTime

Meaning: how long idle threads beyond corePoolSize (while the total thread count is at most maximumPoolSize) are kept alive

These extra threads are governed by the keepAliveTime parameter: when such a thread has no work, it waits to see whether a task arrives; if one does, it executes it; otherwise the worker thread is reclaimed once keepAliveTime elapses.

unit

Meaning: the time unit of keepAliveTime (a TimeUnit value)

Generally speaking, we specify the keepAliveTime of the thread pool in seconds or minutes

workQueue

Meaning: a blocking queue used to hold submitted pending tasks

  • ArrayBlockingQueue, a bounded queue backed by an array
  • LinkedBlockingQueue, an unbounded queue backed by a linked list
  • SynchronousQueue, a synchronous hand-off queue with no capacity
  • PriorityBlockingQueue, an unbounded priority queue

Through a queue, waiting tasks are queued first, providing buffer space for the thread pool to accept tasks to a certain extent rather than reject them outright

Producer-consumer model

ThreadFactory

ThreadFactory: a factory for creating threads. When a worker thread needs to be added, it is not created directly with new Thread(); instead it is created through the newThread method of a ThreadFactory implementation

public interface ThreadFactory {

    /**
     * Constructs a new {@code Thread}.  Implementations may also initialize
     * priority, name, daemon status, {@code ThreadGroup}, etc.
     *
     * @param r a runnable to be executed by new thread instance
     * @return constructed thread, or {@code null} if the request to
     *         create a thread is rejected
     */
    Thread newThread(Runnable r);
}
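As a sketch of what such an implementation can look like, the factory below (all names are illustrative, not from any particular library) sets a recognizable thread name, which makes logs and thread dumps much easier to read:

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative custom factory: names threads for easier log/dump analysis.
class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
        t.setDaemon(false);                  // worker threads keep the JVM alive
        t.setPriority(Thread.NORM_PRIORITY); // avoid inheriting odd priorities
        return t;
    }
}
```

An instance of this factory can then be passed as the ThreadFactory argument of the ThreadPoolExecutor constructor.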

RejectedExecutionHandler

Meaning: Reject policy of the thread pool

Create a custom thread pool

Specifying the number of threads

Tasks can be divided into CPU-intensive (computation-intensive) and I/O-intensive tasks, according to where they spend most of their time

CPU-intensive

For CPU-intensive tasks, the CPU's raw computing power can finish the task in a short time, so we can set the number of core threads corePoolSize to N (number of CPUs) + 1. The extra thread exists mainly to prevent the CPU from going unused when a thread stalls, for example on a page fault; the spare thread ensures CPU time slices continue to be used.

I/O-intensive

For I/O-intensive tasks, you can set a larger maximum number of threads. Compared with CPU-intensive tasks, I/O-intensive tasks spend more time waiting for I/O results than computing, and threads in the I/O-waiting state do not consume CPU. Therefore, you can configure more threads.

In general, we set it to a multiple of the number of CPUs. The common rule of thumb is N (number of CPUs) × 2.

For I/O-intensive tasks, note that the number of core threads need not be set very high, because I/O operations themselves cause context switches, especially blocking I/O. Therefore, you are advised to set corePoolSize to 1 and maximumPoolSize to N (number of CPUs) × 2.

When there is only one thread in the thread pool, it is easy to handle the submitted tasks and context switches are minimal. Then gradually increase the number of threads to the maximum number of threads as the task increases. This does not waste resources and is flexible enough to support scenarios with increased tasks.
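The sizing rules above can be sketched as simple helper functions (a rough heuristic under the stated assumptions, not a definitive formula; names are illustrative):

```java
// Rough sizing heuristics following the rules above, where N = CPU core count.
class PoolSizing {
    // CPU-bound: N + 1 — one spare thread covers stalls such as page faults.
    static int cpuIntensiveSize(int cpus) {
        return cpus + 1;
    }

    // I/O-bound maximum: N * 2 — threads waiting on I/O don't burn CPU.
    static int ioIntensiveMax(int cpus) {
        return cpus * 2;
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        System.out.println("CPU-bound pool size: " + cpuIntensiveSize(cpus));
        System.out.println("I/O-bound max pool size: " + ioIntensiveMax(cpus));
    }
}
```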

The work queue

ThreadFactory

Consider Netty and Tomcat, which provide excellent implementations of custom thread factories

Thread pool rejection policy

Generally speaking, AbortPolicy, which throws an exception, is a reasonable default. But if we want tasks to complete rather than be discarded even when the rejection policy is triggered, we can choose the CallerRunsPolicy rejection policy.

If none of the built-in policies meets our requirements, we can implement the RejectedExecutionHandler interface ourselves to define a custom rejection policy.

An example of an I/O intensive thread pool is as follows:
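A minimal sketch of such a pool, following the sizing and rejection-policy advice above (queue capacity and other numbers are illustrative):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative I/O-intensive pool: corePoolSize = 1, maximumPoolSize = N * 2,
// a bounded queue for buffering, and CallerRunsPolicy so tasks are not discarded.
class IoIntensivePool {
    static ThreadPoolExecutor create() {
        int cpus = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
                1,                              // corePoolSize
                cpus * 2,                       // maximumPoolSize
                60L, TimeUnit.SECONDS,          // keepAliveTime + unit
                new ArrayBlockingQueue<>(100),  // bounded work queue (size is illustrative)
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }
}
```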

Handed over to the Spring IoC container for management as a Spring Bean

Week 8

ThreadLocal

Sharing variables causes thread-safety problems. If variables are not shared, can thread-safety problems still occur?

Let each thread have its own copy of the object, and there is no sharing of multithreaded variables
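A minimal sketch of this "per-thread copy" idea (class and variable names are illustrative):

```java
// Each thread sees its own copy of the value; no sharing, no contention.
class ThreadLocalDemo {
    private static final ThreadLocal<Integer> COUNTER =
            ThreadLocal.withInitial(() -> 0);

    static int incrementAndGet() {
        COUNTER.set(COUNTER.get() + 1);
        return COUNTER.get();
    }

    public static void main(String[] args) throws InterruptedException {
        Runnable task = () -> {
            for (int i = 0; i < 3; i++) incrementAndGet();
            // Each thread prints 3: increments in other threads are invisible here.
            System.out.println(Thread.currentThread().getName() + " -> " + COUNTER.get());
        };
        Thread a = new Thread(task, "t1");
        Thread b = new Thread(task, "t2");
        a.start(); b.start();
        a.join(); b.join();
    }
}
```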

ThreadLocal fundamentals

ThreadLocal underlying data structure:

Each Thread holds a ThreadLocalMap that can store multiple thread-local objects. Each ThreadLocal computes an array index from its threadLocalHashCode and is assigned to the corresponding slot of the Entry array; this is how localized objects are get and set.

Memory leak problem

As long as the Thread object itself is garbage collected, its ThreadLocalMap is reclaimed with it, so in that case there is no memory leak.

If a ThreadLocal has no remaining strong references, it can be collected by the garbage collector, and the corresponding ThreadLocal keys held in the ThreadLocalMap are reclaimed.

The thread pool scenario is different

Each core thread in a thread pool lives as long as the pool, so its ThreadLocalMap is always strongly referenced and is never automatically reclaimed.

The key in a ThreadLocalMap Entry is a WeakReference to the ThreadLocal, so the ThreadLocal object itself can be reclaimed automatically during garbage collection.

After the JDK triggers garbage collection, the corresponding ThreadLocal can indeed be collected and the key becomes null, but the value associated with that collected ThreadLocal cannot be collected automatically, because it is still strongly referenced by the Entry.

Since the pool's core threads are not recycled, each thread's ThreadLocalMap is strongly referenced and cannot be reclaimed. A ThreadLocalMap holds multiple Entries of ThreadLocal-to-value pairs; although the ThreadLocal key is a weak reference that the garbage collector can clear automatically, the corresponding value cannot be collected. Hence the possibility of a memory leak.

        private Entry getEntry(ThreadLocal<?> key) {
            int i = key.threadLocalHashCode & (table.length - 1);
            Entry e = table[i];
            if (e != null && e.get() == key)
                return e;
            else
                // the key may have been collected (threadLocal == null)
                return getEntryAfterMiss(key, i, e);
        }

If the entry is missing or its key does not match (for example because the ThreadLocal has been collected and the key is null), execution goes to the getEntryAfterMiss method

       private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
            Entry[] tab = table;
            int len = tab.length;

            while (e != null) {
                ThreadLocal<?> k = e.get();
                if (k == key)
                    return e;
                if (k == null)
                    // The stale entry's value is set to null and its Entry slot is cleared;
                    // other stale entries found along the way are expunged as well
                    expungeStaleEntry(i);
                else
                    i = nextIndex(i, len);
                e = tab[i];
            }
            return null;
        }

The expungeStaleEntry method is triggered only when the get, set, and remove methods of ThreadLocal are called, and the value and Entry corresponding to the null ThreadLocal are set to NULL.

Normally, memory leaks do not occur, but if we do not call the set, get, and remove methods of ThreadLocal, we do not set the corresponding value and Entry to NULL, which can cause memory leaks.

So how do you avoid memory leaks? Call ThreadLocal’s remove method when not in use to speed up garbage collection and avoid memory leaks.
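In a thread pool this usually means pairing every use with remove() in a finally block, so pooled threads do not carry stale (and leaked) values into the next task. A sketch with illustrative names:

```java
// Always clean up ThreadLocal state in finally: calling remove() breaks the
// Entry's value reference, so pooled threads don't leak or leak-share state.
class ThreadLocalCleanup {
    private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    static String handle(String user) {
        CONTEXT.set(user);
        try {
            return "hello " + CONTEXT.get();
        } finally {
            CONTEXT.remove();   // avoid the memory leak described above
        }
    }
}
```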

Use of ThreadLocal by open source frameworks

zuul

ZuulServlet
 @Override
    public void service(javax.servlet.ServletRequest servletRequest, javax.servlet.ServletResponse servletResponse) throws ServletException, IOException {
        try {
            init((HttpServletRequest) servletRequest, (HttpServletResponse) servletResponse);

            // Marks this request as having passed through the "Zuul engine", as opposed to servlets
            // explicitly bound in web.xml, for which requests will not have the same data attached
            RequestContext context = RequestContext.getCurrentContext();
            context.setZuulEngineRan();

            try {
                preRoute();
            } catch (ZuulException e) {
                error(e);
                postRoute();
                return;
            }
            try {
                route();
            } catch (ZuulException e) {
                error(e);
                postRoute();
                return;
            }
            try {
                postRoute();
            } catch (ZuulException e) {
                error(e);
                return;
            }
        } catch (Throwable e) {
            error(new ZuulException(e, 500, "UNHANDLED_EXCEPTION_" + e.getClass().getName()));
        } finally {
            // Avoid memory leaks
            RequestContext.getCurrentContext().unset();
        }
    }
init

com.netflix.zuul.ZuulRunner#init

public void init(HttpServletRequest servletRequest, HttpServletResponse servletResponse) {

        RequestContext ctx = RequestContext.getCurrentContext();
        if (bufferRequests) {
            // The ThreadLocal-based RequestContext stores request and response
            // information so that it is visible only to the current thread
            ctx.setRequest(new HttpServletRequestWrapper(servletRequest));
        } else {
            ctx.setRequest(servletRequest);
        }

        ctx.setResponse(new HttpServletResponseWrapper(servletResponse));
    }

com.netflix.zuul.context.RequestContext#getCurrentContext

   public static RequestContext getCurrentContext() {
        if (testContext != null) return testContext;

        RequestContext context = threadLocal.get();
        return context;
    }

	 protected static final ThreadLocal<? extends RequestContext> threadLocal = new ThreadLocal<RequestContext>() {
        @Override
        protected RequestContext initialValue() {
            try {
                return contextClass.newInstance();
            } catch (Throwable e) {
                throw new RuntimeException(e);
            }
        }
    };

Spring Transaction Management

  • org.springframework.transaction.interceptor.TransactionInterceptor

TransactionInterceptor

	@Override
	@Nullable
	public Object invoke(MethodInvocation invocation) throws Throwable {
		// Work out the target class: may be {@code null}.
		// The TransactionAttributeSource should be passed the target class
		// as well as the method, which may be from an interface.
		Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

		// Adapt to TransactionAspectSupport's invokeWithinTransaction...
		return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
	}
TransactionAspectSupport#invokeWithinTransaction
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
  • createTransactionIfNecessary

    • TransactionAspectSupport#prepareTransactionInfo

      	TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
      		if (txAttr != null) {
      			// We need a transaction for this method...
      			if (logger.isTraceEnabled()) {
      				logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
      			}
      			// The transaction manager will flag an error if an incompatible tx already exists.
      			txInfo.newTransactionStatus(status);
      		}
      		else {
      			// The TransactionInfo.hasTransaction() method will return false. We created it only
      			// to preserve the integrity of the ThreadLocal stack maintained in this class.
      			if (logger.isTraceEnabled()) {
      				logger.trace("Don't need to create transaction for [" + joinpointIdentification +
      						"]: This method isn't transactional.");
      			}
      		}
      		// We always bind the TransactionInfo to the thread, even if we didn't create
      		// a new transaction here. This guarantees that the TransactionInfo stack
      		// will be managed correctly even if no transaction was created by this aspect.
      		txInfo.bindToThread();
      		return txInfo;

      txInfo.bindToThread()

      org.springframework.transaction.interceptor.TransactionAspectSupport.TransactionInfo#bindToThread

      		private void bindToThread() {
      			// Expose current TransactionStatus, preserving any existing TransactionStatus
      			// for restoration after this transaction is complete.
      			this.oldTransactionInfo = transactionInfoHolder.get();
      			transactionInfoHolder.set(this);
      		}
      	private static final ThreadLocal<TransactionInfo> transactionInfoHolder =
      			new NamedThreadLocal<>("Current aspect-driven transaction");

      Spring's transaction management composes the transaction information into a TransactionInfo object and stores it in the current thread's ThreadLocal, so that subsequent commits and rollbacks are performed against the current thread.

      This also shows that Spring transaction management cannot span threads; otherwise the corresponding transaction information would not be found. Finally, after a transaction completes, the transaction manager cleans up the current thread's transaction information.

      TransactionAspectSupport#invokeWithinTransaction

      • cleanupTransactionInfo(txInfo);

        	protected void cleanupTransactionInfo(@Nullable TransactionInfo txInfo) {
        		if (txInfo != null) {
        			txInfo.restoreThreadLocalStatus();
        		}
        	}

        	private void restoreThreadLocalStatus() {
        		// Use stack to restore old transaction TransactionInfo.
        		// Will be null if none was set.
        		transactionInfoHolder.set(this.oldTransactionInfo);
        	}

Week 9

Serialization thread closure mode

Thread confinement means that an object is accessed only within a single thread and is never shared with other threads. This technique, called thread confinement, is a common thread-safety design strategy.
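The simplest form is stack confinement: an object reachable only through local variables. A small sketch (the choice of SimpleDateFormat is illustrative):

```java
import java.text.SimpleDateFormat;
import java.util.Date;

// Stack confinement: the SimpleDateFormat below is a local variable, so it is
// reachable only from the executing thread and needs no synchronization,
// even though SimpleDateFormat itself is not thread-safe.
class ConfinementDemo {
    static String format(long epochMillis) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd"); // confined to this call
        return fmt.format(new Date(epochMillis));
    }
}
```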

Week10

Master-slave mode

The main purpose is to introduce fork join

At its core is a design model based on the idea of divide and conquer.

One task (the list of all vaccinated personnel) is divided into several roughly equal sub-tasks (the list of vaccinated personnel in each department), and these sub-tasks are executed in parallel by dedicated worker threads (the HR of each department).

The result of the overall task, which the superior department requested from the group HR, is formed by integrating the results of the sub-tasks.

Moreover, the details of splitting and merging are invisible to the higher authorities. This working mode improves efficiency while hiding implementation details.

Division of the Master

Master: split the original task, distribute the sub-task, and summarize the results of the sub-task. The main methods are as follows:

  • analysis: the method the Master exposes to receive the original task and return the processed result

  • createAndStartWorkers: creates and starts the Slaves, which wait for the Master to assign tasks

  • dispatchTask: splits the original task and assigns the sub-tasks to each Slave for execution

  • gatherer: collects and merges the results of the sub-tasks to form the result of the original task.

Slave labor division

Responsible for sub-task processing

ForkJoinPool

java.util.concurrent.ForkJoinPool

Since JDK 1.7

Divide and conquer

  1. Task decomposition
  2. Parallel execution
  3. Recursive processing
  4. Result aggregation
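The four steps map directly onto a RecursiveTask. A standard array-sum example (the threshold value is illustrative):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Divide and conquer with fork/join: split (1), run halves in parallel (2),
// recurse (3), then join and aggregate (4).
class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000;   // illustrative cutoff
    private final long[] data;
    private final int from, to;                   // half-open range [from, to)

    SumTask(long[] data, int from, int to) {
        this.data = data; this.from = from; this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {             // small enough: compute directly
            long sum = 0;
            for (int i = from; i < to; i++) sum += data[i];
            return sum;
        }
        int mid = (from + to) >>> 1;              // 1. task decomposition
        SumTask left = new SumTask(data, from, mid);
        SumTask right = new SumTask(data, mid, to);
        left.fork();                              // 2. left half runs in parallel
        long rightSum = right.compute();          // 3. recurse on the right half
        return left.join() + rightSum;            // 4. result aggregation
    }

    public static void main(String[] args) {
        long[] data = new long[10_000];
        for (int i = 0; i < data.length; i++) data[i] = i + 1;
        long total = new ForkJoinPool().invoke(new SumTask(data, 0, data.length));
        System.out.println(total);                // 1 + 2 + ... + 10000 = 50005000
    }
}
```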

A constructor


    /**
     * Creates a {@code ForkJoinPool} with the given parameters, without
     * any security checks or parameter validation.  Invoked directly by
     * makeCommonPool.
     */
    private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory,
                         UncaughtExceptionHandler handler,
                         int mode,
                         String workerNamePrefix) {
        this.workerNamePrefix = workerNamePrefix;
        this.factory = factory;
        this.ueh = handler;
        this.config = (parallelism & SMASK) | mode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }
  • Parallelism: The number of parallel threads allowed to execute simultaneously. The default is the number of CPU cores
  • Mode: indicates the working mode of the task queue. The value can be FIFO or LIFO. The default value is LIFO

Task submitted

java.util.concurrent.ForkJoinPool#submit(java.lang.Runnable)

The underlying implementation: java.util.concurrent.ForkJoinPool#externalSubmit

ForkJoinPool#externalSubmit(ForkJoinTask)
  • Instantiate workQueues

    volatile WorkQueue[] workQueues;

  • Create a workQueue without an attached thread, link it to a slot in workQueues, and enqueue the task, incrementing the workQueue's top pointer by 1

  • Create a ForkJoinWorkerThread instance, bind it to a new workQueue (this workQueue holds no data yet), and call its start method.

ForkJoinWorkerThread

runWorker(WorkQueue)
    final void runWorker(WorkQueue w) {
        w.growArray();                   // allocate queue
        int seed = w.hint;               // initially holds randomization hint
        int r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShift
        for (ForkJoinTask<?> t;;) {
            if ((t = scan(w, r)) != null)
                w.runTask(t);
            else if (!awaitWork(w, r))
                break;
            r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
        }
    }

The core mechanism is the work-stealing double-ended queue (deque). Because of the stealing mechanism, thread resources are fully utilized and threads rarely sit idle. In addition, because a worker pushes and pops at one end of its deque while thieves steal from the other end, lock contention is largely avoided while thread safety is preserved.

Week11

pipeline

Pipeline:

The Java 8 Stream API is an implementation of the Pipeline pattern.
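For instance, each intermediate operation below adds a stage to the pipeline, and nothing runs until the terminal operation (names and data are illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// A Stream pipeline: source -> filter stage -> map stage -> terminal collect.
class PipelineDemo {
    static List<String> shout(List<String> words) {
        return words.stream()
                .filter(w -> w.length() > 3)     // stage 1: keep longer words
                .map(String::toUpperCase)        // stage 2: transform
                .collect(Collectors.toList());   // terminal: triggers execution
    }

    public static void main(String[] args) {
        System.out.println(shout(Arrays.asList("ant", "bison", "cat", "dingo")));
        // prints [BISON, DINGO]
    }
}
```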

Week12

Semi-synchronous and semi-asynchronous mode

Semi-synchronous and semi-asynchronous: half-sync/half-async

AsyncTask: Receives user requests and puts the logic to be executed in a queue, that is, asynchronizes user requests.

SyncTask: Execution logic used to process user requests.

Queue: Buffers user requests and ensures that they are executed sequentially
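These three roles can be sketched with a BlockingQueue (a minimal sketch; all names are illustrative):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Half-sync/half-async: the async side enqueues requests and returns at once;
// the sync side drains the queue and does the real work in order.
class HalfSyncHalfAsync {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    // Async layer: accept the request, buffer it, return immediately.
    void submit(Runnable request) {
        queue.offer(request);
    }

    // Sync layer: one worker loops, executing requests sequentially.
    Thread startWorker() {
        Thread worker = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    queue.take().run();   // blocks until a request arrives
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // exit loop on shutdown
            }
        }, "sync-worker");
        worker.start();
        return worker;
    }
}
```

Because a single worker drains the queue, requests are executed strictly in submission order, which is exactly the sequencing guarantee described above.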