Back in 2003, I was a little-known programmer, but my superiors looked up to me and gave me the entire concurrency module to develop on my own. (Isn’t it because of funding?)

This week, I had to complete a very important feature in the concurrency module — thread pooling.

Why use thread pools

As a qualified programmer, when I receive a requirement, THE first question I have to ask myself is: Why do I need to do this requirement? Why do we need thread pools?

As another newbie put it on his blog more than a decade later:

The “pool” in software can be understood as a factory in the era of planned economy.

First of all, as a factory, you should take good care of what you produce. Lao Wang took an axe from your factory. If he doesn’t need it any more, he can return it and you can lend it to Lao Zhao.

Secondly, you can’t produce without limit. After all, in an era of extreme scarcity of resources, if you use all of them to produce, what about other places to use resources?

Summed up in two points, “pool” function:

  • Reusing existing resources
  • Controlling total Resources

This is true for database connection pools, and it is true for thread pools.

You a task over, I found the pool has nothing to do and still alive thread, come, take to use, I also don’t bother to create a thread for you, to know that thread creation and destruction can be troublesome; You a task over, I found that the pool threads are busy, and now the pool threads have too much, do not limit down will overflow memory, come, queue ~

What thread pools need to be considered

Simple architectures are easy to implement, but they don’t solve complex problems; Complex architectures can solve complex problems, but are not so easy to implement.

Before I get into thread pooling, let’s take a look at what my thread pool, ThreadPoolExecutor, looks like:

Image source: http://tutorials.jenkov.com/java-util-concurrent/threadpoolexecutor.html

You can start by looking at this diagram and thinking about what are the nodes in this diagram and why do you need them?

Ok, so let’s talk about the considerations that go into implementing a thread pool.

If each incoming task is handed over directly to the thread, it is not really decoupling.

It is better to queue the task first and then fetch the task from the queue when the thread is idle. In order to create a block when fetching, I chose to use BlockingQueue to hold the unprocessed tasks.

If you have used RabbitMQ, Kafka and other messaging middleware, you will see that they work in a similar way to blocking queues.

There are many types of blocking queues:

  • Unbounded blocking queues, such as linkedBlockingQueues, that let as many tasks go;
  • Bounded blocking queues, such as ArrayBlockingQueue;
  • Direct handoffs, like SynchronousQueue, whose PUT method blocks until a thread is ready to take it from the Queue, so SynchronousQueue is not a Queue at all, it doesn’t store anything, it’s just handing something over

Each of these queues has its pros and cons, so it is best to put the task queue parameters in the constructor and provide them to the person using the thread pool to set.

I define two variables for the number of threads, corePoolSize and maximumPoolSize. The difference between these two parameters can be explained as follows:

  • When the number of threads in the thread pool is less than corePoolSize, I create a thread to handle each incoming task, regardless of whether there are idle threads in the thread pool.
  • When the number of threads in the thread pool reaches corePoolSize, new tasks are placed in the task queue first.
  • What happens when the task queue is full (if the queue is bounded)? Reject a new assignment right away? It doesn’t seem right. In the face of this sudden rush of business, can I make an exception and create a few more threads? So maximumPoolSize, if the task queue is full but there are fewer threads in the thread pool than maximumPoolSize, THEN I allow the thread pool to continue to create threads. This is like a table in a sausage shop that starts with ten, and when the midday rush comes, it’s not enough. The owner’s wife lets small 2 take out a few tables from the kitchen again to come out same.

Similarly, these two parameters should be placed in the constructor and the user can decide how large the thread pool should be based on the actual situation.

例 句 : Are the tables from the kitchen gradually withdrawn after the rush hour? Similarly, when I find that the number of threads in the thread pool exceeds corePoolSize, I will monitor the threads and find that a thread has not been working for a long time, and I will close it, depending on the keepAliveTime that you pass in. If you want to also do this kind of monitoring for corePoolSize thread, just call threadPoolExecutor. AllowCoreThreadTimeOut (true).

You might be wondering how I determine how long a thread has been inactive, if I think I’m going to start a monitoring thread to see which thread is slacking off? If a thread does not poll a task within keepAliveTime, then I consider the thread to be idle and can be killed, as you can see from this code snippet.

ThreadPoolExecutor getTask():

private Runnable getTask() { boolean timedOut = false; for (;;) {... try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r ! = null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; }}}Copy the code

If the thread pool has been shutdown, or the thread pool is using a bounded queue, and the queue is full, and the number of threads has reached the maximum number, no new threads can be created to handle the request, what happens to the incoming task? After talking with you, we think there are at least four strategies:

  • AbortPolicy: thread pool using this strategy, will be unable to continue to accept the new task, to submit tasks party thrown RejectedExecutionException, let them decide how to deal with;
  • CallerRunsPolicy: This policy, as the name suggests, will delegate tasks to the caller’s thread for execution;
  • DiscardPolicy: Directly discards the new task.
  • DiscardOldestPolicy: Discard the oldest task in blockingQueue.poll(). Note that if you use the PriorityBlockingQueue as your task queue, this policy will discard the highest priority task, so in general, PriorityBlockingQueue and DiscardOldestPolicy will not be used together

Speaking of strategy, you might expect me to use strategy. You guessed it, I used the policy mode, which is so simple that I only need to define a policy interface, RejectedExecutionHandler:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
Copy the code

DiscardPolicy: DiscardPolicy: DiscardPolicy: DiscardPolicy: DiscardPolicy: DiscardPolicy: DiscardPolicy: DiscardPolicy: DiscardPolicy

 public static class DiscardPolicy implements RejectedExecutionHandler     {
        public DiscardPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
Copy the code

Then in the constructor, I have you pass me the strategy you want to use, and finally in my reject() method, I call the strategy you passed,

 final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }
Copy the code

Why use final? Of course, I don’t want you to rewrite it, there’s no need to customize this method. You’ll see where the Reject method is called using IDEA, Alt + f7, and then you’ll see a lot of esoteric code THAT I’ve written, which I’m not going to go through in detail today, but I’m going to focus on the role of a few arguments in the constructor, which you can customize.

I made you good wheels

For your ease of use, I’ve written down the factory method for thread pools under Executors so that novicers don’t need to know much about ThreadPoolExecutor. They can just use my factory method directly to use thread pools:

1, newFixedThreadPool if you want to do a limit of the total number of threads in thread pool, then by Executors. NewFixedThreadPool (…). * If the thread pool is true or the thread pool is true, run the following command: * If the thread pool is true or the thread pool is true, run the following command: * If the thread pool is true or the thread pool is true, run the following command:

 public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
Copy the code

If you want to have a very flexible thread pool, use newCachedThreadPool:

 public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
Copy the code

From the factory method above, you can see that CachedThreadPool is a ThreadPoolExecutor configured like this:

  • CorePoolSize: 0
  • MaxPoolSize: Integer. MAX_VALUE
  • KeepAliveTime: 60 s
  • workQueue: SynchronousQueue

Just as computers with different CPU and graphics card combinations can be used for different purposes (data analysis, gaming, video processing, etc.), different configurations of ThreadPoolExecutor can produce different power. These configurations of CachedThreadPool can produce different power:

  • For a new task, if there are no free threads in the thread pool, the pool will not hesitate to create a new thread to handle the task. Because corePoolSize is 0, the current thread count must be greater than or equal to corePoolSize, and the workQueue is SynchronousQueue, which, as we said, doesn’t hold anything, it just transitions, so you can assume that its queue is always full, Finally, maxPoolSize is infinite and no further creation will reach the maximum number of threads, so the thread pool will create a new thread to handle the task;
  • KeepAliveTime is 60 seconds, which you can consider to be the thread expiration time. A newly created thread will be destroyed if there is no task to execute for 60 seconds (cache hit). If the thread is allocated to a task within 60 seconds (cache hit), it can be used directly.
  • Another advantage of corePoolSize being set to 0 is that when no new tasks are received in the thread pool for a long period of time, threads in the thread pool are gradually destroyed until the number of threads in the thread pool drops to 0, so that the entire thread pool consumes no resources. This makes CachedThreadPool particularly suitable for periodic, short-lived tasks, such as a wave of business at midnight and little or no business at other times

Of course, one of the obvious pitfalls of CachedThreadPool is that it has an unmanageable number of threads. Now that you know the important parameters of ThreadPoolExecutor, you can create your own CachedThreadPool with a maximum number of threads. Or after CachedThreadPool is created, use the setMaximumPoolSize method to change the maximum number of threads.

3, newSingleThreadExecutor by analogy, it is easy to understand, here is not posted source code and explanation.

4, newScheduledThreadPool by association, it is a little difficult to understand, here will not post source code and explain.

conclusion

The ThreadPoolExecutor constructor is a ThreadPoolExecutor constructor that allows you to customize your own thread pool. The ThreadPoolExecutor constructor is a ThreadPoolExecutor constructor that allows you to customize your own thread pool. Of course, there are many complex mechanisms inside the thread pool, such as the management of various states and so on, but these are not externally customizable, which we will discuss later.

Afterword.

The clock hit 24 and I ran through all the ThreadPoolExecutor test cases, green bar, and passed them all. Was ready to submit the code, went home to sleep, suddenly found that the class has not written their name, so, pa pa, I in the head of the class, left my name……

To Doug Lea!

Welcome to Bridge4You!

Some things can only be said to you there (‘▽’)

reference

  • Java Concurrent Programming Practices
  • ThreadPoolExecutor (Java Platform SE 8 )
  • ThreadPoolExecutor jenkov.com
  • fixedthreadpool-vs-cachedthreadpool-the-lesser-of-two-evils