Author: Xiao Fu Ge | Blog: https://bugstack.cn | GitHub: https://github.com/fuzhengwei/CodeGuide/wiki

Reflect, share, and grow, so that both you and others can gain something! 😄

1. Foreword

People stare at their phones while machines learn!

It happens to be 2020, so this picture is rather interesting. As a child I watched plenty of science fiction films about what robots would become, but I never imagined that people would be the ones consumed by entertainment, turning into heads-down phone addicts with big bellies!

When I realized this, I really missed my childhood. On holiday mornings I would run outside, call three or five friends, and go catch fish in the river, play marbles, flip cards, and play hopscotch! After a whole day of that I never felt tired. Now, on a day off, your entertainment plans often leave your head exhausted!

For example, which has given you more of a headache: studying English for a day or scrolling Douyin for a day? Playing video games all day or playing ball all day? If you become aware of this, try putting down your phone for a while, go out and play, exercise, and stay healthy!

2. Interview questions

Xie Airplane, take note! Last time he stumbled on threads; this time he won't fall into the same pit twice!

Xie Airplane: Go ahead and ask, I'm ready!!

Interviewer: Well, how is the thread pool state designed and stored?

Xie Airplane: Uh... Next! Next!

Interviewer: Why does the Worker class extend AQS itself instead of being implemented with ReentrantLock?

Xie Airplane: I… !

Interviewer: Can you briefly describe the execution flow of execute()?

Xie Airplane: Goodbye!

3. Thread pool explanation

1. Let’s start with an example

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10));
threadPoolExecutor.execute(() -> {
    System.out.println("Hi thread pool!");
});
threadPoolExecutor.shutdown();

// Executors.newFixedThreadPool(10);
// Executors.newCachedThreadPool();
// Executors.newScheduledThreadPool(10);
// Executors.newSingleThreadExecutor();

This is an example of creating a thread pool, something you have probably done many times.

The core purpose of a thread pool is to make good use of resources: threads are created once and reused, which avoids the performance overhead of repeatedly creating and destroying them. This is the idea behind pooling techniques in general.
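To make the reuse idea concrete, here is a minimal sketch that pushes 100 small tasks through a fixed pool and counts how many distinct threads actually ran them. The class name ThreadReuseSketch and the helper distinctThreadsUsed are made up for this illustration; only the java.util.concurrent calls are real API.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ThreadReuseSketch {

    // Runs taskCount tiny tasks on a fixed-size pool and returns how many
    // distinct threads ended up executing them.
    static int distinctThreadsUsed(int poolSize, int taskCount) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> threadNames.add(Thread.currentThread().getName()));
        }
        pool.shutdown(); // already submitted tasks still run
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threadNames.size();
    }

    public static void main(String[] args) {
        System.out.println("100 tasks ran on " + distinctThreadsUsed(2, 100) + " thread(s)");
    }
}
```

However many tasks are submitted, the number of threads never exceeds the pool size, which is exactly the saving that pooling is after.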

So, let's build a pool by hand and see how it handles threads.

2. Write a thread pool

2.1 Implementation Process

To better understand and analyze the thread pool source code, let's first follow the idea behind a thread pool and write a very simple one ourselves.

In fact, the core logic of a piece of functional code is often not very complicated, but for that core flow to run smoothly, many auxiliary branches have to be added around it. As I often say, toilet paper is made that large just to protect your hands!

As shown in Figure 21-1, the handwritten thread pool implementation is very simple and shows only the core flow, including:

  1. There are n threads running all the time, where n is the pool size we allowed when creating the pool.
  2. Tasks are submitted to the thread pool to run.
  3. If all running threads in the pool are busy, the task is placed in a queue.
  4. Finally, when a thread becomes idle, it takes a task from the queue and runs it.

2.2 Implementation Code

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadPoolTrader implements Executor {

    // Number of worker threads currently alive in the pool
    private final AtomicInteger ctl = new AtomicInteger(0);

    private volatile int corePoolSize;
    private volatile int maximumPoolSize;

    // Tasks that cannot run immediately wait here
    private final BlockingQueue<Runnable> workQueue;

    public ThreadPoolTrader(int corePoolSize, int maximumPoolSize, BlockingQueue<Runnable> workQueue) {
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
    }

    @Override
    public void execute(Runnable command) {
        int c = ctl.get();
        // Below the core size: start a new worker for this task
        if (c < corePoolSize) {
            if (!addWorker(command)) {
                reject();
            }
            return;
        }
        // Otherwise queue the task; if the queue is full, try one more worker
        if (!workQueue.offer(command)) {
            if (!addWorker(command)) {
                reject();
            }
        }
    }

    private boolean addWorker(Runnable firstTask) {
        if (ctl.get() >= maximumPoolSize) return false;

        Worker worker = new Worker(firstTask);
        worker.thread.start();
        ctl.incrementAndGet();
        return true;
    }

    private final class Worker implements Runnable {

        final Thread thread;
        Runnable firstTask;

        public Worker(Runnable firstTask) {
            this.thread = new Thread(this);
            this.firstTask = firstTask;
        }

        @Override
        public void run() {
            Runnable task = firstTask;
            try {
                // Run the first task, then keep pulling tasks from the queue
                while (task != null || (task = getTask()) != null) {
                    task.run();
                    if (ctl.get() > maximumPoolSize) {
                        break;
                    }
                    task = null;
                }
            } finally {
                ctl.decrementAndGet();
            }
        }

        // Blocks until a queued task is available
        private Runnable getTask() {
            for (; ; ) {
                try {
                    System.out.println("workQueue.size:" + workQueue.size());
                    return workQueue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private void reject() {
        throw new RuntimeException("Error! ctl.count:" + ctl.get() + " workQueue.size:" + workQueue.size());
    }

    public static void main(String[] args) {
        ThreadPoolTrader threadPoolTrader = new ThreadPoolTrader(2, 2, new ArrayBlockingQueue<Runnable>(10));

        for (int i = 0; i < 10; i++) {
            int finalI = i;
            threadPoolTrader.execute(() -> {
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task No.:" + finalI);
            });
        }
    }
}

// Test results

Task No.:1
Task No.:0
workQueue.size:8
workQueue.size:8
Task No.:3
workQueue.size:6
Task No.:2
workQueue.size:5
Task No.:5
workQueue.size:4
Task No.:4
workQueue.size:3
Task No.:7
workQueue.size:2
Task No.:6
workQueue.size:1
Task No.:8
Task No.:9
workQueue.size:0
workQueue.size:0

The thread pool implementation above is still very simple, yet the test results already reflect the core pooling idea. The main functional logic includes:

  • ctl: records the number of threads in the thread pool.
  • corePoolSize, maximumPoolSize: limit the capacity of the thread pool.
  • workQueue: the thread pool's queue; tasks that cannot be run right away are placed in this queue.
  • execute: submits a task; this is the general interface method. Its main job is to decide whether the submitted task is handed to a new worker, queued, or rejected.
  • addWorker: mainly creates a Worker and starts its thread. The Worker also has a getTask() method, which continuously fetches unexecuted tasks from the queue.

That is the whole of this simple thread pool implementation. If you think about it, though, there is plenty left to improve. For example: what about thread pool state? The pool can't just run forever! What about locking and concurrency issues? What about a rejection policy? None of these are handled in the main flow above, and precisely because those flows are absent, the code above is easier to understand.

Next, we will analyze the thread pool source code. With our simple handwritten thread pool as a reference, it will be easier to understand 😄!

3. Thread pool source code analysis

3.1 Thread pool class diagram

The implementation and inheritance relationships between the classes, centered on the core class ThreadPoolExecutor, are shown in Figure 21-2.

  • The interfaces Executor and ExecutorService define the basic methods of a thread pool, in particular execute(Runnable command), which submits a task to the pool.
  • The abstract class AbstractExecutorService implements the basic, general-purpose interface methods.
  • ThreadPoolExecutor is the core class of the whole thread pool; all the other classes and interfaces provide functionality around it.
  • Worker is the task class, that is, the thread that ultimately executes tasks.
  • RejectedExecutionHandler is the rejection policy interface, with four implementation classes: AbortPolicy (rejects by throwing an exception), DiscardPolicy (silently discards the task), DiscardOldestPolicy (discards the oldest task waiting in the queue), and CallerRunsPolicy (the submitting thread runs the task itself).
  • Executors is the factory for the commonly used thread pools with different policies: newFixedThreadPool, newCachedThreadPool, newScheduledThreadPool, newSingleThreadExecutor.
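The difference between the rejection policies is easiest to see on a saturated pool. The following is a small sketch, not code from the JDK: the class RejectionPolicySketch and the helper submitToSaturatedPool are hypothetical names, and the pool is deliberately tiny (one thread, one queue slot) so that the third submission is always the one that overflows.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class RejectionPolicySketch {

    // Saturates a pool with one thread and one queue slot, submits one extra
    // task, and reports what the given handler did with that extra task.
    static String submitToSaturatedPool(RejectedExecutionHandler handler) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<String> outcome = new AtomicReference<>("never ran");
        try {
            pool.execute(blocker); // occupies the only worker thread
            pool.execute(blocker); // fills the only queue slot
            pool.execute(() -> outcome.set("ran on " + Thread.currentThread().getName()));
        } catch (RejectedExecutionException e) {
            outcome.set("rejected with exception");
        } finally {
            release.countDown();
            pool.shutdown();
        }
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println("AbortPolicy:         " + submitToSaturatedPool(new ThreadPoolExecutor.AbortPolicy()));
        System.out.println("DiscardPolicy:       " + submitToSaturatedPool(new ThreadPoolExecutor.DiscardPolicy()));
        System.out.println("DiscardOldestPolicy: " + submitToSaturatedPool(new ThreadPoolExecutor.DiscardOldestPolicy()));
        System.out.println("CallerRunsPolicy:    " + submitToSaturatedPool(new ThreadPoolExecutor.CallerRunsPolicy()));
    }
}
```

With DiscardOldestPolicy the extra task still runs, but on a pool thread and at the cost of the task that was waiting in the queue; with CallerRunsPolicy it runs on the thread that called execute.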

3.2 High 3 bits and low 29 bits

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

In the ThreadPoolExecutor implementation class, an AtomicInteger named ctl records both the thread pool state and the number of threads in the pool. To store multiple values in a single int, the data is split into regions: the high 3 bits record the state and the low 29 bits store the thread count. The default is the RUNNING state with a thread count of 0.
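The packing can be tried out in isolation. This sketch copies the constants shown above into a standalone class (CtlPackingSketch is a made-up name) and adds the three small helpers that JDK 8's ThreadPoolExecutor uses to pack and unpack the value: ctlOf, runStateOf, and workerCountOf.

```java
class CtlPackingSketch {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1; // low 29 bits all set

    static final int RUNNING    = -1 << COUNT_BITS;      // high bits 111
    static final int SHUTDOWN   =  0 << COUNT_BITS;      // high bits 000
    static final int STOP       =  1 << COUNT_BITS;      // high bits 001

    // Pack a run state and a worker count into one int
    static int ctlOf(int runState, int workerCount) { return runState | workerCount; }

    // Keep only the high 3 bits
    static int runStateOf(int c) { return c & ~CAPACITY; }

    // Keep only the low 29 bits
    static int workerCountOf(int c) { return c & CAPACITY; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println("ctl bits:    " + Integer.toBinaryString(c));
        System.out.println("is RUNNING:  " + (runStateOf(c) == RUNNING));
        System.out.println("workerCount: " + workerCountOf(c));
        // States are ordered numerically, so "rs >= SHUTDOWN" style checks work
        System.out.println("RUNNING < SHUTDOWN < STOP: " + (RUNNING < SHUTDOWN && SHUTDOWN < STOP));
    }
}
```

Keeping both values in one int means a single CAS on ctl can update the state and the count together, and because RUNNING is negative, simple numeric comparisons are enough to test the state.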

3.3 Thread pool status

Figure 22-4 shows the state flow relationship in the thread pool, including the following states:

  • RUNNING: Running state, accepting new tasks and processing tasks in the queue.
  • SHUTDOWN: Closed state (shutdown method called). Does not accept new tasks, but processes tasks in the queue.
  • STOP: Stopped state (the shutdownNow method was called). No new tasks are accepted, no tasks in the queue are processed, and ongoing tasks are interrupted.
  • TIDYING: all tasks have terminated and workerCount is 0; the pool is about to run the terminated() hook method.
  • TERMINATED: the final state, reached once the terminated() method has completed.
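The internal states are not exposed directly, but their effects can be observed through the public API. The sketch below is an illustration under that assumption; PoolLifecycleSketch, busyPool, afterShutdown, and afterShutdownNow are hypothetical names. It contrasts shutdown() (SHUTDOWN: queued tasks still run) with shutdownNow() (STOP: queued tasks are handed back and the running task is interrupted).

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class PoolLifecycleSketch {

    // One worker thread: the first task blocks on the gate, the second waits in the queue.
    private static ThreadPoolExecutor busyPool(CountDownLatch gate, AtomicBoolean queuedRan) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.execute(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // interrupted by shutdownNow()
            }
        });
        pool.execute(() -> queuedRan.set(true));
        return pool;
    }

    private static boolean awaitEnd(ThreadPoolExecutor pool) {
        try {
            return pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // shutdown(): returns {new task rejected, queued task still ran, pool terminated}
    static boolean[] afterShutdown() {
        CountDownLatch gate = new CountDownLatch(1);
        AtomicBoolean queuedRan = new AtomicBoolean();
        ThreadPoolExecutor pool = busyPool(gate, queuedRan);
        pool.shutdown();
        boolean rejected = false;
        try {
            pool.execute(() -> { });
        } catch (RejectedExecutionException e) {
            rejected = true;
        }
        gate.countDown(); // let the blocked task finish
        boolean terminated = awaitEnd(pool);
        return new boolean[] {rejected, queuedRan.get(), terminated};
    }

    // shutdownNow(): returns {queued task handed back, queued task ran, pool terminated}
    static boolean[] afterShutdownNow() {
        CountDownLatch gate = new CountDownLatch(1); // never opened; the interrupt ends the task
        AtomicBoolean queuedRan = new AtomicBoolean();
        ThreadPoolExecutor pool = busyPool(gate, queuedRan);
        List<Runnable> neverStarted = pool.shutdownNow();
        boolean terminated = awaitEnd(pool);
        return new boolean[] {neverStarted.size() == 1, queuedRan.get(), terminated};
    }

    public static void main(String[] args) {
        System.out.println("shutdown():    " + Arrays.toString(afterShutdown()));
        System.out.println("shutdownNow(): " + Arrays.toString(afterShutdownNow()));
    }
}
```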

3.4 Submit a task (execute)

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

When reading this section of the source code, you can refer back to our own thread pool implementation. The end goal is the same: a submitted task is handled in one of three ways, namely started for execution, added to the queue, or handed to the rejection policy.

  • ctl.get() returns a value that records both the pool state and the thread count; the workerCountOf() method is then used to extract the current thread count. workerCountOf computes c & CAPACITY.
  • The current number of threads in the pool is compared with the core thread count corePoolSize; if it is smaller, a new worker thread is added to execute the task.
  • If the core threads are already full, isRunning(c) checks whether the thread pool is running. If it is, the task that cannot be executed immediately is placed in the work queue.
  • After the task is queued, the pool state is rechecked. If the pool is no longer running and the task is successfully removed from the queue, the rejection policy is applied. Otherwise, if the thread count is 0, a new worker thread is added.
  • Finally, if queuing fails, it tries once more to add a worker. This time the second argument of addWorker is false, so the thread count is checked against maximumPoolSize instead of corePoolSize. If adding fails, the rejection policy is applied.
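The order described above (core thread, then queue, then extra thread, then rejection) can be observed from the outside. This is a rough sketch with assumed sizes chosen to make each branch fire exactly once: corePoolSize 1, maximumPoolSize 2, and a queue of capacity 1. ExecuteFlowSketch and traceSubmissions are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteFlowSketch {

    // Submits four blocking tasks and records the pool's shape after each submission.
    static List<String> traceSubmissions() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        List<String> trace = new ArrayList<>();
        for (int i = 1; i <= 4; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                trace.add("task " + i + ": threads=" + pool.getPoolSize()
                        + " queued=" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                trace.add("task " + i + ": rejected");
            }
        }
        release.countDown();
        pool.shutdown();
        return trace;
    }

    public static void main(String[] args) {
        traceSubmissions().forEach(System.out::println);
    }
}
```

Task 1 starts the core thread, task 2 is queued, task 3 finds the queue full and starts a second (non-core) thread, and task 4 is rejected by the default AbortPolicy.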

3.5 Adding an Execution Task (addWorker)

private boolean addWorker(Runnable firstTask, boolean core)

Part one: Increase the number of threads

retry:
for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);
    // Check if queue empty only if necessary.
    if (rs >= SHUTDOWN &&
        !(rs == SHUTDOWN &&
          firstTask == null &&
          !workQueue.isEmpty()))
        return false;
    for (;;) {
        int wc = workerCountOf(c);
        if (wc >= CAPACITY ||
            wc >= (core ? corePoolSize : maximumPoolSize))
            return false;
        if (compareAndIncrementWorkerCount(c))
            break retry;
        c = ctl.get();  // Re-read ctl
        if (runStateOf(c) != rs)
            continue retry;
        // else CAS failed due to workerCount change; retry inner loop
    }
}

Part two: Create and start the thread

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int rs = runStateOf(ctl.get());
            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        if (workerAdded) {
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (!workerStarted)
        addWorkerFailed(w);
}
return workerStarted;

The process of adding an execution task can be divided into two parts: the first part records the thread count, and the second part creates and starts the executing thread while holding an exclusive lock. If you set aside the locking, CAS, and similar details, this code is essentially the same as our handwritten thread pool.

  • if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) checks whether the pool state is one of SHUTDOWN, STOP, TIDYING, or TERMINATED. If so, it returns false, with one exception: when the state is exactly SHUTDOWN, the incoming task is null, and the queue is not empty, a worker may still be added to drain the queue.
  • compareAndIncrementWorkerCount is a CAS operation that increases the thread count; on success it breaks out of the labeled loop (retry).
  • runStateOf(c) != rs finally rechecks the pool state to decide whether to restart the outer loop.
  • After the thread count has been recorded successfully, the code enters the locked section, creates the worker thread, and records its state. In the end, if the thread fails to start, addWorkerFailed is executed to remove the worker and roll back the count.
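The inner loop of part one is a standard CAS retry pattern. Stripped of the pool state checks, it can be sketched as follows; WorkerCountCasSketch, tryReserveSlot, and raceForSlots are hypothetical names, and a plain AtomicInteger stands in for the low 29 bits of ctl.

```java
import java.util.concurrent.atomic.AtomicInteger;

class WorkerCountCasSketch {

    private final AtomicInteger workerCount = new AtomicInteger();
    private final int limit;

    WorkerCountCasSketch(int limit) {
        this.limit = limit;
    }

    // Reserve one worker slot, or return false once the limit is reached.
    boolean tryReserveSlot() {
        for (;;) {
            int wc = workerCount.get();
            if (wc >= limit)
                return false;
            if (workerCount.compareAndSet(wc, wc + 1))
                return true;
            // CAS failed because another thread changed the count; re-read and retry
        }
    }

    // Lets many threads compete for slots and returns how many succeeded.
    static int raceForSlots(int limit, int contenders) {
        WorkerCountCasSketch counter = new WorkerCountCasSketch(limit);
        AtomicInteger winners = new AtomicInteger();
        Thread[] threads = new Thread[contenders];
        for (int i = 0; i < contenders; i++) {
            threads[i] = new Thread(() -> {
                if (counter.tryReserveSlot())
                    winners.incrementAndGet();
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println("slots granted: " + raceForSlots(3, 16));
    }
}
```

Because the check and the increment are tied together by compareAndSet, the count cannot overshoot the limit even when many threads submit at once, and no lock is needed for this step.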

3.6 Execution Thread (runWorker)

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // Allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

With the handwritten thread pool as a foundation, you already know what the pool basically does here. The core point is task.run(), which gets the task running. The additional processing is as follows:

  • beforeExecute and afterExecute are hook methods called before and after each task executes; they can be used for things like statistics.
  • In addition, the lock operation here is the non-reentrant exclusive lock that Worker implements by extending AQS.
  • processWorkerExit is worth studying in depth if you are interested. When the thread exits, it removes the worker and adds up its completed task count.
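This also hints at the interview question from earlier: why Worker extends AQS instead of using ReentrantLock. A simplified sketch of a Worker-style lock is shown below; SimpleWorkerLock is a made-up name, and the three overridden methods follow the same shape as the ones in the JDK's Worker class. The key difference is that the thread holding the lock cannot acquire it a second time.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.ReentrantLock;

class NonReentrantLockSketch {

    // State 0 means unlocked, state 1 means locked; there is no hold count.
    static final class SimpleWorkerLock extends AbstractQueuedSynchronizer {
        @Override
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        @Override
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false; // fails even when the current thread is the owner
        }

        @Override
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        void lock()       { acquire(1); }
        boolean tryLock() { return tryAcquire(1); }
        void unlock()     { release(1); }
    }

    // Can the owning thread take the Worker-style lock a second time?
    static boolean workerStyleLockReacquires() {
        SimpleWorkerLock lock = new SimpleWorkerLock();
        lock.lock();
        boolean again = lock.tryLock();
        lock.unlock();
        return again;
    }

    // Same question for ReentrantLock
    static boolean reentrantLockReacquires() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        boolean again = lock.tryLock();
        if (again)
            lock.unlock();
        lock.unlock();
        return again;
    }

    public static void main(String[] args) {
        System.out.println("Worker-style lock reacquired: " + workerStyleLockReacquires());
        System.out.println("ReentrantLock reacquired:     " + reentrantLockReacquires());
    }
}
```

The pool relies on this property: a held worker lock means "this worker is running a task", so a failed tryLock marks the worker as busy. With a reentrant lock, a task that called a pool control method such as setCorePoolSize could reacquire its own worker's lock and be treated as idle.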

3.7 Obtaining a Task from a Queue (getTask)

If you have already started reading the source code, you will have seen this loop in the runWorker method: while (task != null || (task = getTask()) != null). It works the same way as in our handwritten thread pool; its core purpose is to fetch tasks from the queue.

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

  • The getTask method fetches tasks waiting to be executed from the blocking queue.
  • if (rs >= SHUTDOWN ... checks whether the thread pool has been shut down. If the pool is in SHUTDOWN with an empty queue, or in STOP or a later state, the worker count is decremented and null is returned.
  • wc = workerCountOf(c) gets the number of worker threads. When wc > corePoolSize, the worker is subject to a timeout: if it cannot fetch a task before the timeout expires, the threads beyond corePoolSize are destroyed.
  • timed is true when allowCoreThreadTimeOut is set or when wc > corePoolSize. When timed is true, the timeout is controlled by the poll method of the blocking queue.
  • If no task is obtained within keepAliveTime, poll returns null and timedOut is set to true, so the next loop iteration can retire the worker. If timed is false, take() blocks until a task arrives.
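The effect of the timed flag can be seen through allowCoreThreadTimeOut. In this sketch (KeepAliveSketch and poolSizeAfterIdle are hypothetical names, and the 50 ms keep-alive and the wait limits are arbitrary values picked for the demo), a single-thread pool runs one task and is then left idle.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveSketch {

    // Runs one task, leaves the pool idle, and returns the number of threads left.
    static int poolSizeAfterIdle(boolean allowCoreTimeout) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 50L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(allowCoreTimeout);
        pool.execute(() -> { });

        // Wait until the worker has been retired, or until the wait limit passes
        long waitMillis = allowCoreTimeout ? 5000 : 300;
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(waitMillis);
        try {
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int size = pool.getPoolSize();
        pool.shutdown();
        return size;
    }

    public static void main(String[] args) {
        System.out.println("core timeout off: " + poolSizeAfterIdle(false) + " thread(s) left");
        System.out.println("core timeout on:  " + poolSizeAfterIdle(true) + " thread(s) left");
    }
}
```

With the flag off, the core thread sits in take() and stays alive; with it on, the same thread uses poll(keepAliveTime, ...) and exits once the timeout passes with nothing to do.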

4. Summary

  • This chapter does not cover everything about thread pools, or it would become bloated. We started from a handwritten thread pool and then analyzed step by step how the same logic is implemented in the Java thread pool. Almost all the knowledge involved was introduced in earlier chapters: queues, CAS, AQS, reentrant locks, exclusive locks, and so on. This knowledge is interlinked, so it helps to have that foundation first; otherwise this chapter can be hard to follow.
  • Beyond what this chapter covers, we have not yet discussed the thread destruction process, how to choose among and use the four Executors factory methods, or how to configure pools for CPU-intensive and IO-intensive tasks. Spring also has its own thread pool implementation. These topics are all very close to day-to-day practice.
  • That's it for today; later content will be filled in over time. If you find typos, missing steps, passages that are hard to understand, or incorrect descriptions, feel free to leave a comment. Let's learn from each other and improve together.

5. Series recommendations

  • Thread.start(), how does it start a Thread?
  • Thread: state conversion, method use, principle analysis
  • AQS principle analysis and practical use of ReentrantLock
  • Double-ended queues, delay queues, blocking queues: all knowledge blind spots!
  • How do you become an architect when 90% of programmers have never used multithreading and locking?