Job type

xxl-job supports seven job types: Bean, GLUE(Java), GLUE(Shell), GLUE(Python), GLUE(PHP), GLUE(Nodejs), and GLUE(PowerShell). For GLUE jobs, the business code is edited and maintained in the admin console. For Bean jobs, the business logic lives in the user's own project rather than in the xxl-job admin module, and is integrated with xxl-job for scheduling.

xxl-job abstracts job execution behind the IJobHandler component, which has three implementations:

MethodJobHandler: the handler for Bean-type jobs. The job logic is encapsulated in a method annotated with @XxlJob.

ScriptJobHandler: the handler for script-type jobs such as Shell, Python, PHP, Nodejs, and PowerShell. Any job identified as a script type is run by this handler.

GlueJobHandler: the handler dedicated to GLUE(Java) jobs. As discussed in the previous section, the Java source is compiled by GlueFactory, instantiated, and then wrapped in a GlueJobHandler for execution.
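
As a rough illustration of how a Bean-type job could be wrapped behind a common handler abstraction, the sketch below pairs a target object with one of its methods and invokes it reflectively. The names SimpleJobHandler, MethodHandlerSketch, and DemoJobs are hypothetical and are not xxl-job classes; the real MethodJobHandler may differ in its details.

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for the handler abstraction; not the real IJobHandler.
interface SimpleJobHandler {
    String execute(String param) throws Exception;
}

// Wraps a target object plus one of its methods, so the caller only sees a handler.
final class MethodHandlerSketch implements SimpleJobHandler {
    private final Object target;
    private final Method method;

    MethodHandlerSketch(Object target, Method method) {
        this.target = target;
        this.method = method;
    }

    @Override
    public String execute(String param) throws Exception {
        // Reflectively invoke the business method with the job parameter.
        return (String) method.invoke(target, param);
    }
}

// Hypothetical business class that would live in the user's project.
final class DemoJobs {
    public String hello(String param) {
        return "hello " + param;
    }
}
```

The point of the wrapper is that the scheduling code depends only on the handler interface, while the business method stays in the user's project.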

Execution process

Server-side flow

On the server (admin) side, job execution is triggered through JobTriggerPoolHelper#addTrigger:

public void addTrigger(final int jobId,
                       final TriggerTypeEnum triggerType,
                       final int failRetryCount,
                       final String executorShardingParam,
                       final String executorParam,
                       final String addressList) {

    // The trigger is submitted to one of two thread pools:
    // fastTriggerPool: the default pool
    // slowTriggerPool: the pool for slow jobs
    // Slow job definition: a trigger that takes more than 500ms counts as a timeout; once a job accumulates more than 10 timeouts within one minute (the counters are cleared every minute), its subsequent triggers are submitted to slowTriggerPool
    ThreadPoolExecutor triggerPool_ = fastTriggerPool;
    AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
    if (jobTimeoutCount != null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
        triggerPool_ = slowTriggerPool;
    }

    // trigger
    triggerPool_.execute(new Runnable() {
        @Override
        public void run() {

            long start = System.currentTimeMillis();

            try {
                // Trigger the job
                XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {

                // Clear the slow-job counters once per minute
                long minTim_now = System.currentTimeMillis()/60000;
                if (minTim != minTim_now) {
                    minTim = minTim_now;
                    jobTimeoutCountMap.clear();
                }

                // If the trigger took more than 500ms, increment the job's slow count.
                // The executor works asynchronously: the call returns as soon as the job is queued on the executor, so this cost does not include the job's own execution time
                long cost = System.currentTimeMillis()-start;
                if (cost > 500) {       // job-timeout threshold 500ms
                    AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                    if (timeoutCount != null) {
                        timeoutCount.incrementAndGet();
                    }
                }
            }
        }
    });
}
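
The pool-selection rule in addTrigger can be isolated into a small, testable sketch. SlowJobTracker and its method names are hypothetical and not part of xxl-job; the clock is passed in as a parameter (an assumption made here so the per-minute reset can be exercised without waiting), and the pool is represented by a plain string.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper that mirrors the counting rule described above.
final class SlowJobTracker {
    private static final long SLOW_THRESHOLD_MS = 500; // a trigger slower than this counts as a timeout
    private static final int SLOW_COUNT_LIMIT = 10;    // more than this many timeouts in a minute selects the slow pool

    private final ConcurrentMap<Integer, AtomicInteger> timeoutCounts = new ConcurrentHashMap<>();
    private volatile long currentMinute;

    SlowJobTracker(long nowMillis) {
        this.currentMinute = nowMillis / 60_000;
    }

    /** Returns "slow" if the job exceeded the limit in the current minute, otherwise "fast". */
    String choosePool(int jobId) {
        AtomicInteger count = timeoutCounts.get(jobId);
        return (count != null && count.get() > SLOW_COUNT_LIMIT) ? "slow" : "fast";
    }

    /** Records one finished trigger: resets all counters when the minute changes, then counts slow triggers. */
    void record(int jobId, long nowMillis, long costMillis) {
        long minute = nowMillis / 60_000;
        if (minute != currentMinute) {
            currentMinute = minute;
            timeoutCounts.clear();
        }
        if (costMillis > SLOW_THRESHOLD_MS) {
            timeoutCounts.computeIfAbsent(jobId, id -> new AtomicInteger()).incrementAndGet();
        }
    }
}
```

Because the comparison is "greater than 10", a job moves to the slow pool only after its eleventh slow trigger within the same minute.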

Following XxlJobTrigger#trigger leads to XxlJobTrigger#processTrigger:

private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){

    // Block processing policy
    ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);
    // Routing policy
    ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
    // Sharding parameter
    String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) ? String.valueOf(index).concat("/").concat(String.valueOf(total)) : null;

    // 1, save log-id
    XxlJobLog jobLog = new XxlJobLog();
    jobLog.setJobGroup(jobInfo.getJobGroup());
    jobLog.setJobId(jobInfo.getId());
    jobLog.setTriggerTime(new Date());
    // Insert the run log into xxl_job_log
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
    logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

    // init trigger-param
    TriggerParam triggerParam = new TriggerParam();
    triggerParam.setJobId(jobInfo.getId());
    triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
    triggerParam.setExecutorParams(jobInfo.getExecutorParam());
    triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
    triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
    triggerParam.setLogId(jobLog.getId());
    triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
    triggerParam.setGlueType(jobInfo.getGlueType());
    triggerParam.setGlueSource(jobInfo.getGlueSource());
    triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
    triggerParam.setBroadcastIndex(index);
    triggerParam.setBroadcastTotal(total);

    // Initialize the executor address
    String address = null;
    ReturnT<String> routeAddressResult = null;
    if (group.getRegistryList() != null && !group.getRegistryList().isEmpty()) {
        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
            // Sharding broadcast mode
            if (index < group.getRegistryList().size()) {
                address = group.getRegistryList().get(index);
            } else {
                address = group.getRegistryList().get(0);
            }
        } else {
            // The routing strategy selects the executor address
            routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
            if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                address = routeAddressResult.getContent();
            }
        }
    } else {
        routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
    }

    // trigger remote executor
    ReturnT<String> triggerResult = null;
    if (address != null) {
        // Trigger the job on the executor
        triggerResult = runExecutor(triggerParam, address);
    } else {
        triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
    }

    // Collect execution information
    StringBuffer triggerMsgSb = new StringBuffer();
    triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
                .append( (group.getAddressType() == 0)? I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1"));
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
    if (shardingParam != null) {
        triggerMsgSb.append("("+shardingParam+")");
    }
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);

    triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>" + I18nUtil.getString("jobconf_trigger_run") + "<<<<<<<<<<< </span><br>")
                .append((routeAddressResult != null && routeAddressResult.getMsg() != null) ? routeAddressResult.getMsg() + "<br><br>" : "").append(triggerResult.getMsg() != null ? triggerResult.getMsg() : "");

    // 6. Save log trigger-info
    jobLog.setExecutorAddress(address);
    jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
    jobLog.setExecutorParam(jobInfo.getExecutorParam());
    jobLog.setExecutorShardingParam(shardingParam);
    jobLog.setExecutorFailRetryCount(finalFailRetryCount);
    //jobLog.setTriggerTime();
    jobLog.setTriggerCode(triggerResult.getCode());
    jobLog.setTriggerMsg(triggerMsgSb.toString());
    // Update the trigger information in xxl_job_log
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);

    logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}

The method is long, but the logic is simple. The core flow is: select the executor address (by sharding broadcast or by routing strategy) -> trigger the job on the executor -> collect the trigger information and update it in the xxl_job_log table.
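
The address-selection step can be reduced to a few lines. The following is a minimal sketch, not the xxl-job implementation: AddressSelectionSketch is a hypothetical class, and the routing strategy is stood in for by a plain java.util.function.Function, which is an assumption made to keep the example self-contained.

```java
import java.util.List;
import java.util.function.Function;

final class AddressSelectionSketch {
    /**
     * Picks the executor address for one trigger.
     *
     * @param registry          registered executor addresses (may be null or empty)
     * @param shardingBroadcast whether the route strategy is sharding broadcast
     * @param index             shard index of this trigger
     * @param router            hypothetical stand-in for the configured routing strategy
     * @return the chosen address, or null when no executor is registered
     */
    static String select(List<String> registry, boolean shardingBroadcast, int index,
                         Function<List<String>, String> router) {
        if (registry == null || registry.isEmpty()) {
            return null;
        }
        if (shardingBroadcast) {
            // Each shard index maps to the executor at the same position; fall back to the first one.
            return index < registry.size() ? registry.get(index) : registry.get(0);
        }
        return router.apply(registry);
    }
}
```

In sharding broadcast mode the caller is expected to invoke the selection once per shard index, so that each registered executor receives its own trigger.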

XxlJobTrigger#runExecutor:

public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
    ReturnT<String> runResult = null;
    try {
        // Get ExecutorBiz according to address
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }

    // Result parsing
    StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
    runResultSB.append("<br>address:").append(address);
    runResultSB.append("<br>code:").append(runResult.getCode());
    runResultSB.append("<br>msg:").append(runResult.getMsg());

    runResult.setMsg(runResultSB.toString());
    return runResult;
}

The admin obtains the executor proxy ExecutorBiz for the given address and calls its run method to deliver the job to the executor. As analyzed in the previous section, the executor uses netty to start an embedded HTTP server, so delivery is simple: it calls the HTTP interface via XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class).

Executor-side flow

As analyzed in the previous section on executor startup, the executor uses netty to initialize an embedded HTTP server that receives instructions from the admin. Each received instruction is passed to EmbedHttpServerHandler#process:

private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
        
    // valid
    if (HttpMethod.POST != httpMethod) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
    }
    if (uri==null || uri.trim().length()==0) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
    }
    if (accessToken != null
            && accessToken.trim().length()>0
            && !accessToken.equals(accessTokenReq)) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
    }

    // services mapping
    try {
        if ("/beat".equals(uri)) { // Heartbeat: checks whether the executor is online; used by the failover routing strategy
            return executorBiz.beat();
        } else if ("/idleBeat".equals(uri)) { // Idle heartbeat: checks whether the executor is idle; used by the busy-over routing strategy
            IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
            return executorBiz.idleBeat(idleBeatParam);
        } else if ("/run".equals(uri)) {
            TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
            return executorBiz.run(triggerParam);
        } else if ("/kill".equals(uri)) { // kill job command listener
            logger.info("receive kill, data:{}", requestData);
            KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
            return executorBiz.kill(killParam);
        } else if ("/log".equals(uri)) { // Query the job execution log
            LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
            return executorBiz.log(logParam);
        } else {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:"+ ThrowableUtil.toString(e));
    }
}

Continue to ExecutorBizImpl#run:

@Override
public ReturnT<String> run(TriggerParam triggerParam) {
    // Load old: jobHandler + jobThread
    // Load the JobThread and its IJobHandler from the cache by jobId
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler jobHandler = jobThread != null ? jobThread.getHandler() : null;
    String removeOldReason = null;

    // Match the job type and validate the IJobHandler.
    // If the job's IJobHandler has changed, or the source of a GLUE job has been edited, the cached JobThread can no longer be used and a new JobThread is created with the latest IJobHandler
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    if (GlueTypeEnum.BEAN == glueTypeEnum) {
        // Bean-type job (handling omitted)
    } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
        // GLUE(Java) job (handling omitted)
    } else if (glueTypeEnum != null && glueTypeEnum.isScript()) {
        // Script-type job (handling omitted)
    } else {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
    }

    if (jobThread != null) {
        // A non-null JobThread may still be running jobs, so the trigger is handled according to the block strategy
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
            // Discard later: if the JobThread is still running a job or has jobs queued in its triggerQueue, the current trigger is discarded
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
            // Cover early: if the JobThread is still running a job or has jobs queued in its triggerQueue, the old JobThread is destroyed and a new one is created to run the current trigger
            if (jobThread.isRunningOrHasQueue()) {
                removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                jobThread = null;
            }
        } else {
            // Otherwise the trigger is simply queued in the JobThread's triggerQueue
        }
    }

    if (jobThread == null) {
        // Create JobThread, place it in cache, destroy if it already exists in the jobId cache
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }

    logger.debug("jobThread.pushTriggerQueue hash:{}, data:{}", System.identityHashCode(jobThread), GsonTool.toJson(triggerParam));
    // Place the delivered job into the JobThread's triggerQueue, and the JobThread will fetch it from the triggerQueue
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}
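
The block-strategy branch of ExecutorBizImpl#run boils down to a small decision table. The sketch below restates it with hypothetical types (BlockStrategy, Decision, BlockStrategySketch); only the three strategy names are taken from the code above, everything else is an illustration.

```java
// Strategy names follow ExecutorBlockStrategyEnum; the enum itself is a hypothetical stand-in.
enum BlockStrategy { SERIAL_EXECUTION, DISCARD_LATER, COVER_EARLY }

// Hypothetical outcome of handling one incoming trigger.
enum Decision { ENQUEUE, REJECT, REPLACE_THREAD_AND_ENQUEUE }

final class BlockStrategySketch {
    /**
     * Decides what to do with a new trigger when a job thread already exists.
     *
     * @param strategy          block strategy configured for the job
     * @param runningOrHasQueue whether the existing thread is busy or has queued triggers
     */
    static Decision decide(BlockStrategy strategy, boolean runningOrHasQueue) {
        if (strategy == BlockStrategy.DISCARD_LATER && runningOrHasQueue) {
            return Decision.REJECT; // drop the new trigger
        }
        if (strategy == BlockStrategy.COVER_EARLY && runningOrHasQueue) {
            return Decision.REPLACE_THREAD_AND_ENQUEUE; // discard the old thread, run the new trigger
        }
        return Decision.ENQUEUE; // serial execution, or the thread is idle
    }
}
```

Note that both non-serial strategies only take effect when the existing thread is busy; an idle thread always just receives the trigger in its queue.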

The job is placed in the JobThread's triggerQueue and is consumed in JobThread#run:

@Override
public void run() {

    try {
        // Call IJobHandler#init, e.g. the method configured via @XxlJob(init = "...")
        handler.init();
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
    }

    while (!toStop) {
        // running = false means the JobThread is not processing a job;
        // isRunningOrHasQueue() checks this flag together with the triggerQueue size
        running = false;
        // Count one more idle poll
        idleTimes++;

        TriggerParam triggerParam = null;
        ReturnT<String> executeResult = null;
        try {
            triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
            if (triggerParam != null) {
                // running = true means the JobThread is processing a job
                running = true;
                // Reset the idle counter
                idleTimes = 0;
                triggerLogIdSet.remove(triggerParam.getLogId());

                // log filename, like "logPath/yyyy-MM-dd/9999.log"
                // Initialize the log file
                String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
                XxlJobFileAppender.contextHolder.set(logFileName);
                // Put the sharding information into the thread context (InheritableThreadLocal)
                ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));

                // execute
                XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams());

                // executorTimeout: timeout control for job execution.
                // Without a timeout, the job runs as handler.execute(triggerParam.getExecutorParams());
                // with a timeout, the call is wrapped in a FutureTask and run on a separate thread
                if (triggerParam.getExecutorTimeout() > 0) {
                    // limit timeout
                    Thread futureThread = null;
                    try {
                        final TriggerParam triggerParamTmp = triggerParam;
                        FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {
                            @Override
                            public ReturnT<String> call() throws Exception {
                                return handler.execute(triggerParamTmp.getExecutorParams());
                            }
                        });
                        futureThread = new Thread(futureTask);
                        futureThread.start();

                        executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
                    } catch (TimeoutException e) {

                        XxlJobLogger.log("<br>----------- xxl-job job execute timeout");
                        XxlJobLogger.log(e);

                        executeResult = new ReturnT<String>(IJobHandler.FAIL_TIMEOUT.getCode(), "job execute timeout ");
                    } finally {
                        futureThread.interrupt();
                    }
                } else {
                    // Call the corresponding IJobHandler to process the job
                    executeResult = handler.execute(triggerParam.getExecutorParams());
                }

                if (executeResult == null) {
                    executeResult = IJobHandler.FAIL;
                } else {
                    executeResult.setMsg(
                            (executeResult != null && executeResult.getMsg() != null && executeResult.getMsg().length() > 50000)
                                    ? executeResult.getMsg().substring(0, 50000).concat("...")
                                    : executeResult.getMsg());
                    executeResult.setContent(null);    // limit obj size
                }
                XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);

            } else {
                // If more than 30 consecutive polls (3 seconds each) return nothing, i.e. the JobThread has been idle for about 90 seconds, the JobThread is destroyed
                if (idleTimes > 30) {
                    if (triggerQueue.size() == 0) {    // avoid concurrent trigger causes jobId-lost
                        XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
                    }
                }
            }
        } catch (Throwable e) {
            if (toStop) {
                XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
            }

            StringWriter stringWriter = new StringWriter();
            e.printStackTrace(new PrintWriter(stringWriter));
            String errorMsg = stringWriter.toString();
            executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg);

            // If the job throws, the exception information is written to the job log
            XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
        } finally {
            if (triggerParam != null) {
                if (!toStop) {
                    // The JobThread is not stopped: push the execution result to the admin through the asynchronous callback mechanism
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), executeResult));
                } else {
                    // The JobThread was stopped: push the kill result to the admin through the asynchronous callback mechanism
                    ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running, killed]");
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
                }
            }
        }
    }

    // If the JobThread was killed, report a failure to the admin for every trigger still waiting in triggerQueue
    while (triggerQueue != null && triggerQueue.size() > 0) {
        TriggerParam triggerParam = triggerQueue.poll();
        if (triggerParam != null) {
            // is killed
            ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]");
            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
        }
    }

    // destroy
    try {
        // Call IJobHandler#destroy, e.g. the method configured via @XxlJob(destroy = "...")
        handler.destroy();
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
    }

    logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
}

That is a lot of code, but the logic is not complicated and the comments make it easy to follow. The last core component in the execution flow is IJobHandler, which encapsulates the logic that actually invokes the job. xxl-job ships three built-in implementations, for Bean, Java, and script-type jobs respectively. Their implementations are simple and are not analyzed further here.
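
The timeout control used in JobThread#run (wrap the call in a FutureTask, run it on a helper thread, wait with a deadline, then interrupt) can be tried in isolation. TimeoutRunSketch is a hypothetical class; it uses milliseconds and a plain "TIMEOUT" string instead of seconds and a ReturnT, purely to keep the sketch short.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimeoutRunSketch {
    /** Runs the task on a helper thread and waits at most timeoutMillis for its result. */
    static String runWithTimeout(Callable<String> task, long timeoutMillis) throws Exception {
        FutureTask<String> futureTask = new FutureTask<>(task);
        Thread worker = new Thread(futureTask);
        worker.start();
        try {
            return futureTask.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The task did not finish in time; report a timeout instead of a result.
            return "TIMEOUT";
        } finally {
            // Interrupt the helper thread so that a blocked task gets a chance to stop.
            worker.interrupt();
        }
    }
}
```

Interrupting only helps if the task reacts to interruption (for example while sleeping or blocking on I/O that supports it); a task that ignores the interrupt flag keeps running in the background.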

Core abstract components

ExecutorRouter: the routing component, which selects an executor address.

ExecutorBizClient: after the routing component selects the executor address, the address is wrapped in an ExecutorBizClient. It can be regarded as the executor's proxy on the scheduling side, hiding the low-level details of remote RPC communication.

EmbedHttpServerHandler: the executor implements its HTTP server with netty, and EmbedHttpServerHandler is the handler that processes received instructions.

ExecutorBizImpl: ExecutorBizClient acts as the executor's proxy on the scheduling side and mainly forwards instructions to the executor through RPC, whereas ExecutorBizImpl implements the actual logic on the executor. Both ExecutorBizClient and ExecutorBizImpl implement the same interface, ExecutorBiz.

JobThread: each job that runs on an executor has its own JobThread, so jobs are isolated from one another. JobThread controls the concurrency model of a job on the executor.

IJobHandler: encapsulates how the job logic is invoked. xxl-job has three built-in implementations, one for each kind of job.

Conclusion

The core code of xxl-job job execution has been analyzed above. It is fairly simple and may have been dry reading. The overall job execution flow is summarized below (see the figure below) to give a general understanding of the xxl-job scheduling mechanism:

General description:

  • The overall architecture of xxl-job is centralized and consists of two parts: the scheduling center (Admin) and the executors;
  • The Admin module exposes a trigger interface for scheduling jobs, and assigns each trigger to one of two thread pools according to the job's delivery-time statistics;
  • The trigger is logged to the xxl_job_log table, the routing component selects an executor address, and the executor proxy ExecutorBiz sends the execution instruction to the selected executor. The ExecutorBiz implementation is simple: it just sends an HTTP request;
  • When an executor starts, it uses netty to initialize an embedded HTTP server. Instructions received from the scheduling center are forwarded to the EmbedHttpServerHandler;
  • When EmbedHttpServerHandler processes a job execution instruction, it looks up the JobThread for the jobId in the cache and puts the instruction into that JobThread's triggerQueue;
  • The JobThread loops, polling its triggerQueue for pending jobs and handing each one to the IJobHandler, which actually invokes the job. The JobThread then wraps the IJobHandler result and puts it into the callBackQueue of TriggerCallbackThread;
  • A thread inside TriggerCallbackThread also loops, taking callback tasks from callBackQueue and passing them to the doCallback method, which uses the Admin proxy class AdminBizClient to send the result to the scheduling center's callback interface, completing the job-completion notification.
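
The hand-off between JobThread and TriggerCallbackThread described in the last two steps is a producer/consumer queue. The sketch below shows the idea with a LinkedBlockingQueue; CallbackQueueSketch is a hypothetical class, results are plain strings, and the batching in takeBatch is an assumption made for illustration rather than something stated in this article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

final class CallbackQueueSketch {
    private final LinkedBlockingQueue<String> callBackQueue = new LinkedBlockingQueue<>();

    /** Producer side: called by the job thread once a job has finished. */
    void pushCallBack(String result) {
        callBackQueue.add(result);
    }

    /**
     * Consumer side: blocks until at least one result is available, then also
     * collects everything else that is already queued, preserving order.
     */
    List<String> takeBatch() throws InterruptedException {
        List<String> batch = new ArrayList<>();
        batch.add(callBackQueue.take());
        callBackQueue.drainTo(batch);
        return batch;
    }
}
```

Decoupling the two sides this way means a slow or unreachable scheduling center delays only the callbacks, not the job thread that produced them.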

The above is the overall flow of xxl-job job execution. Once the core components are connected end to end, the logic becomes clear. The key component is JobThread: each job has one JobThread instance on each executor, and when a job is delivered to the executor, the corresponding JobThread is looked up and handles it. JobThreads are lazily created and cached: a JobThread is created only when a delivered job cannot find one on the executor, and the next time the same job runs, the cached JobThread is reused directly.

The executor cannot find a JobThread in two cases:

  • The job is delivered to the executor for the first time;
  • The JobThread has been destroyed after an idle timeout. Each JobThread runs a thread that loops, polling its triggerQueue for jobs, and every job on an executor has its own JobThread. If a job ran once and never again, or runs only rarely, keeping its thread alive would waste resources, so JobThread has an idle-timeout self-destruct mechanism: if no job arrives within 30 * 3 = 90 seconds, the JobThread is considered idle and is destroyed. When the next instruction for that job arrives, a new JobThread is created.
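
The lazy-create-and-cache behaviour, together with the idle eviction, can be sketched with a ConcurrentHashMap. JobWorkerRegistrySketch and Worker are hypothetical names; the idle counter is driven by explicit calls here instead of a real 3-second poll, which is an assumption made so the example runs instantly.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class JobWorkerRegistrySketch {
    /** Hypothetical stand-in for a per-job worker thread. */
    static final class Worker {
        final int jobId;
        int idleTimes;

        Worker(int jobId) {
            this.jobId = jobId;
        }
    }

    private static final int IDLE_LIMIT = 30; // empty polls tolerated before the worker is removed

    private final ConcurrentMap<Integer, Worker> workers = new ConcurrentHashMap<>();

    /** Returns the cached worker for the job, creating it on first use. */
    Worker getOrCreate(int jobId) {
        return workers.computeIfAbsent(jobId, id -> new Worker(id));
    }

    /** Records one empty poll; removes the worker once it exceeds the idle limit. Returns true if removed. */
    boolean onIdlePoll(int jobId) {
        Worker worker = workers.get(jobId);
        if (worker == null) {
            return false;
        }
        worker.idleTimes++;
        if (worker.idleTimes > IDLE_LIMIT) {
            workers.remove(jobId);
            return true;
        }
        return false;
    }
}
```

After eviction, the next getOrCreate call for the same jobId builds a fresh worker, which corresponds to the second case in the list above.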

For more source code analysis articles, follow the WeChat public account Reactor2020.