V2.3.0 Release source code
1. Scheduling center xxl-job-admin
1 Scheduler instantiation
XxlJobScheduler is instantiated at startup by XxlJobAdminConfig, in its afterPropertiesSet() callback:
    @Override
    public void afterPropertiesSet() throws Exception {
        adminConfig = this;

        xxlJobScheduler = new XxlJobScheduler();
        xxlJobScheduler.init();
    }
XxlJobScheduler starts the following helpers:
- JobTriggerPoolHelper: trigger thread pool, the base pool that other helpers depend on
- JobRegistryHelper: registry thread pool; initializes the executor registry by pulling the xxl_job_group configuration
- JobFailMonitorHelper: failure monitor thread that works over the job logs
- JobCompleteHelper: task result processing thread pool; depends on JobTriggerPoolHelper
- JobLogReportHelper: log report thread
- JobScheduleHelper: task scheduling thread; depends on JobTriggerPoolHelper
Its init() method starts them in this order:
    public void init() throws Exception {
        // init i18n
        initI18n();

        // admin trigger pool start
        JobTriggerPoolHelper.toStart();

        // admin registry monitor run
        JobRegistryHelper.getInstance().start();

        // admin fail-monitor run
        JobFailMonitorHelper.getInstance().start();

        // admin lose-monitor run ( depend on JobTriggerPoolHelper )
        JobCompleteHelper.getInstance().start();

        // admin log report start
        JobLogReportHelper.getInstance().start();

        // start-schedule ( depend on JobTriggerPoolHelper )
        JobScheduleHelper.getInstance().start();

        logger.info(">>>>>>>>> init xxl-job admin success.");
    }
2 Task Starting
After a task has been created in the front end, clicking Start calls the following method:
    @Override
    public ReturnT<String> start(int id) {
        XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(id);

        // valid: a job without a schedule type cannot be started
        ScheduleTypeEnum scheduleTypeEnum = ScheduleTypeEnum.match(xxlJobInfo.getScheduleType(), ScheduleTypeEnum.NONE);
        if (ScheduleTypeEnum.NONE == scheduleTypeEnum) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, (I18nUtil.getString("schedule_type_none_limit_start")) );
        }

        // next trigger time, computed from now + PRE_READ_MS so that it falls outside the current pre-read window
        long nextTriggerTime = 0;
        try {
            Date nextValidTime = JobScheduleHelper.generateNextValidTime(xxlJobInfo, new Date(System.currentTimeMillis() + JobScheduleHelper.PRE_READ_MS));
            if (nextValidTime == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, (I18nUtil.getString("schedule_type")+I18nUtil.getString("system_unvalid")) );
            }
            nextTriggerTime = nextValidTime.getTime();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return new ReturnT<String>(ReturnT.FAIL_CODE, (I18nUtil.getString("schedule_type")+I18nUtil.getString("system_unvalid")) );
        }

        xxlJobInfo.setTriggerStatus(1);
        xxlJobInfo.setTriggerLastTime(0);
        xxlJobInfo.setTriggerNextTime(nextTriggerTime);

        xxlJobInfo.setUpdateTime(new Date());
        xxlJobInfoDao.update(xxlJobInfo);
        return ReturnT.SUCCESS;
    }
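JobScheduleHelper then schedules the started tasks. Under a database lock it pre-reads the jobs due within the next PRE_READ_MS and handles each one in one of three ways: misfire, direct trigger, or push to the time wheel: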
    // start a transaction with manual commit
    conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
    connAutoCommit = conn.getAutoCommit();
    conn.setAutoCommit(false);

    // take the scheduling lock so that only one admin node schedules at a time
    preparedStatement = conn.prepareStatement("select * from xxl_job_lock where lock_name = 'schedule_lock' for update");
    preparedStatement.execute();

    // tx start

    // 1, pre read
    long nowTime = System.currentTimeMillis();
    List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
    ...
    for (XxlJobInfo jobInfo: scheduleList) {

        // time-ring jump: has the next trigger time already passed?
        if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
            // 2.1, trigger-expire > 5s: pass && make next-trigger-time
            logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

            // 1, misfire match
            MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
            if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                // FIRE_ONCE_NOW: trigger once now to compensate for the missed run
                JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
            }

            // 2, fresh next
            refreshNextValidTime(jobInfo, new Date());

        } else if (nowTime > jobInfo.getTriggerNextTime()) {
            // 2.2, trigger-expire < 5s: direct-trigger && make next-trigger-time

            // 1, trigger
            JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
            logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );

            // 2, fresh next
            refreshNextValidTime(jobInfo, new Date());

            // trigger status: 0 = stopped, 1 = running
            // next-trigger-time in 5s, pre-read again
            if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
                // the next trigger time is within 5 seconds, so put the job into the time wheel
                // (Map<Integer, List<Integer>>: second of the minute (0-59) => list of job ids)

                // 1, make ring second
                int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                // 2, push time ring
                pushTimeRing(ringSecond, jobInfo.getId());

                // 3, fresh next
                refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
            }

        } else {
            // 2.3, trigger-pre-read: time-ring trigger && make next-trigger-time

            // 1, make ring second
            int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

            // 2, push time ring
            pushTimeRing(ringSecond, jobInfo.getId());

            // 3, fresh next
            refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
        }
    }
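The three branches of the scheduling loop can be isolated as a pure function. The following is a minimal sketch, not code from xxl-job: the class ScheduleDecision, its Action enum and the decide method are hypothetical names, and PRE_READ_MS is assumed to be 5000 ms, matching the "5s" in the comments above.

```java
// Hypothetical helper that mirrors the three branches of the scheduling loop.
final class ScheduleDecision {

    enum Action { MISFIRE, TRIGGER_NOW, PUSH_TO_RING }

    // Assumed pre-read window of 5 seconds.
    static final long PRE_READ_MS = 5000;

    // Decide what to do with a job given the current time and its next trigger time.
    static Action decide(long nowTime, long triggerNextTime) {
        if (nowTime > triggerNextTime + PRE_READ_MS) {
            return Action.MISFIRE;        // overdue by more than the pre-read window
        } else if (nowTime > triggerNextTime) {
            return Action.TRIGGER_NOW;    // overdue by less than the window: trigger directly
        } else {
            return Action.PUSH_TO_RING;   // due within the window: hand over to the time wheel
        }
    }

    // Slot of the time wheel for a trigger time: the second of the minute, 0 to 59.
    static int ringSecond(long triggerNextTime) {
        return (int) ((triggerNextTime / 1000) % 60);
    }
}
```

For example, a job whose trigger time is 10 seconds in the past is a misfire, one that is 2 seconds late is triggered at once, and one that is due in 2 seconds goes into the wheel slot given by ringSecond.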
Time wheel: Ring thread processing logic
    // Take the job ids for the current second and the second before it out of the time wheel
    // (checking one extra slot backwards guards against a slow iteration skipping a slot),
    // then trigger the jobs one by one.

    // second data
    List<Integer> ringItemData = new ArrayList<>();
    int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
    for (int i = 0; i < 2; i++) {
        List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
        if (tmpData != null) {
            ringItemData.addAll(tmpData);
        }
    }

    // ring trigger
    logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
    if (ringItemData.size() > 0) {
        // do trigger
        for (int jobId: ringItemData) {
            // do trigger
            JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
        }
        // clear
        ringItemData.clear();
    }
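The time wheel itself is just a map from second-of-minute to job ids. Below is a minimal single-threaded sketch of that data structure, assuming nothing beyond the JDK; the class TimeRing and its push and take methods are hypothetical names, and the real ring thread additionally aligns itself to second boundaries and triggers the jobs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, single-threaded model of the time wheel: second (0-59) => job ids.
final class TimeRing {

    private final Map<Integer, List<Integer>> ringData = new HashMap<>();

    // Register a job id in the slot of the second at which it should fire.
    void push(int ringSecond, int jobId) {
        ringData.computeIfAbsent(ringSecond, k -> new ArrayList<>()).add(jobId);
    }

    // Remove and return the jobs of the current slot and of the slot before it.
    List<Integer> take(int nowSecond) {
        List<Integer> due = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            // +60 keeps the index non-negative when nowSecond is 0
            List<Integer> slot = ringData.remove((nowSecond + 60 - i) % 60);
            if (slot != null) {
                due.addAll(slot);
            }
        }
        return due;
    }
}
```

Because take removes the slots it reads, a job id is returned at most once even though each slot is inspected on two consecutive beats.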
3 Task execution: sharding & routing strategies
Sharding execution:
- Fetch the list of executor machines for the task, assign an index/total pair to each machine, and send that pair to the executor along with the trigger
- The executor can then implement sharded processing based on the index/total parameters
    // param
    ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
    ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
    String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;

    // 1, save log-id
    XxlJobLog jobLog = new XxlJobLog();
    jobLog.setJobGroup(jobInfo.getJobGroup());
    jobLog.setJobId(jobInfo.getId());
    jobLog.setTriggerTime(new Date());
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
    logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

    // 2, init trigger-param
    TriggerParam triggerParam = new TriggerParam();
    triggerParam.setJobId(jobInfo.getId());
    triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
    triggerParam.setExecutorParams(jobInfo.getExecutorParam());
    triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
    triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
    triggerParam.setLogId(jobLog.getId());
    triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
    triggerParam.setGlueType(jobInfo.getGlueType());
    triggerParam.setGlueSource(jobInfo.getGlueSource());
    triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
    triggerParam.setBroadcastIndex(index);
    triggerParam.setBroadcastTotal(total);

    // 3, init address
    String address = null;
    ReturnT<String> routeAddressResult = null;
    if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
            // sharding broadcast: shard index selects the executor address
            if (index < group.getRegistryList().size()) {
                address = group.getRegistryList().get(index);
            } else {
                address = group.getRegistryList().get(0);
            }
        } else {
            // executor routing strategy
            routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
            if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                address = routeAddressResult.getContent();
            }
        }
    } else {
        routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
    }

    // 4, trigger remote executor
    ReturnT<String> triggerResult = null;
    if (address != null) {
        triggerResult = runExecutor(triggerParam, address);
    } else {
        triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
    }

    // 5, collection trigger info
    StringBuffer triggerMsgSb = new StringBuffer();
    triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
    triggerMsgSb.append(" ").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
    triggerMsgSb.append(" ").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
            .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
    triggerMsgSb.append(" ").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
    triggerMsgSb.append(" ").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
    if (shardingParam != null) {
        triggerMsgSb.append("("+shardingParam+")");
    }
    triggerMsgSb.append(" ").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
    triggerMsgSb.append(" ").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
    triggerMsgSb.append(" ").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);

    triggerMsgSb.append(" >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< ")
            .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+" ":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");

    // 6, save log trigger-info
    jobLog.setExecutorAddress(address);
    jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
    jobLog.setExecutorParam(jobInfo.getExecutorParam());
    jobLog.setExecutorShardingParam(shardingParam);
    jobLog.setExecutorFailRetryCount(finalFailRetryCount);
    //jobLog.setTriggerTime();
    jobLog.setTriggerCode(triggerResult.getCode());
    jobLog.setTriggerMsg(triggerMsgSb.toString());
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);

    logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
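On the executor side, a sharded job typically uses the index/total pair to pick its own slice of the work. The sketch below shows one common way to do that (modulo partitioning); it is an illustration only, the class ShardingDemo and the method itemsForShard are hypothetical, and how a real handler obtains index and total from xxl-job is not shown here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical executor-side helper: select the items that belong to one shard.
final class ShardingDemo {

    // index is this executor's shard number (0-based), total is the number of shards.
    static List<Integer> itemsForShard(List<Integer> allIds, int index, int total) {
        List<Integer> mine = new ArrayList<>();
        for (int id : allIds) {
            // floorMod keeps the result in [0, total) even for negative ids
            if (Math.floorMod(id, total) == index) {
                mine.add(id);
            }
        }
        return mine;
    }
}
```

With three executors and ids 0 to 9, shard 0 processes 0, 3, 6, 9, shard 1 processes 1, 4, 7, and shard 2 processes 2, 5, 8, so every id is handled exactly once.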
Routing strategies:
- In a cluster deployment, several routing strategies are available: first, last, round robin, random, consistent hash, least frequently used (LFU), least recently used (LRU), failover, and busy transfer (busyover).
- First, last, round robin, random: simply pick an address from address_list
- Consistent hash: a consistent hashing algorithm implemented on a TreeMap
- LFU, LRU: implemented with a HashMap and a LinkedHashMap respectively
- Failover: while traversing address_list, check the heartbeat of each address (by the status of the request's response). An address is returned only if its heartbeat is normal
- Busy transfer: while traversing address_list, check one by one whether each address is busy (by the status of the request's response). Only an idle address is returned
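To make the consistent-hash strategy concrete, here is a minimal sketch of a hash ring built on a TreeMap. It is not the xxl-job implementation: the class ConsistentHashRouter, the FNV-1a style hash function and the choice of 100 virtual nodes per address are assumptions made for the example.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical consistent-hash router: the same job id maps to the same address
// as long as that address stays in the list.
final class ConsistentHashRouter {

    // Assumed number of virtual nodes per address, to spread addresses over the ring.
    private static final int VIRTUAL_NODES = 100;

    // Simple FNV-1a style 64-bit hash over the characters of the key.
    static long hash(String key) {
        long h = 0xcbf29ce484222325L;
        for (int i = 0; i < key.length(); i++) {
            h ^= key.charAt(i);
            h *= 0x100000001b3L;
        }
        return h;
    }

    static String route(String jobId, List<String> addresses) {
        if (addresses.isEmpty()) {
            throw new IllegalArgumentException("address list is empty");
        }
        // Build the ring: hash of each virtual node => physical address.
        TreeMap<Long, String> ring = new TreeMap<>();
        for (String address : addresses) {
            for (int i = 0; i < VIRTUAL_NODES; i++) {
                ring.put(hash(address + "#" + i), address);
            }
        }
        // First node at or after the job's hash; wrap around to the first node if none.
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(jobId));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }
}
```

The property that matters is stability: removing an address from the list only moves the jobs that were mapped to that address, while all other jobs keep their executor.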
4 Task Completion
JobApiController exposes the API that TriggerCallbackThread.doCallback in xxl-job-core calls to report that a task has completed.
The callback is handled in JobCompleteHelper.
Normal completion:
    private ReturnT<String> callback(HandleCallbackParam handleCallbackParam) {
        // valid log item
        XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(handleCallbackParam.getLogId());
        if (log == null) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "log item not found.");
        }
        if (log.getHandleCode() > 0) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "log repeate callback.");     // avoid repeat callback, trigger child job etc
        }

        // handle msg
        StringBuffer handleMsg = new StringBuffer();
        if (log.getHandleMsg()!=null) {
            handleMsg.append(log.getHandleMsg()).append("<br>");
        }
        if (handleCallbackParam.getHandleMsg() != null) {
            handleMsg.append(handleCallbackParam.getHandleMsg());
        }

        // success, save log
        log.setHandleTime(new Date());
        log.setHandleCode(handleCallbackParam.getHandleCode());
        log.setHandleMsg(handleMsg.toString());
        XxlJobCompleter.updateHandleInfoAndFinish(log);

        return ReturnT.SUCCESS;
    }
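The key point of the callback is that it is idempotent: once a log record carries a handle code, further callbacks for it are rejected, so child jobs are not triggered twice. The following in-memory sketch isolates that check; the class CallbackDemo, its map-based storage and the constants SUCCESS and FAIL are assumptions for illustration and stand in for the DAO and ReturnT used above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the callback guard: log id => handle code (0 = not handled yet).
final class CallbackDemo {

    // Illustrative result codes for this sketch.
    static final int SUCCESS = 200;
    static final int FAIL = 500;

    private final Map<Long, Integer> handleCodeByLogId = new HashMap<>();

    // Create a log record that is still waiting for its result.
    void addLog(long logId) {
        handleCodeByLogId.put(logId, 0);
    }

    // Accept the first callback for a log record and reject everything else.
    int callback(long logId, int handleCode) {
        Integer current = handleCodeByLogId.get(logId);
        if (current == null) {
            return FAIL;            // log item not found
        }
        if (current > 0) {
            return FAIL;            // repeated callback, result already stored
        }
        handleCodeByLogId.put(logId, handleCode);
        return SUCCESS;
    }
}
```

Calling callback twice for the same log id therefore succeeds the first time and fails the second time, which is the behaviour the handle-code check in the method above provides.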
5 Handle task result loss
The monitorThread handles lost task results: if a scheduling record stays in the running state for more than 10 minutes and the corresponding executor has no live heartbeat registration (it is offline), the admin actively marks that scheduling record as failed.
    // Lost task results: records still in the running state after 10 minutes whose executor
    // is no longer registered (offline) are actively marked as failed.
    Date losedTime = DateUtil.addMinutes(new Date(), -10);
    List<Long> losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);

    if (losedJobIds!=null && losedJobIds.size()>0) {
        for (Long logId: losedJobIds) {

            XxlJobLog jobLog = new XxlJobLog();
            jobLog.setId(logId);

            jobLog.setHandleTime(new Date());
            jobLog.setHandleCode(ReturnT.FAIL_CODE);
            jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );

            XxlJobCompleter.updateHandleInfoAndFinish(jobLog);
        }
    }
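The selection rule behind findLostJobIds can be illustrated with an in-memory filter. This is a sketch of the rule as described in the text, not the actual SQL: the class LostJobScan, the LogRecord fields and the set of online addresses are hypothetical stand-ins for the log and registry tables.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory version of the "lost result" scan.
final class LostJobScan {

    static final class LogRecord {
        final long id;
        final long triggerTimeMs;       // when the job was triggered
        final int handleCode;           // 0 = still running, no result reported yet
        final String executorAddress;

        LogRecord(long id, long triggerTimeMs, int handleCode, String executorAddress) {
            this.id = id;
            this.triggerTimeMs = triggerTimeMs;
            this.handleCode = handleCode;
            this.executorAddress = executorAddress;
        }
    }

    // A record is lost if it has been running for 10+ minutes and its executor is offline.
    static List<Long> findLost(List<LogRecord> logs, Set<String> onlineAddresses, long nowMs) {
        long cutoff = nowMs - 10 * 60 * 1000L;
        List<Long> lost = new ArrayList<>();
        for (LogRecord log : logs) {
            boolean stillRunning = log.handleCode == 0;
            boolean tooOld = log.triggerTimeMs <= cutoff;
            boolean executorOffline = !onlineAddresses.contains(log.executorAddress);
            if (stillRunning && tooOld && executorOffline) {
                lost.add(log.id);
            }
        }
        return lost;
    }
}
```

Note that a long-running job on an executor that is still online is not reported as lost, so slow jobs are not failed just because they exceed 10 minutes.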
2. Executor xxl-job-core
1 Instantiate XxlJob
XxlJobSpringExecutor scans for methods annotated with @XxlJob, wraps each one in a handler (proxy) and registers it, then initializes XxlJobExecutor.
    for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
        Method executeMethod = methodXxlJobEntry.getKey();
        XxlJob xxlJob = methodXxlJobEntry.getValue();
        if (xxlJob == null) {
            continue;
        }

        String name = xxlJob.value();
        ...
        // optional init method declared on the annotation
        if (xxlJob.init().trim().length() > 0) {
            try {
                initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
                initMethod.setAccessible(true);
            } catch (NoSuchMethodException e) {
                throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
            }
        }
        // optional destroy method declared on the annotation
        if (xxlJob.destroy().trim().length() > 0) {
            try {
                destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
                destroyMethod.setAccessible(true);
            } catch (NoSuchMethodException e) {
                throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
            }
        }

        // registry jobhandler
        registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
    }
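The registration step boils down to reflection over annotated methods. The sketch below reproduces the idea without Spring or xxl-job on the classpath; the annotation @DemoJob, the classes HandlerRegistry and SampleJobs, and the use of Runnable as the handler type are all hypothetical simplifications of @XxlJob, the handler repository and MethodJobHandler.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for @XxlJob: marks a method as a job handler with a name.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoJob {
    String value();
}

// Hypothetical handler registry: handler name => invocable wrapper around the method.
final class HandlerRegistry {

    private final Map<String, Runnable> handlers = new ConcurrentHashMap<>();

    // Find every @DemoJob method on the bean and register a wrapper for it.
    void register(Object bean) {
        for (Method method : bean.getClass().getDeclaredMethods()) {
            DemoJob job = method.getAnnotation(DemoJob.class);
            if (job == null) {
                continue;
            }
            String name = job.value().trim();
            if (name.isEmpty()) {
                throw new IllegalStateException("handler name is empty for " + method.getName());
            }
            if (handlers.containsKey(name)) {
                throw new IllegalStateException("handler name conflict: " + name);
            }
            method.setAccessible(true);
            handlers.put(name, () -> {
                try {
                    method.invoke(bean);
                } catch (ReflectiveOperationException e) {
                    throw new RuntimeException(e);
                }
            });
        }
    }

    // Look up a handler by name; returns null if none is registered.
    Runnable load(String name) {
        return handlers.get(name);
    }
}

// Example bean with one annotated job method.
final class SampleJobs {
    int runs = 0;

    @DemoJob("demoJobHandler")
    void demo() {
        runs++;
    }
}
```

After register(new SampleJobs()), looking up "demoJobHandler" and running it invokes the annotated method on that bean, which is how a trigger that carries only a handler name can reach the right code.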
2 Initialize XxlJobExecutor
    public void start() throws Exception {
        // init logpath
        XxlJobFileAppender.initLogPath(logPath);

        // init invoker, admin-client
        initAdminBizList(adminAddresses, accessToken);

        // init JobLogFileCleanThread (log cleanup thread)
        JobLogFileCleanThread.getInstance().start(logRetentionDays);

        // init TriggerCallbackThread (callback thread for execution results)
        TriggerCallbackThread.getInstance().start();

        // init executor-server (RPC gateway)
        initEmbedServer(address, ip, port, appname, accessToken);
    }
JobLogFileCleanThread: log cleanup thread
3 Call back the execution result
TriggerCallbackThread: callback thread that reports execution results back to the admin. It is started from the same start() sequence:
    // init logpath
    XxlJobFileAppender.initLogPath(logPath);

    // init invoker, admin-client
    initAdminBizList(adminAddresses, accessToken);

    // init JobLogFileCleanThread
    JobLogFileCleanThread.getInstance().start(logRetentionDays);

    // init TriggerCallbackThread
    TriggerCallbackThread.getInstance().start();

    // init executor-server (initializes xxl-rpc)
    initEmbedServer(address, ip, port, appname, accessToken);
4 Instantiate xxl-rpc and ExecutorRegistryThread
    try {
        // start server (Netty NIO)
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel channel) throws Exception {
                        channel.pipeline()
                                .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                .addLast(new HttpServerCodec())
                                .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & response to FULL
                                .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                    }
                })
                .childOption(ChannelOption.SO_KEEPALIVE, true);

        // bind
        ChannelFuture future = bootstrap.bind(port).sync();

        logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

        // start registry
        startRegistry(appname, address);

        // wait until stop
        future.channel().closeFuture().sync();
    }
    ...
    ExecutorRegistryThread.getInstance().start(appname, address);