Job type
Xxl-job supports seven job types: Bean, GLUE(Java), GLUE(Shell), GLUE(Python), GLUE(PHP), GLUE(Nodejs), and GLUE(PowerShell). Where, GLUE jobs edit business codes on the admin management side, while Bean jobs integrate user business code logic into XXL-job for scheduling, and the source code is located in the user project instead of the admin module of XXL-job.
Xxl-job abstracts the IJobHandler component used to execute jobs. There are three types of xxl-job:
MethodJobHandler: A bean-type job handler. The bean-type job logic is actually encapsulated in a Method annotated with @xxlJob;
ScriptJobHandler: script type job processor, such as Shell, Python, PHP, Nodejs, PowerShell, etc., can recognize script type jobs and use this processor.
GlueJobHandler: This type of job processor is specifically used to process Glue(Java) jobs. As discussed in the previous section, Java jobs are compiled by GlueFactory, initialized into instances, and then encapsulated into GlueJobHandler for execution.
Execute the process
Server-side process
Server job execution trigger entry see JobTriggerPoolHelper#addTrigger:
public void addTrigger(final int jobId,
final TriggerTypeEnum triggerType,
final int failRetryCount,
final String executorShardingParam,
final String executorParam,
final String addressList) {
// There is a rule that triggers the task to be posted from one of the two thread pools
// fastTriggerPool: default post thread pool
// slowTriggerPool: Slow jobs are posted to this thread pool
// Slow job definition: a job is slow if the post exceeds 500ms and totals more than 10 times per minute (reset cache recalculation per minute), and slowTriggerPool is used for subsequent execution
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
if(jobTimeoutCount! =null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min
triggerPool_ = slowTriggerPool;
}
// trigger
triggerPool_.execute(new Runnable() {
@Override
public void run(a) {
long start = System.currentTimeMillis();
try {
// Trigger the job
XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
// Clear the accumulated cache of slow jobs every minute
long minTim_now = System.currentTimeMillis()/60000;
if(minTim ! = minTim_now) { minTim = minTim_now; jobTimeoutCountMap.clear(); }// Over 500ms, the cumulative number of slow jobs +1,
// The execution side adopts asynchronous mode: the job is sent to the execution side and returned after it is put into the queue. Therefore, this time does not include the execution time of the job itself
long cost = System.currentTimeMillis()-start;
if (cost > 500) { // ob-timeout threshold 500ms
AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
if(timeoutCount ! =null) { timeoutCount.incrementAndGet(); }}}}}); }Copy the code
Continue to follow XxlJobTrigger#trigger:
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
// Block processing policy
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);
// Routing policy
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy
// Fragment parametersString shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)? String.valueOf(index).concat("/").concat(String.valueOf(total)):null;
// 1, save log-id
XxlJobLog jobLog = new XxlJobLog();
jobLog.setJobGroup(jobInfo.getJobGroup());
jobLog.setJobId(jobInfo.getId());
jobLog.setTriggerTime(new Date());
// xxl_job_log Inserts run logs
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
// init trigger-param
TriggerParam triggerParam = new TriggerParam();
triggerParam.setJobId(jobInfo.getId());
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
triggerParam.setExecutorParams(jobInfo.getExecutorParam());
triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
triggerParam.setLogId(jobLog.getId());
triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
triggerParam.setGlueType(jobInfo.getGlueType());
triggerParam.setGlueSource(jobInfo.getGlueSource());
triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
triggerParam.setBroadcastIndex(index);
triggerParam.setBroadcastTotal(total);
// Initializes the address of the actuator
String address = null;
ReturnT<String> routeAddressResult = null;
if(group.getRegistryList()! =null && !group.getRegistryList().isEmpty()) {
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
// Fragmented broadcast mode
if (index < group.getRegistryList().size()) {
address = group.getRegistryList().get(index);
} else {
address = group.getRegistryList().get(0); }}else {
// The routing policy selects the address of the actuator
routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
if(routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) { address = routeAddressResult.getContent(); }}}else {
routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
}
// trigger remote executor
ReturnT<String> triggerResult = null;
if(address ! =null) {
// Job execution
triggerResult = runExecutor(triggerParam, address);
} else {
triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
}
// Collect execution information
StringBuffer triggerMsgSb = new StringBuffer();
triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
.append( (group.getAddressType() == 0)? I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1")); triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
if(shardingParam ! =null) {
triggerMsgSb.append("("+shardingParam+")");
}
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);
triggerMsgSb.append("
> > > > > > > > > > >"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>") .append((routeAddressResult! =null&&routeAddressResult.getMsg()! =null)? routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()! =null? triggerResult.getMsg():"");
// 6. Save log trigger-info
jobLog.setExecutorAddress(address);
jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
jobLog.setExecutorParam(jobInfo.getExecutorParam());
jobLog.setExecutorShardingParam(shardingParam);
jobLog.setExecutorFailRetryCount(finalFailRetryCount);
//jobLog.setTriggerTime();
jobLog.setTriggerCode(triggerResult.getCode());
jobLog.setTriggerMsg(triggerMsgSb.toString());
// Update the execution information to xxl_job_log
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
Copy the code
This method is more code, but the logic is relatively simple, the core logic: broadcast or routing policy to select the executor address -> job execution -> collect execution information update to xxL_job_log log table.
XxlJobTrigger#runExecutor:
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
ReturnT<String> runResult = null;
try {
// Get ExecutorBiz according to address
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
runResult = executorBiz.run(triggerParam);
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
}
// Result parsing
StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
runResultSB.append("< br > address:").append(address);
runResultSB.append("< br > code.").append(runResult.getCode());
runResultSB.append("< br > MSG:").append(runResult.getMsg());
runResult.setMsg(runResultSB.toString());
return runResult;
}
Copy the code
Obtain the corresponding executor agent ExecutorBiz based on address, and then call its run method to send the job to the executor. In the previous section, we analyzed the implementation of netty to initialize an HTTP server web container. Therefore, the delivery logic here is relatively simple. Is called HTTP interface XxlJobRemotingUtil. PostBody (addressUrl + “run”, accessToken, timeout, triggerParam, String, class); .
Executive-side flow
As analyzed in the previous section of the executor startup process, it initializes an HTTP server web container using netty to receive instructions from admin, and then transfers the received instructions to the EmbedHttpServerHandler#process:
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
// valid
if(HttpMethod.POST ! = httpMethod) {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
}
if (uri==null || uri.trim().length()==0) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
}
if(accessToken! =null
&& accessToken.trim().length()>0
&& !accessToken.equals(accessTokenReq)) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
}
// services mapping
try {
if ("/beat".equals(uri)) { // If the actuator is running properly (online), the routing policy is failover
return executorBiz.beat();
} else if ("/idleBeat".equals(uri)) {// If the actuator is idle, the routing policy is busy transfer
IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
return executorBiz.idleBeat(idleBeatParam);
} else if ("/run".equals(uri)) {
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
return executorBiz.run(triggerParam);
} else if ("/kill".equals(uri)) { // kill job command listener
logger.info("receive kill, data:{}", requestData);
KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
return executorBiz.kill(killParam);
} else if ("/log".equals(uri)) {// View the executor scheduling log listener
LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
return executorBiz.log(logParam);
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found."); }}catch (Exception e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:"+ ThrowableUtil.toString(e)); }}Copy the code
Continue tracking ExecutorBizlot # Run:
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
// Load old: jobHandler + jobThread
// Load JobThread and IJobHandler from the cache according to jobIdJobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId()); IJobHandler jobHandler = jobThread! =null? jobThread.getHandler():null;
String removeOldReason = null;
// The job type matches and IJobHandler checks
// For example, job IJobHandler sends changes, or the source code of the Glue class job is edited. Then the cached JobThread cannot be used any more, and the latest IJobHandler is used to create JobThread
GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
if (GlueTypeEnum.BEAN == glueTypeEnum) {// Bean-type jobs. }else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {//Java type job. }else if(glueTypeEnum! =null && glueTypeEnum.isScript()) {// Script class jobs. }else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
}
if(jobThread ! =null) {
// If JobThread! If = null, jobs may be running on the JobThread and are processed according to the blocking policy
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
// Discard subsequent scheduling: If JobThread is still executing jobs or has queued jobs in its triggerQueue, the current job is discarded
if (jobThread.isRunningOrHasQueue()) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle()); }}else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
// Override previous scheduling: If JobThread is still executing jobs or has queued jobs in its triggerQueue, destroy the previous JobThread and recreate the JobThread to run the current job
if (jobThread.isRunningOrHasQueue()) {
removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
jobThread = null; }}else {
// Send jobs directly to the JobThread triggerQueue}}if (jobThread == null) {
// Create JobThread, place it in cache, destroy if it already exists in the jobId cache
jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
}
logger.debug("jobThread.pushTriggerQueue hash:{}, data:{}", System.identityHashCode(jobThread), GsonTool.toJson(triggerParam));
// Place the delivered job into the JobThread's triggerQueue, and the JobThread will fetch it from the triggerQueue
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
return pushResult;
}
Copy the code
The job is posted to the triggerQueue queue of JobThread#run:
@Override
public void run(a) {
try {
// Call the iJobHandler. init method, such as @xxlJob (init= XXX)
handler.init();
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
while(! toStop){// running=false Indicates that no jobs are being processed by the JobThread
// isRunningOrHasQueue() is used to determine whether JobThread is running and triggerQueue
running = false;
// The number of idle times accumulates +1
idleTimes++;
TriggerParam triggerParam = null;
ReturnT<String> executeResult = null;
try {
triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
if(triggerParam! =null) {
// running=true Indicates that the JobThread is processing jobs
running = true;
// Reset the number of idle statistics
idleTimes = 0;
triggerLogIdSet.remove(triggerParam.getLogId());
// log filename, like "logPath/yyyy-MM-dd/9999.log"
// Initialize the log file
String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
XxlJobFileAppender.contextHolder.set(logFileName);
// Inject shard information into the thread context: InheritableThreadLocal
ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));
// execute
XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams());
// executorTimeout: Executes timeout control for jobs
/ / implementation of normal operation is handler. The execute (triggerParam. GetExecutorParams ()),
// Encapsulate the FutureTask and execute it asynchronously in a thread with timeout control
if (triggerParam.getExecutorTimeout() > 0) {
// limit timeout
Thread futureThread = null;
try {
final TriggerParam triggerParamTmp = triggerParam;
FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {
@Override
public ReturnT<String> call(a) throws Exception {
returnhandler.execute(triggerParamTmp.getExecutorParams()); }}); futureThread =new Thread(futureTask);
futureThread.start();
executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
} catch (TimeoutException e) {
XxlJobLogger.log("<br>----------- xxl-job job execute timeout");
XxlJobLogger.log(e);
executeResult = new ReturnT<String>(IJobHandler.FAIL_TIMEOUT.getCode(), "job execute timeout ");
} finally{ futureThread.interrupt(); }}else {
// Call the corresponding IJobHandler to process the job
executeResult = handler.execute(triggerParam.getExecutorParams());
}
if (executeResult == null) {
executeResult = IJobHandler.FAIL;
} else{ executeResult.setMsg( (executeResult! =null&&executeResult.getMsg()! =null&&executeResult.getMsg().length()>50000)
?executeResult.getMsg().substring(0.50000).concat("...")
:executeResult.getMsg());
executeResult.setContent(null); // limit obj size
}
XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);
} else {
// If the JobThread times out for 30 consecutive times (3 seconds each time), that is, the JobThread remains idle within 90 seconds, the JobThread is destroyed
if (idleTimes > 30) {
if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost
XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit."); }}}}catch (Throwable e) {
if (toStop) {
XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
}
StringWriter stringWriter = new StringWriter();
e.printStackTrace(new PrintWriter(stringWriter));
String errorMsg = stringWriter.toString();
executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg);
// If the job execution is abnormal, the exception information is written to the log
XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
} finally {
if(triggerParam ! =null) {
if(! toStop) {// If JobThread is not stopped, the asynchronous callback mechanism pushes the execution result to admin
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), executeResult));
} else {
// When JobThread stops, the asynchronous callback mechanism pushes the kill exception to admin
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running, killed]");
TriggerCallbackThread.pushCallBack(newHandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult)); }}}}// If JobThread is killed, check whether there are any jobs waiting to trigger in triggerQueue, and push an exception message to admin if there are
while(triggerQueue ! =null && triggerQueue.size()>0){
TriggerParam triggerParam = triggerQueue.poll();
if(triggerParam! =null) {
// is killed
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]");
TriggerCallbackThread.pushCallBack(newHandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult)); }}// destroy
try {
// Destroy IJobHandler by calling the iJobHandler. destroy method, such as @xxlJob (destroy= XXX)
handler.destroy();
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
}
Copy the code
This is a lot of code, but the logic is not too complicated and is easy to understand from the comments, so let’s look at the last core component of the execution processIJobHandler
, the calling job execution logic is encapsulated in this component,xxl-job
Built-in provides three implementation methods, respectively corresponding to the callBean
,Java
andThe script type
Homework, the implementation of which is not too complicated, will not be further analyzed here.
Core abstract component
ExecutorRouter: A routing component, which selects an executor address.
ExecutorBizClient: The routing component selects the task executor address and packages it as an ExecutorBizClient, which can be regarded as an executor agent on the engine side, shielding low-level details of remote RPC network communication.
EmbedHttpServerHandler: The executor implements the HTTP Server container via Netty, and the EmbedHttpServerHandler extension component is used to handle receiving instructions;
ExecutorBizImpl: ExecutorBizClient acts as an executor proxy on the engine side, mainly forwarding instructions to the executor through RPC, and implementing logical encapsulation on the executor. Both ExecutorBizClient and ExecutorBizImpl implement the same interface ExecutorBiz.
JobThread: Each task executed on the actuator has a JobThread. The tasks are independent of each other. The JobThread controls the concurrent model of tasks on the actuator.
IJobHandler: IJobHandler encapsulates how to call the task logic. Xxl-job has three built-in implementation classes for calling different types of tasks.
conclusion
The core and key codes of XXL-job execution have been analyzed and sorted out as a whole above, which is still relatively simple and may be boring. The general process of job execution is briefly sorted out as follows (see the figure below), so as to have a general understanding of xxL-job scheduling mechanism:
General description:
xxl-job
The overall architecture adopts centralized design, which is divided into scheduling centerAdmin
And the actuator two parts;- Dispatch center
Admin
Module providestrigger
Trigger the interface to schedule jobs, and then allocate jobs to one of the two thread pools for execution according to the job delivery time statistics. - Log the job startup to the
xxl_job_log
Table, then use the routing component to select the actuator address and use the actuator proxyExecutorBiz
Sends execution to the route’s actuator, the actuator agentExecutorBiz
The implementation is simple: just sendhttp
Requests; - When the actuator is started, it uses
netty
Initialize an inlinehttp server
The container, when receiving an instruction from the dispatch center, forwards it toEmbedHttpServerHandler
Processor processing; EmbedHttpServerHandler
The processor processes the job execution instructions according to thejobId
Find the corresponding from the cacheJobThread
And then posts the job execution instruction toJobThread
In the instancetriggerQueue
Queuing in a queue;JobThread
The thread keeps looping fromtriggerQueue
The queue extracts the job information that is waiting to be executed and then submits it toIJobHandler
Actually handle job calls,JobThread
willIJobHandler
The processing results are analyzed and delivered toTriggerCallbackThread
In the threadcallBackQueue
Queuing in a queue;TriggerCallbackThread
The internal thread is also constantly loop fromcallBackQueue
Extract the callback task and forward it todoCallback
Method, this method passes internallyAdmin
The proxy classAdminBizClient
The call result callback is sent to the call center’s callback interface, the completion of the job completion notification.
The above is the overall general process of xxL-job execution. If several core components are connected in series to see its context, the whole logic will be clear. The key point here is the JobThread component. Each job has a JobThread instance in each executor. When the job is delivered to the executor, the corresponding JobThread is found and processed. Jobthreads are designed in lazy loading and caching mode. Only when the job delivery executor fails to find the corresponding JobThread, the JobThread is created and returned. When the same job is executed next time, the JobThread can be directly used.
When the executor cannot find the JobThread:
- The job is delivered to the actuator for the first time;
JobThread
The internal thread loops continuously fromtriggerQueue
Extract jobs for processing, one for each job on the actuatorJobThread
If a job is executed once on the executor and then not executed again, or executed infrequently, it may cause a lot of thread wasteJobThread
There is an idle timeout auto-destruct mechanism in the design. when30 * 3 = 90 seconds
If the job is not executed, it is determinedJobThread
If it is idle and times out, it enters the destruction process and then receives the instruction from the job, it will recreate itJobThread
.
More source code analysis article please pay attention to the public number :Reactor2020