Preface
I'm a little embarrassed to write this kind of rote interview material, and even more embarrassed to give it a clickbait title. But interviewers do ask about it, and if you are interviewing through campus recruiting, I recommend reading The Art of Concurrent Programming. You don't need any other book on multithreading. This post briefly covers the basics of thread pools and how they process tasks. If you are not interested in how thread pools are used, feel free to skip that part. I'll also share some of my own shallow experience from work and look at how the open source frameworks Nifty (Thrift + Netty) and Netty use thread pools. That makes three parts:
- A brief introduction to thread pools
- Thread pools in Nifty and Netty
- Experience sharing
Thread Pool basics
The benefits of using thread pools
When a program needs to run work asynchronously or concurrently, we often reach for a thread pool. So what are the benefits? Because the pool already holds threads, a task can run without first creating a thread, and the threads are reused instead of being destroyed afterward. This brings three benefits: (1) lower resource overhead, (2) faster response time, and (3) easier monitoring and tuning.
If you run a consumer-facing (to-C) system, you are most likely monitoring its thread pools. For example, if the thread count climbs after a release, the release probably introduced a problem, and you have to roll back if you can't locate it right away. Without a monitoring alert, you may instead find yourself writing an incident report after the system runs out of resources and the service becomes unavailable.
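As a rough illustration of what such monitoring can sample, here is a minimal sketch built on the getters that ThreadPoolExecutor already exposes. The class name PoolMonitorSketch and the output format are made up for this example; a real system would push these numbers to whatever metrics and alerting stack it uses.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolMonitorSketch {

    /** Formats the gauges a dashboard or alert rule would typically sample. */
    static String snapshot(ThreadPoolExecutor pool) {
        return String.format("poolSize=%d active=%d queued=%d largest=%d",
                pool.getPoolSize(),          // threads currently in the pool
                pool.getActiveCount(),       // threads running a task (approximate)
                pool.getQueue().size(),      // tasks waiting in the queue
                pool.getLargestPoolSize());  // high-water mark of the pool size
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(100));
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 5; i++) {
            pool.execute(() -> {
                try {
                    release.await();         // keep the workers busy
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        System.out.println(snapshot(pool));  // two threads busy, three tasks queued
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(snapshot(pool));
    }
}
```

A growing queued value or a poolSize pinned at its maximum is usually the first visible sign of the kind of pile-up described above.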
Thread Pool Principle
Suppose you submit a task to a thread pool. The pool processes it as follows:
1. Check whether the current number of threads has reached corePoolSize. If not, create a thread to execute the task; otherwise, go to step 2.
2. Check whether the task queue is full. If not, add the task to the queue; otherwise, go to step 3.
3. Check whether the current number of threads has reached the pool's maximum, maximumPoolSize. If not, create a thread to execute the task; otherwise, apply the rejection policy.
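The three steps can be observed directly with a deliberately tiny pool. The sketch below is an illustration, not production code: the sizes (core 1, max 2, queue capacity 1) and the class name SubmitFlowSketch are chosen only to make each branch fire once, and every task blocks on a latch so no worker becomes free in the meantime.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SubmitFlowSketch {

    /** Submits four blocking tasks and records what the pool did with each one. */
    static String run() {
        // core = 1, max = 2, queue capacity = 1; default handler is AbortPolicy
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder log = new StringBuilder();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    log.append("task").append(i)
                       .append(": threads=").append(pool.getPoolSize())
                       .append(" queued=").append(pool.getQueue().size())
                       .append('\n');
                } catch (RejectedExecutionException e) {
                    log.append("task").append(i).append(": rejected\n");
                }
            }
        } finally {
            release.countDown();   // let the blocked tasks finish
            pool.shutdown();
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.print(run());
    }
}
```

Running it prints:

```
task1: threads=1 queued=0
task2: threads=1 queued=1
task3: threads=2 queued=1
task4: rejected
```

Task 1 gets a core thread, task 2 waits in the queue, task 3 finds the queue full and gets a second thread, and task 4 hits the rejection policy.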
Thread pool usage
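Java's thread pool interface is ExecutorService. The inheritance relationship is as follows: ExecutorService extends Executor, AbstractExecutorService implements ExecutorService, ThreadPoolExecutor extends AbstractExecutorService, and ScheduledThreadPoolExecutor extends ThreadPoolExecutor while implementing ScheduledExecutorService.
We mainly use two implementation classes. One is ThreadPoolExecutor, the regular thread pool and by far the most used; the other is ScheduledThreadPoolExecutor, the thread pool for scheduled tasks. Let's look at how ThreadPoolExecutor's execute method handles a task (this is the principle described above, stated in terms of the actual class):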
1. If fewer threads are currently running than corePoolSize, a new thread is created to execute the task;
2. If the number of running threads is equal to or greater than corePoolSize, the task is added to the BlockingQueue;
3. If the task cannot be added to the BlockingQueue (the queue is full), a new thread is created to process the task;
4. If creating a new thread would push the number of running threads beyond maximumPoolSize, the task is rejected and the RejectedExecutionHandler.rejectedExecution() method is called.
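Steps 1 and 3 create a new thread, which requires acquiring a global lock and can seriously affect performance. The design therefore steers work toward step 2: once warm-up is complete (the number of running threads has reached corePoolSize), almost every subsequent task is simply put into the task queue, which avoids taking the global lock.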
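Warm-up normally happens lazily, one thread per incoming task. If you would rather pay that cost at startup, ThreadPoolExecutor also offers prestartAllCoreThreads(). The sketch below only shows the effect on the pool size; the class name WarmUpSketch and the queue capacity are arbitrary choices for the example.

```java
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class WarmUpSketch {

    /** Returns {pool size before warm-up, pool size after warm-up}. */
    static int[] warmUp(int coreThreads) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                coreThreads, coreThreads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1000));
        int before = pool.getPoolSize();   // 0: threads are created lazily by default
        pool.prestartAllCoreThreads();     // create every core thread up front
        int after = pool.getPoolSize();    // now equal to coreThreads
        pool.shutdown();
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(warmUp(4)));   // prints [0, 4]
    }
}
```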
With the task-handling logic covered, the ThreadPoolExecutor constructor is easy to understand.
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
- corePoolSize: the number of core threads. When a task arrives, the pool creates a new thread to run it as long as the thread count is below this value, even if idle threads already exist. This continues until the thread count reaches corePoolSize, which is the warm-up mentioned above;
- maximumPoolSize: the maximum number of threads the pool is allowed to create. When the queue is full and the thread count is below this value, the pool keeps creating threads to run tasks;
- keepAliveTime: how long an idle thread may stay alive before it is destroyed;
- unit: the time unit of keepAliveTime;
- workQueue: the blocking queue that holds waiting tasks. Options include ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue, and PriorityBlockingQueue;
- threadFactory: the thread factory, mainly used to give threads meaningful names. As the Nifty section below shows, it can be built with Guava's ThreadFactoryBuilder;
- handler: the rejection policy, applied when the queue is full and the thread count has reached maximumPoolSize, so a new task cannot be accepted. Common ones include AbortPolicy, which throws an exception directly, and DiscardPolicy, which silently discards the task.
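To see the seven parameters in one place, here is a small sketch that builds a pool by hand. Everything specific in it is an assumption for illustration: the sizes, the "order-worker" name prefix, and the hand-rolled named() factory, which stands in for Guava's ThreadFactoryBuilder so that the example needs only the JDK.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolConstructionSketch {

    /** JDK-only naming factory; a stand-in for Guava's ThreadFactoryBuilder. */
    static ThreadFactory named(String prefix) {
        AtomicInteger seq = new AtomicInteger();
        return r -> new Thread(r, prefix + "-" + seq.getAndIncrement());
    }

    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                4,                                      // corePoolSize
                8,                                      // maximumPoolSize
                60L, TimeUnit.SECONDS,                  // keepAliveTime and unit
                new ArrayBlockingQueue<>(200),          // bounded workQueue
                named("order-worker"),                  // threadFactory (hypothetical name)
                new ThreadPoolExecutor.AbortPolicy());  // handler
    }

    /** Runs one task and returns the name of the thread that executed it. */
    static String workerName() {
        ThreadPoolExecutor pool = newPool();
        Callable<String> task = () -> Thread.currentThread().getName();
        try {
            return pool.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(workerName());   // prints order-worker-0
    }
}
```

Meaningful thread names pay off later, when you read a thread dump and need to tell which pool a stuck thread belongs to.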
Thread pool utility class
Thread pool tool: Executors
Below are the common ways of creating thread pools with the Executors utility class in business code:
int num = Runtime.getRuntime().availableProcessors() * 2;
// (1)
Executors.newFixedThreadPool(num);
Executors.newFixedThreadPool(num, new ThreadFactoryBuilder().setNameFormat("fixed").build());
// (2)
Executors.newCachedThreadPool();
Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("cache").build());
// (3)
Executors.newSingleThreadExecutor();
Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("single").build());
// (4)
Executors.newScheduledThreadPool(num);
Executors.newScheduledThreadPool(num, new ThreadFactoryBuilder().setNameFormat("schedule").build());
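(1) newFixedThreadPool: a thread pool with a fixed number of threads, backed by an unbounded LinkedBlockingQueue. Because the queue never fills up, maximumPoolSize and the rejection policy never come into play during normal operation, and keepAliveTime has no effect either, since no threads beyond corePoolSize are ever created.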
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
(2) newCachedThreadPool: a thread pool with no limit on the number of threads. Its maximumPoolSize is Integer.MAX_VALUE, which means the pool keeps creating threads as needed to run tasks. It uses a SynchronousQueue, which has no capacity. After the main thread submits a task, the following three steps take place:
1. execute() first calls SynchronousQueue.offer(Runnable task). If an idle thread in the pool is currently executing SynchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS), the main thread's offer is matched with that idle thread's poll. The main thread hands the task to the idle thread, and execute() completes. Otherwise, go to step 2.
2. When the pool is initially empty, or when it currently has no idle threads, no thread is executing SynchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS), so step 1 fails. The CachedThreadPool then creates a new thread to execute the task, and execute() completes.
3. After the thread created in step 2 finishes its task, it calls SynchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS). This poll makes the idle thread wait in the SynchronousQueue for up to 60 seconds. If the main thread submits a new task within those 60 seconds (step 1), the idle thread executes it; otherwise the idle thread terminates. Because threads that stay idle for 60 seconds are terminated, a CachedThreadPool that remains idle for a long time does not hold any resources.
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
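The offer/poll matching that drives the cached pool can be tried out on a bare SynchronousQueue. The sketch below is an illustration outside any thread pool: the class name HandoffSketch, the string "tasks", and the 5-second timeouts are made up, and the timed offer is used only so the demo does not race with the worker thread's startup (the pool itself uses the non-blocking offer).

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

class HandoffSketch {

    /** Shows a failed offer (no idle worker) and then a successful hand-off. */
    static String handOff() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();

        // Nobody is polling yet, so a plain offer fails immediately.
        // In a cached pool, this is the point where a new thread gets created.
        boolean first = queue.offer("task-1");

        String[] received = new String[1];
        Thread idleWorker = new Thread(() -> {
            try {
                // An idle worker waits for work, up to its keep-alive time.
                received[0] = queue.poll(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        idleWorker.start();

        try {
            // Waits until the worker is polling, then hands the task over directly.
            boolean second = queue.offer("task-2", 5, TimeUnit.SECONDS);
            idleWorker.join();
            return "first=" + first + " second=" + second + " got=" + received[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(handOff());   // prints first=false second=true got=task-2
    }
}
```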
(3) newSingleThreadExecutor: the same as newFixedThreadPool, except that the number of threads is fixed at 1.
(4) newScheduledThreadPool: creates a thread pool for running tasks on a schedule. It works like Timer, but can use multiple threads. It mainly provides the following three methods:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit);
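Both methods run the task for the first time after initialDelay and then repeat it. scheduleAtFixedRate starts a run every period, measured from the start of the previous run, while scheduleWithFixedDelay waits for delay after the previous run has finished.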
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
This method runs a task once, after the given delay.
ScheduledExecutorService service = Executors.newScheduledThreadPool(num, new ThreadFactoryBuilder().setNameFormat("schedule").build());
// Print "hello world" after 3 seconds, then again 1 second after each run finishes
service.scheduleWithFixedDelay(new Runnable() {
    @Override
    public void run() {
        System.out.println("hello world");
    }
}, 3, 1, TimeUnit.SECONDS);
// Print "hello world2" after 3 seconds, then every 1 second
service.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        System.out.println("hello world2");
    }
}, 3, 1, TimeUnit.SECONDS);
// Print "hello world3" once after 1 second, with no further output
service.schedule(new Runnable() {
    @Override
    public void run() {
        System.out.println("hello world3");
    }
}, 1, TimeUnit.SECONDS);
Shutting down a thread pool: use either shutdown or shutdownNow. shutdown is the usual choice; shutdownNow can be called when the remaining tasks do not need to run to completion.
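The two calls are often combined: try a graceful shutdown first and fall back to shutdownNow if the pool does not drain in time. The sketch below follows that common pattern; the helper name shutdownGracefully and the timeout values are assumptions for the example, not something prescribed by the JDK.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownSketch {

    /** Graceful first, forceful as a fallback; returns true if the pool terminated. */
    static boolean shutdownGracefully(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown();   // stop accepting new tasks, let queued ones finish
        try {
            if (!pool.awaitTermination(timeout, unit)) {
                // Interrupt running workers and drain tasks that never started.
                List<Runnable> dropped = pool.shutdownNow();
                System.err.println("tasks never started: " + dropped.size());
                return pool.awaitTermination(timeout, unit);
            }
            return true;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();   // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> System.out.println("working"));
        System.out.println("terminated: " + shutdownGracefully(pool, 5, TimeUnit.SECONDS));
    }
}
```

Note that shutdownNow can only interrupt running tasks; a task that ignores interruption will keep its thread alive regardless.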
Use of thread pools in Nifty and Netty open source frameworks
Nifty
I've written several blog posts about Nifty, if you're interested.
Open source RPC frameworks use thread pools too, either through Executors or with custom parameters. On the Nifty (Thrift + Netty) server, the following thread pools are created to serve as Netty's BossPool and WorkerPool:
acceptorExecutor = newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("thrift-acceptor-%s").build());
ioExecutor = newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("thrift-io-%s").build());
These are the thread pools provided to Netty.
NiftyDispatcher is the server-side handler that processes each request as it arrives. It uses a thread pool as well: every time a request is received, a task is constructed and submitted to the pool for execution. By default, the pool is one that the framework creates itself:
new ThreadPoolExecutor(getWorkerThreads(),
                       getWorkerThreads(),
                       0L,
                       TimeUnit.MILLISECONDS,
                       queue,
                       new ThreadFactoryBuilder().setNameFormat("thrift-worker-%s").build(),
                       new ThreadPoolExecutor.AbortPolicy());
Both getWorkerThreads() and queue are configurable. By default there are 200 worker threads and the queue is a LinkedBlockingQueue.
The Nifty client's thread pool is defined in AbstractClientChannel. When a response arrives from the server, this pool runs the callback that returns the result. The pool is shut down when the application exits.
private static final String SWIFT_CLIENT_POOL_SIZE = "swift.client.pool.size";
private static int poolSize = StringUtils.isEmpty(System.getProperty(SWIFT_CLIENT_POOL_SIZE)) ?
        100 : Integer.parseInt(System.getProperty(SWIFT_CLIENT_POOL_SIZE).trim());
private static ExecutorService executorService = Executors.newFixedThreadPool(poolSize);

static {
    // Shut down the thread pool when the JVM exits
    Runtime.getRuntime().addShutdownHook(
            new Thread(new Runnable() {
                @Override
                public void run() {
                    executorService.shutdown();
                }
            }));
}

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
{
    ChannelBuffer response = extractResponse(e.getMessage());
    int sequenceId = extractSequenceId(response);
    onResponseReceived(sequenceId, response);
}

private void onResponseReceived(int sequenceId, final ChannelBuffer response) {
    final Request request = requestMap.remove(sequenceId);
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            fireResponseReceivedCallback(request.getListener(), response);
        }
    });
}
Here Executors is used to create a thread pool with a fixed number of threads.
Netty
Netty provides its own implementation of the Executor interface; the execute logic is written entirely by Netty.
When creating a NioEventLoopGroup, you can use a constructor that takes an Executor. If you use the no-argument constructor, the framework creates the executor itself. That executor is then passed along when each NioEventLoop is created and ends up stored in its executor field. When doStartThread runs at service startup, it calls executor.execute(runnable), and polling is started inside that runnable.
The executor here is Netty's own:
public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}
The thread factory is custom too, mainly so that it can set a thread name prefix. When execute is called, the custom thread factory creates a thread for the task, and that thread is then started.
Some shallow experience to share
To be honest, I mostly write business code day to day and rarely get the chance to work with thread pools directly. I have run into them a couple of times, but I have no deep hands-on experience.
Here are my shallow opinions:
- First, for an RPC service such as our Thrift service: is the thread pool configured with too few threads? How can you tell? During the morning and evening peaks our service once hit many timeouts. Packet captures showed a large gap between the time a request was received and the time it was actually processed, so we suspected that requests were piling up (they were). Increasing the number of threads by 50% solved the problem.
- Another time, the Redis service hung and caused a large number of timeouts in our service. The synchronous Redis calls tied up many request threads, and as requests kept accumulating they consumed a lot of system resources. So don't process Redis requests synchronously on the core request thread pool; make the interface asynchronous and give that work its own separate thread pool. Today our services are basically all asynchronous, and synchronous IO is not allowed in the main flow.
- Be careful with unbounded blocking queues, and as a rule don't create a thread pool with no upper limit on the number of threads. We use a custom thread pool like the one below.
// cores = number of CPU cores
new ThreadPoolExecutor(cores * 2, 100, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable](1000))
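To make the Redis lesson from the second point concrete, here is a minimal sketch of moving a blocking call onto its own bounded pool. It is not our production code: blockingCacheGet is a hypothetical stand-in for a real Redis client call, the pool sizes and the 500 ms timeout are made-up values, and completeOnTimeout requires Java 9 or later.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class IsolatedIoPoolSketch {

    // Dedicated, bounded pool for the slow dependency (sizes are illustrative).
    static final ExecutorService CACHE_POOL = new ThreadPoolExecutor(
            8, 8, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000));

    /** Hypothetical stand-in for a blocking Redis lookup. */
    static String blockingCacheGet(String key) {
        return "value-of-" + key;
    }

    /** The request thread returns at once; the blocking call runs on CACHE_POOL. */
    static CompletableFuture<String> lookup(String key) {
        return CompletableFuture
                .supplyAsync(() -> blockingCacheGet(key), CACHE_POOL)
                // If the dependency hangs, give up with a fallback instead of waiting forever.
                .completeOnTimeout("fallback", 500, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        System.out.println(lookup("user:1").join());
        CACHE_POOL.shutdown();
    }
}
```

With this layout, a hung dependency can exhaust at most its own 8 threads and 1000 queue slots, after which new lookups are rejected quickly instead of dragging down the request threads.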
Of course, what I've said applies to the general case. The right approach always depends on the business: for example, if your requests are quick to process but arrive in large volume, you can tune the number of threads accordingly.