Elegant use of thread pools and source code analysis

Background

Today is October 24 and the weather is fine. You are happily slacking off when a requirement suddenly lands on you: because of a system upgrade, data has to be synchronized. The plan is for the old system to send the data that needs syncing (orders) to MQ, and for the new system to pull it from MQ. It is a very simple solution. All you have to do is wrap the old system's data as MQ messages and call the send interface to push them to MQ.

Start thinking

You now have an interface for sending MQ messages, and each call sends one message. When do you call it? The old system has 10,000 records, so you need a for loop that calls the interface 10,000 times. One call costs 600 ms, so 10,000 calls cost 600 ms * 10,000 = 6,000 s = 100 min, roughly 1.7 h.

The smile changes gradually

You think: nice, that buys me well over an hour and a half of slacking off. The first show on my cloud drive is lined up for today!

Boss, it is not that I am not working, the program just isn't up to it. Look, the code is written, it simply insists on running for almost two hours! I am being wronged, boss.

Rethinking

After the chat with your lead, you go back to your desk and start thinking: is the problem the code, or is it you? There are 10,000 records and you are calling the interface serially. Could you make the calls in parallel? You have heard that something called a thread pool is very powerful, and that Xiaojie in the next team has been studying it recently, so you go and ask (force) him to teach you.

  • Good thing you’re a doer. Just do it!

Beginner edition

You walk over to Xiaojie's desk and find that he is also slacking off behind a for loop...

“Use a thread pool? Sure! The JDK provides the Executors utility class. We just specify how many threads to create. The code looks like this.”
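A minimal sketch of what Xiaojie describes, assuming the helper he means is Executors.newFixedThreadPool. The sendToMq method is a hypothetical stand-in for the real MQ interface, and the thread count of 10 is arbitrary.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BeginnerPoolDemo {
    // Counts messages "sent"; stands in for the real MQ client.
    static final AtomicInteger SENT = new AtomicInteger();

    // Hypothetical stand-in for the real "send one MQ message" interface.
    static void sendToMq(String orderNo) {
        SENT.incrementAndGet();
    }

    // Sends `orders` messages through a fixed pool of `threads` threads
    // and returns how many sends completed.
    static int sendAll(int orders, int threads) {
        SENT.set(0);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < orders; i++) {
            String orderNo = "order_" + i;
            pool.execute(() -> sendToMq(orderNo));
        }
        pool.shutdown(); // no new tasks are accepted; queued tasks still run
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return SENT.get();
    }

    public static void main(String[] args) {
        System.out.println("sent = " + sendAll(10_000, 10));
    }
}
```

With 10 threads and 600 ms per call, the 6,000 s of serial calls would shrink to roughly 600 s, ignoring scheduling overhead and any limit on the MQ side.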

“This is so convenient.”

“But then there are a couple of problems.”

Shortcomings

1. You cannot customize parameters

So far we have specified only one parameter, but a thread pool actually has seven. We cannot control the other six, which is inconvenient, does not help us understand how the thread pool works, and makes problems hard to troubleshoot.

2. Easy to OOM

The thread pool contains a queue of tasks waiting to be executed. If we create the pool this way, its LinkedBlockingQueue has a capacity of Integer.MAX_VALUE, which is effectively unbounded.

This means that, in theory, we can keep putting tasks into the queue indefinitely. Our memory is limited, though, so we may hit an OutOfMemoryError (OOM) once enough tasks pile up.
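The queue size can be checked directly. The sketch below assumes the pool comes from Executors.newFixedThreadPool and relies on the fact that, in the JDK, this method returns a ThreadPoolExecutor, which is why the cast works.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class UnboundedQueueDemo {
    // Remaining capacity of the work queue behind Executors.newFixedThreadPool.
    static int fixedPoolQueueCapacity() {
        // Assumption: the JDK implementation returns a ThreadPoolExecutor here.
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        try {
            return pool.getQueue().remainingCapacity();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fixedPoolQueueCapacity() == Integer.MAX_VALUE);
    }
}
```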

“How can we customize the parameters? How do we avoid OOM?”

“Let’s just use a constructor that takes parameters and size the blocking queue.”

  • In summary, you control the seven parameters of the thread pool according to your business needs

Platinum diamond edition

If we wanted to customize these parameters, we would use ThreadPoolExecutor’s native 7-parameter constructor, which would look something like this. From my observations, this is usually enough for the business in our projects
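A minimal sketch of the seven-parameter constructor. The concrete values (4, 8, 60 seconds, a queue of 100) are illustrative assumptions, not recommendations from the article.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SevenParamPoolDemo {
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                4,                                      // corePoolSize
                8,                                      // maximumPoolSize
                60L,                                    // keepAliveTime
                TimeUnit.SECONDS,                       // unit of keepAliveTime
                new ArrayBlockingQueue<>(100),          // bounded workQueue
                Executors.defaultThreadFactory(),       // threadFactory
                new ThreadPoolExecutor.AbortPolicy());  // handler: throws RejectedExecutionException
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        System.out.println(pool.getCorePoolSize() + " / " + pool.getMaximumPoolSize());
        pool.shutdown();
    }
}
```

Using a bounded ArrayBlockingQueue is what removes the unbounded-queue risk described earlier: once the queue is full, the rejection policy decides what happens instead of memory growing without limit.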

Let’s review what these seven parameters do

Parameters

  • 1. corePoolSize: the number of core threads
  • 2. maximumPoolSize: the maximum number of threads
  • 3. keepAliveTime: the maximum time an idle non-core thread is kept alive
  • 4. unit: the time unit of keepAliveTime
  • 5. workQueue: the task queue, which holds tasks that have not been executed yet
  • 6. threadFactory (customizable): the factory used to create threads
  • 7. handler (RejectedExecutionHandler, customizable): the rejection policy. When the maximum number of threads has been reached and the blocking queue is full, any further task is rejected according to this policy
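To see the seventh parameter at work, here is a small sketch with deliberately tiny, made-up sizes (core 1, max 2, queue capacity 1) and a hypothetical counting handler. Every task blocks on a latch, so the pool can hold at most three tasks and anything beyond that reaches the handler.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RejectionDemo {
    // Submits `tasks` blocking tasks to a pool with core=1, max=2, queue capacity=1
    // and returns how many of them the rejection policy had to handle.
    static int countRejected(int tasks) {
        AtomicInteger rejected = new AtomicInteger();
        RejectedExecutionHandler countingHandler = (r, executor) -> rejected.incrementAndGet();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                Executors.defaultThreadFactory(),
                countingHandler);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the thread busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return rejected.get();
    }

    public static void main(String[] args) {
        // 2 threads + 1 queue slot hold 3 tasks, so 2 of 5 are rejected.
        System.out.println("rejected = " + countRejected(5));
    }
}
```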

The overall process

Shortcomings

There is no way to monitor how each thread is doing

You may have noticed: it runs and produces the expected results, but something still feels off. From a spectator's point of view, I have a bunch of tasks, I put them into the thread pool, and a bunch of threads process them. But I don't know which thread handled which task, and I don't know how long each task ran. Can I print custom logs before and after each task? I can't do any of that. My hands are tied.

No exception handling

With so many tasks, what if one of them throws an exception while running? What is the thread pool's exception handling mechanism? If I want to log the exception, how do I do it?

As long as your thinking doesn't slip, there are always more solutions than difficulties!

King of Anarchy

The problems we can think of must have occurred to our predecessors too. Otherwise, how could this thread pool be used so widely across all kinds of business scenarios? All we need to do is extend ThreadPoolExecutor and override a few of the parent class's methods.

Extension hooks

  • Before a task is executed
    protected void beforeExecute(Thread t, Runnable r) {}
  • After a task has been executed or has thrown an exception
    protected void afterExecute(Runnable r, Throwable t) {}
  • After the thread pool has terminated
    protected void terminated() {}

Custom thread pools

   
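import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyThreadPoolExecutor extends ThreadPoolExecutor {

    private static final Logger log = LoggerFactory.getLogger(MyThreadPoolExecutor.class);

    // Number of finished tasks and their accumulated run time in milliseconds
    private final AtomicLong numTasks = new AtomicLong();
    private final AtomicLong totalTime = new AtomicLong();
    // Start time of the task currently running on each worker thread
    private final ThreadLocal<Long> startTime = new ThreadLocal<>();

    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        log.info("Thread {}, Runnable {}", t.getName(), r);
        startTime.set(System.currentTimeMillis());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        numTasks.incrementAndGet();
        long taskTime = System.currentTimeMillis() - startTime.get();
        totalTime.addAndGet(taskTime);
        // t is null when the task completed normally, so guard against a NullPointerException
        log.info("Runnable {}, the execution time of this task is {} ms, the total time so far is {} ms, exception is {}",
                r, taskTime, totalTime.get(), t == null ? "none" : t.getMessage());
    }

    @Override
    protected void terminated() {
        super.terminated();
        long tasks = numTasks.get();
        // Avoid dividing by zero when the pool terminates without having run any task
        long average = tasks == 0 ? 0 : totalTime.get() / tasks;
        log.info("Thread pool terminated: total time {} ms, total number of tasks {}, average time {} ms",
                totalTime.get(), tasks, average);
    }

    @Override
    public void execute(Runnable command) {
        super.execute(command);
    }
}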

This way, we can log whatever business information we need before and after each task runs, and record timing and other related data.

Thread pool configuration class


import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyThreadPoolExecutorConfig {

    private static final Logger log = LoggerFactory.getLogger(MyThreadPoolExecutorConfig.class);

    @Value("${threadPoolExecutorConfig.corePoolSize}")
    private int corePoolSize;

    @Value("${threadPoolExecutorConfig.maximumPoolSize}")
    private int maximumPoolSize;

    @Value("${threadPoolExecutorConfig.keepAliveTime}")
    private int keepAliveTime;

    @Value("${threadPoolExecutorConfig.capacity}")
    private int capacity;

    @Value("${threadPoolExecutorConfig.namePrefix}")
    private String namePrefix;

    // Custom thread factory that gives the pool's threads readable names
    class MyThreadPoolFactory implements ThreadFactory {
        private final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        MyThreadPoolFactory(String name) {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
            if (null == name || name.isEmpty()) {
                name = "pool";
            }
            // Produces thread names such as "pool-1-thread-1"
            namePrefix = name + "-" + poolNumber.getAndIncrement() + "-thread-";
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
            if (t.isDaemon()) {
                t.setDaemon(false);
            }
            if (t.getPriority() != Thread.NORM_PRIORITY) {
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }

    // Custom rejection policy: log the rejected task together with the pool's current statistics
    class MyRejectedExecutionHandler implements RejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // Log the rejected task
            log.info("Task rejected {},CompletedTaskCount Approximate total number of completed tasks {},ActiveCount approximate number of threads that are actively executing tasks {}," +
                            "CorePoolSize Number of core threads {},LargestPoolSize Maximum number of threads that have ever entered the pool at the same time {},MaximumPoolSize maximum number of threads allowed {}," +
                            "QueueSize{},TaskCount Approximate total number of scheduled tasks {}",
                    r.toString(), executor.getCompletedTaskCount(), executor.getActiveCount(), executor.getCorePoolSize(),
                    executor.getLargestPoolSize(), executor.getMaximumPoolSize(), executor.getQueue().size(), executor.getTaskCount());
        }
    }

    @Bean
    public ExecutorService getThreadPool() {
        log.info("Initialize thread pool corePoolSize {},maximumPoolSize{},keepAliveTime {}, Capacity {},namePrefix{}",
                corePoolSize, maximumPoolSize, keepAliveTime, capacity, namePrefix);
        return new MyThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(capacity), new MyThreadPoolFactory(namePrefix), new MyRejectedExecutionHandler());
    }
}

As mentioned earlier, the thread factory and the rejection policy can both be customized, and this is what they look like once customized. We also like to extract the important parameters into the configuration file so that they are easy to review and change.

The test code

To test the custom exception handling, we changed the test code so that the task handling the order whose orderNo is “order_1” throws an exception.

  • The exception that gets caught is shown below, and you can handle it however you like
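The original test code is not reproduced here, so the following is only a sketch of the same idea. RecordingPool is a hypothetical, simplified stand-in for MyThreadPoolExecutor that collects failure messages in a list instead of logging them, and the exception message is made up.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExceptionHookDemo {
    // Simplified stand-in for MyThreadPoolExecutor: records failures instead of logging them.
    static class RecordingPool extends ThreadPoolExecutor {
        final List<String> failures = new CopyOnWriteArrayList<>();

        RecordingPool() {
            super(2, 2, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(16));
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            if (t != null) { // t is null when the task completed normally
                failures.add(t.getMessage());
            }
        }
    }

    // Submits `orders` tasks; only the one for "order_1" throws.
    static List<String> run(int orders) {
        RecordingPool pool = new RecordingPool();
        for (int i = 0; i < orders; i++) {
            String orderNo = "order_" + i;
            pool.execute(() -> {
                if ("order_1".equals(orderNo)) {
                    throw new IllegalStateException("failed to send " + orderNo);
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.failures;
    }

    public static void main(String[] args) {
        System.out.println(run(5));
    }
}
```

Because the task was submitted with execute, the worker thread that ran order_1 still dies with the uncaught exception, which the JVM prints to stderr by default. The hook only gives you a place to observe and record it.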

Source code analysis

“Great! So I can build a thread pool of my own! But I have some questions. How does this thread pool actually work? Why can threads be reused? Why does overriding those three methods achieve this effect? Is the thread pool hiding some other secret from me? Xiaojie, tell me!”

I'm not going to pretend: I don't know why either.

But in front of the source code, there are no secrets.

Bit operations

Thread pool state and number of threads

  • The thread pool manages both its state and its number of threads through a single variable called ctl

This design by the masters is very clever, and it relies on some knowledge of bit operations.

Data structures and algorithms are on full display here. Without a solid foundation it is quite laborious to follow, which shows how important they are.

Let's look at each of these in turn and see what they are. How come you never run into them when writing CRUD code?

Thread pool state

As shown in the figure, a thread pool has five states

  • RUNNING: accepts new tasks and processes tasks in the blocking queue
  • SHUTDOWN: entered after shutdown() is called; the pool no longer accepts new tasks but still processes tasks in the blocking queue
  • STOP: entered after shutdownNow() is called; the pool no longer accepts new tasks, does not process queued tasks, and interrupts the tasks in progress
  • TIDYING: entered when the blocking queue is empty and the number of threads in the pool is zero
  • TERMINATED: the pool has terminated completely; it moves from TIDYING to this state once terminated() has finished
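The internal states are private, but part of the lifecycle can be observed through public methods. This sketch, with arbitrary pool sizes, walks a pool from RUNNING through SHUTDOWN to TERMINATED using isShutdown(), isTerminated(), and a rejected execute call.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class LifecycleDemo {
    static String lifecycle() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        StringBuilder out = new StringBuilder();
        out.append("shutdown=").append(pool.isShutdown());   // still RUNNING
        pool.execute(() -> { });
        pool.shutdown();                                       // RUNNING -> SHUTDOWN
        out.append(" shutdown=").append(pool.isShutdown());
        try {
            pool.execute(() -> { });                           // new tasks are no longer accepted
            out.append(" accepted");
        } catch (RejectedExecutionException e) {
            out.append(" rejected");
        }
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);       // wait for TIDYING -> TERMINATED
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        out.append(" terminated=").append(pool.isTerminated());
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(lifecycle());
    }
}
```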

Left shift in bit operations

  • Integer.SIZE: 32 (an int has 32 bits)

  • COUNT_BITS: 32 - 3 = 29

  • 1 << 29 means the binary number 1 shifted left by 29 bits

From this we can see that the five states of the thread pool differ in their top three bits, which are respectively

  • RUNNING: 111
  • SHUTDOWN: 000
  • STOP: 001
  • TIDYING: 010
  • TERMINATED: 011

Remember the initial COUNT_BITS = Integer.SIZE - 3?

  • Why minus 3 rather than minus 2 or minus 4? Because five states need three bits to represent (two bits can encode only four values), and the top three bits are used for that
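The top three bits can be checked with a few lines of code. The constants below mirror the definitions in the JDK's ThreadPoolExecutor; highBits is a hypothetical helper written for this sketch.

```java
class RunStateBitsDemo {
    static final int COUNT_BITS = Integer.SIZE - 3; // 29
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    // Top three bits of a state as a zero-padded binary string.
    static String highBits(int state) {
        String bits = Integer.toBinaryString(state >>> COUNT_BITS);
        return "000".substring(bits.length()) + bits;
    }

    public static void main(String[] args) {
        System.out.println("RUNNING    " + highBits(RUNNING));
        System.out.println("SHUTDOWN   " + highBits(SHUTDOWN));
        System.out.println("STOP       " + highBits(STOP));
        System.out.println("TIDYING    " + highBits(TIDYING));
        System.out.println("TERMINATED " + highBits(TERMINATED));
    }
}
```

Because RUNNING is the only negative value, the states are also ordered numerically (RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED), which is what lets the source compare them with expressions such as rs >= SHUTDOWN.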

So how exactly does a left shift work? Take 1 << 29 as an example

That is a left shift of a positive number. What about a left shift of a negative number?

Left shift of a negative number

First, we need to know that a computer stores a negative number as the two's complement of its absolute value

  • Original code: the binary form of the absolute value of the negative number
  • One's complement: invert every bit of the original code
  • Two's complement: one's complement + 1

Back to what we were looking at: -1 << 29

A left shift drops bits on the left and fills in zeros on the right
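Both shifts can be printed bit by bit. The helper names bits32 and twosComplementNegate are made up for this sketch.

```java
class ShiftDemo {
    // 32-character, zero-padded binary form of an int.
    static String bits32(int value) {
        String s = Integer.toBinaryString(value);
        StringBuilder sb = new StringBuilder();
        for (int i = s.length(); i < Integer.SIZE; i++) {
            sb.append('0');
        }
        return sb.append(s).toString();
    }

    // Two's complement negation: invert every bit, then add 1.
    static int twosComplementNegate(int x) {
        return ~x + 1;
    }

    public static void main(String[] args) {
        System.out.println(bits32(1));        // a single 1 in the lowest bit
        System.out.println(bits32(1 << 29));  // that 1 moved up by 29 positions
        System.out.println(bits32(-1));       // two's complement of 1: all bits set
        System.out.println(bits32(-1 << 29)); // only the top three bits remain set
        System.out.println(twosComplementNegate(1));
    }
}
```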

Bitwise AND, OR, and NOT

We are done with bit shifts, so let's look at these methods again

  • Gets the thread pool state
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
  • Gets the number of threads in the thread pool
    private static int workerCountOf(int c)  { return c & CAPACITY; }
  • Packs a run state and a worker count into a ctl value
    private static int ctlOf(int rs, int wc) { return rs | wc; }

What does that mean?

  • ~CAPACITY: bitwise NOT. CAPACITY is (1 << COUNT_BITS) - 1, so its low 29 bits are all 1. Inverting it turns every 0 into 1 and every 1 into 0, leaving only the top three bits set

  • c & CAPACITY: bitwise AND. A result bit is 1 only if both input bits are 1, otherwise it is 0

  • rs | wc: bitwise OR. A result bit is 1 if at least one of the input bits is 1

  • These are the bit operations used in the thread pool
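Putting the three methods together, a standalone sketch shows that packing and unpacking round-trip. The method bodies and constants are copied from the JDK source quoted above; the class itself and the worker count of 5 are only for illustration.

```java
class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;
    static final int CAPACITY   = (1 << COUNT_BITS) - 1; // low 29 bits all 1
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; } // keep the top 3 bits
    static int workerCountOf(int c)  { return c & CAPACITY; }  // keep the low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }       // combine both parts

    public static void main(String[] args) {
        int ctl = ctlOf(RUNNING, 5);
        System.out.println(runStateOf(ctl) == RUNNING); // state survives the round trip
        System.out.println(workerCountOf(ctl));         // worker count survives too
    }
}
```

Keeping both values in one int means a single compare-and-set can update the state and the count consistently, without a lock.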

The execute source code

public void execute(Runnable command) {
    // Reject null tasks
    if (command == null)
        throw new NullPointerException();
    // Read ctl, which encodes both the pool state and the number of threads
    int c = ctl.get();
    // Is the number of threads below the number of core threads?
    if (workerCountOf(c) < corePoolSize) {
        // If so, add a core worker to run the task
        if (addWorker(command, true))
            // Added successfully, return
            return;
        // Adding failed, re-read ctl
        c = ctl.get();
    }
    // We get here for one of two reasons
    // 1. The number of threads is greater than or equal to the number of core threads
    // 2. Adding the worker failed
    // If the pool is RUNNING, try to put the task into the blocking queue
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // Check the state again: enqueueing takes time and the pool state may have changed meanwhile
        // If the pool is no longer RUNNING, remove the task from the queue and run the rejection policy
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // We reach this else-if in two cases
        // 1. The pool is still RUNNING
        // 2. The pool is not RUNNING, but removing the task failed
        // If the pool currently has no threads at all
        else if (workerCountOf(recheck) == 0)
            // Add a thread so that the queued task has a thread to run it
            addWorker(null, false);
    }
    // We reach this else-if for one of two reasons
    // 1. The pool is not RUNNING
    // 2. The blocking queue is full, so offer() failed
    else if (!addWorker(command, false))
        // Creating a non-core thread failed, run the rejection policy
        reject(command);
}

The flowchart is roughly as shown in the figure below
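The order of the steps (core thread, then queue, then non-core thread, then rejection) can also be observed from the outside. This sketch uses made-up sizes (core 1, max 2, queue capacity 1) and tasks that block on a latch, and records the pool size and queue size after each submission.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteFlowDemo {
    // Returns "poolSize/queueSize" after each accepted task, or "rejected".
    static String trace() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await(); // hold the thread so that later tasks cannot reuse it
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        // No handler given, so the default AbortPolicy throws on rejection.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        StringBuilder out = new StringBuilder();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocked);
                    out.append(pool.getPoolSize()).append('/')
                       .append(pool.getQueue().size()).append(' ');
                } catch (RejectedExecutionException e) {
                    out.append("rejected");
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(trace());
    }
}
```

The first task starts the core thread, the second waits in the queue, the third finds the queue full and starts a non-core thread, and the fourth has nowhere to go.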

addWorker source code analysis

private boolean addWorker(Runnable firstTask, boolean core) {
    // Label for the outer loop; the inner loop jumps to it below
    retry:
    for (;;) {
        int c = ctl.get();
        // Get the thread pool state
        int rs = runStateOf(c);
        // Refuse to add a worker if
        // 1. the thread pool is not RUNNING
        // and
        // 2. it is not the case that (the pool is SHUTDOWN, firstTask is null and the blocking queue is not empty)
        if (rs >= SHUTDOWN &&
            !(rs == SHUTDOWN &&
              firstTask == null &&
              !workQueue.isEmpty()))
            return false;

        for (;;) {
            // Get the number of threads in the pool
            int wc = workerCountOf(c);
            // Fail if wc has reached CAPACITY, or
            // if core is true and wc has reached corePoolSize, or
            // if core is false and wc has reached maximumPoolSize
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // CAS: increase the worker count by 1
            if (compareAndIncrementWorkerCount(c))
                // On success, break out of the outer loop and go on to create the worker
                break retry;
            // The CAS failed: the worker count or the pool state has changed
            c = ctl.get();  // Re-read ctl
            // If the pool state differs from the one read before
            if (runStateOf(c) != rs)
                // restart the outer loop
                continue retry;
            // Otherwise only the worker count changed; retry the inner loop
        }
    }

    // Whether the worker thread has been started
    boolean workerStarted = false;
    // Whether the worker has been added to the workers set
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Wrap the submitted task in a Worker
        w = new Worker(firstTask);
        // Get the thread held by the worker
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // Lock
            mainLock.lock();
            try {
                // Re-read the thread pool state while holding the lock
                int rs = runStateOf(ctl.get());
                // The worker may be added if
                // 1. the pool is RUNNING, or
                // 2. the pool is SHUTDOWN and firstTask is null
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // Add the worker to workers (a HashSet, the actual pool of threads)
                    workers.add(w);
                    // Current number of workers
                    int s = workers.size();
                    // Track the largest pool size reached so far
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // The worker was added successfully
                    workerAdded = true;
                }
            } finally {
                // Release the lock
                mainLock.unlock();
            }
            if (workerAdded) {
                // Start the worker's thread
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // If the thread failed to start, roll back
        if (!workerStarted)
            addWorkerFailed(w);
    }
    // Return whether the worker was started
    return workerStarted;
}

The flow chart of addWorker

Worker source code

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{

    private static final long serialVersionUID = 6138294804551838833L;

    // The thread this worker runs in
    final Thread thread;
    // The first task to run (may be null)
    Runnable firstTask;
    // Number of tasks this worker has completed
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        // State -1 prevents interrupts until runWorker starts
        setState(-1); // inhibit interrupts until runWorker
        // The task submitted by the user
        this.firstTask = firstTask;
        // Create the thread, passing in this Worker (which implements Runnable),
        // so t.start() ends up calling the run() method below
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker */
    // The method that actually runs tasks
    public void run() {
        runWorker(this);
    }

    // State 0 means unlocked
    // State 1 means locked
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        // Acquire the lock by CAS-ing state from 0 to 1
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    // Release the lock and set state back to 0
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        // State >= 0 (the worker has started), the thread is not null and has not been interrupted yet
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                // Interrupt the thread
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}
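The locking part of Worker can be tried in isolation. SimpleLock below is a hypothetical standalone class that copies Worker's tryAcquire and tryRelease logic, to show that a lock built this way is not reentrant: the same thread cannot take it twice.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class NonReentrantLockDemo {
    // Same scheme as Worker: state 0 = unlocked, state 1 = locked, no reentrancy.
    static class SimpleLock extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1L;

        @Override
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        boolean tryLock() { return tryAcquire(1); }
        void unlock()     { release(1); }
    }

    // Results of: first tryLock, second tryLock by the same thread, tryLock after unlock.
    static boolean[] probe() {
        SimpleLock lock = new SimpleLock();
        boolean first = lock.tryLock();   // acquires the lock
        boolean second = lock.tryLock();  // fails: the lock is already held, even by us
        lock.unlock();
        boolean third = lock.tryLock();   // succeeds again after release
        lock.unlock();
        return new boolean[] {first, second, third};
    }

    public static void main(String[] args) {
        boolean[] r = probe();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```

A held lock therefore reliably means "this worker is running a task", which is how the pool can tell idle workers from busy ones when it decides which threads to interrupt.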

runWorker source code analysis

final void runWorker(Worker w) {
    // Get the current thread
    Thread wt = Thread.currentThread();
    // This is the task we submitted
    Runnable task = w.firstTask;
    w.firstTask = null;
    // Set state to 0 so that the worker can respond to interrupts
    w.unlock(); // allow interrupts
    // Whether the worker exits abruptly, i.e. because of an exception
    boolean completedAbruptly = true;
    try {
        // The secret of thread reuse is here: a while loop that keeps running
        // as long as the submitted task is not null or a task can be taken from the queue
        while (task != null || (task = getTask()) != null) {
            // Lock
            w.lock();
            // If the pool is stopping (state >= STOP), make sure the thread is interrupted
            // Otherwise make sure it is not: Thread.interrupted() clears the flag, and the state
            // is rechecked afterwards in case shutdownNow() ran in between
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // Hook that users can override, called before the task runs
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // The task's own run method
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // Hook that users can override, called after the task runs
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                // Number of completed tasks + 1
                w.completedTasks++;
                // Release the lock
                w.unlock();
            }
        }
        // The loop ended normally, without an exception
        completedAbruptly = false;
    } finally {
        // Clean up the exiting worker
        processWorkerExit(w, completedAbruptly);
    }
}

The flow chart of runWorker
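The effect of that while loop, a few threads running many tasks, is easy to observe. This sketch uses arbitrary numbers and counts how many distinct threads end up running the submitted tasks.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ThreadReuseDemo {
    // Runs `tasks` tasks on a pool of `threads` threads and returns
    // the number of distinct threads that executed them.
    static int distinctThreads(int tasks, int threads) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names.size();
    }

    public static void main(String[] args) {
        System.out.println("100 tasks ran on " + distinctThreads(100, 2) + " threads");
    }
}
```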

getTask source code analysis

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        // Read ctl
        int c = ctl.get();
        // Get the thread pool state
        int rs = runStateOf(c);
        // The state is at least SHUTDOWN and (the state is at least STOP or the blocking queue is empty)
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // Decrease the worker count by 1
            decrementWorkerCount();
            return null;
        }
        // Get the number of threads in the pool
        int wc = workerCountOf(c);
        // allowCoreThreadTimeOut defaults to false
        // timed is true if core threads may time out or the number of threads exceeds the number of core threads
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // This worker should exit if
        // 1. wc exceeds the maximum number of threads, or the worker is timed and its last poll timed out
        // and
        // 2. wc is greater than 1 or the blocking queue is empty
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // CAS: decrease the worker count by 1
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // If timed is true, poll with a timeout; otherwise block until a task arrives
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            // Got a task, return it
            if (r != null)
                return r;
            // The poll timed out
            timedOut = true;
        } catch (InterruptedException retry) {
            // The worker thread was interrupted while waiting; try again
            timedOut = false;
        }
    }
}

The flow chart of getTask
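The timed branch of getTask is what shrinks the pool back to its core size. The sketch below uses made-up sizes (core 1, max 3, queue capacity 1) and a short keepAliveTime of 100 ms, then polls until the idle non-core threads have exited; the 5-second deadline is only a safety net.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {
    // Returns {peak pool size, pool size after idle non-core threads timed out}.
    static int[] peakAndSettledSize() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 100L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        try {
            for (int i = 0; i < 4; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep every thread busy for now
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            int peak = pool.getPoolSize(); // 1 core thread + 2 non-core threads
            release.countDown();
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > 1 && System.nanoTime() < deadline) {
                Thread.sleep(20); // wait for poll(keepAliveTime) to time out
            }
            return new int[] {peak, pool.getPoolSize()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new int[] {-1, -1};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] sizes = peakAndSettledSize();
        System.out.println("peak = " + sizes[0] + ", settled = " + sizes[1]);
    }
}
```

The remaining thread stays alive because, once the count is down to corePoolSize, timed is false and the worker blocks in workQueue.take() instead of polling with a timeout.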

processWorkerExit source code analysis

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // completedAbruptly = true means the worker exited because of an exception
    // completedAbruptly = false means the worker ended normally
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        // Decrease the worker count by 1
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    // Lock
    mainLock.lock();
    try {
        // Add this worker's completed tasks to the pool's total
        completedTaskCount += w.completedTasks;
        // Remove the worker from the workers set
        workers.remove(w);
    } finally {
        // Release the lock
        mainLock.unlock();
    }
    // Check whether the thread pool needs to be terminated
    tryTerminate();
    // Read ctl
    int c = ctl.get();
    // The thread pool state is RUNNING or SHUTDOWN
    if (runStateLessThan(c, STOP)) {
        // If the worker did not exit because of an exception
        if (!completedAbruptly) {
            // If core threads are allowed to time out, min = 0; otherwise min = corePoolSize
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // If min = 0 but there are still tasks in the blocking queue, keep one thread to process them
            if (min == 0 && !workQueue.isEmpty())
                min = 1;
            // Enough threads remain, no replacement needed
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // Add a replacement worker
        addWorker(null, false);
    }
}

The flow chart of processWorkerExit
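The replacement branch can be seen in a small experiment. In this sketch, a single-thread pool runs a task that throws (the "boom" exception is made up) and then a normal task, and the two tasks are checked to have run on different threads.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class WorkerReplacementDemo {
    // Returns true if the task after the failing one ran on a different thread.
    static boolean replacedAfterFailure() {
        AtomicReference<Thread> first = new AtomicReference<>();
        AtomicReference<Thread> second = new AtomicReference<>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        pool.execute(() -> {
            first.set(Thread.currentThread());
            // The exception escapes runWorker, so this worker exits abruptly.
            throw new IllegalStateException("boom");
        });
        pool.execute(() -> second.set(Thread.currentThread()));
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return first.get() != null && second.get() != null && first.get() != second.get();
    }

    public static void main(String[] args) {
        System.out.println("worker replaced: " + replacedAfterFailure());
    }
}
```

The stack trace of the failing task still appears on stderr, since nothing catches the exception before the first thread dies.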

Finally

That is the overall flow of the execute method we call when submitting a task to a thread pool. A great deal is packed into it, and you have to admire the designers' subtle thinking. Still, this is not the whole thread pool: the two methods for shutting down the pool and the exception handling have not been covered in detail. Why does a thread pool sometimes swallow exceptions? What ways are there to handle exceptions in a thread pool? What is the difference between the submit method and the execute method?

A quick survey

Do you prefer articles that go straight to the source code, or the kind that opens with a scene like this one? Are you interested in the second half of the source code?

  • Setting a flag: if this article gets more than 10 likes, Xiaojie will work overtime to write up the thread pool source code that this article did not cover!