Introduction
When we think of scheduled tasks, we think of the JDK's [Timer][] and the scheduling framework [Quartz][], which is often used together with the Spring family of frameworks. Quartz stores tasks in one of two ways: in memory, where scheduled tasks are lost when the application restarts, or in a database, which solves the task-loss problem caused by restarts. Whether Quartz is used alone or integrated with Spring, however, scheduled tasks cannot easily be modified once the application is running, and Quartz has limitations for distributed scheduling. Taobao's distributed scheduling framework [TBschedule][] solved the distributed scheduling problem, but it suffers from a common issue with Alibaba open source: after release there was little maintenance, few updates, and rough documentation. Today we will look at a lightweight distributed task scheduling framework, XXL-JOB.
directory
- usage
- Xxl-job architecture design
- Task Scheduling (Management Console)
- Executor
- conclusion
usage
- Executor registration: if a domain name is configured for the executor, that address is registered directly; otherwise the executor registers its host IP address and PORT. Executors support cluster deployment as long as the instances share the same name (app_name).
- Add a task (each task belongs to an executor); a minimal handler sketch follows this list.
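For a concrete sense of what "adding a task" looks like on the executor side, here is a minimal BEAN-mode handler sketch. The class name is made up for illustration; the handler name matches the demoJobHandler referenced in the sample SQL later in this article, the signature follows the method format described below, and the package paths are assumed from the xxl-job core module.

```java
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.stereotype.Component;

// Minimal sketch of a BEAN-mode job handler registered on an executor.
@Component
public class SampleJobHandler {

    // The value must match the "JobHandler" configured for the task in the console,
    // e.g. the demoJobHandler used in the sample xxl_job_info row below.
    @XxlJob("demoJobHandler")
    public ReturnT<String> demoJobHandler(String param) throws Exception {
        // business logic goes here; param carries the task's executor parameters
        return ReturnT.SUCCESS;
    }
}
```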
Xxl-job architecture design
The job management center consists of the following components:
- JobRegistryMonitorHelper: registry monitor; writes the live executor addresses of each application into the corresponding task group
- JobFailMonitorHelper: failure monitor; retries failed tasks and sends an alarm if configured
- JobLosedMonitorHelper: lost-task monitor
The executor client consists of the following components:
- Log cleaning
- Task callback notification
- Embedded HTTP server: provides heartbeat, idle heartbeat, run job, killJob, log, and other REST operations.
All communication is HTTP.
Take a look at the console model
Console model
#
# XXL-JOB v2.2.0
# Copyright (c) 2015-present, xuxueli.
CREATE database if NOT EXISTS `xxl_job` default character set utf8mb4 collate utf8mb4_unicode_ci;
use `xxl_job`;
SET NAMES utf8mb4;
-- job
CREATE TABLE `xxl_job_info` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`job_group` int(11) NOT NULL COMMENT 'Primary key ID of actuator',
`job_cron` varchar(128) NOT NULL COMMENT 'Task execution CRON',
`job_desc` varchar(255) NOT NULL,
`add_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
`author` varchar(64) DEFAULT NULL COMMENT 'the writer',
`alarm_email` varchar(255) DEFAULT NULL COMMENT 'Alarm email',
`executor_route_strategy` varchar(50) DEFAULT NULL COMMENT 'Executive Routing Policy',
`executor_handler` varchar(255) DEFAULT NULL COMMENT 'Executor task Handler',
`executor_param` varchar(512) DEFAULT NULL COMMENT 'Executor Task Parameters',
`executor_block_strategy` varchar(50) DEFAULT NULL COMMENT 'Blocking Handling Strategy',
`executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT 'Task execution timeout in seconds',
`executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT 'Failed retry times',
`glue_type` varchar(50) NOT NULL COMMENT 'GLUE type',
`glue_source` mediumtext COMMENT 'GLUE source code ',
`glue_remark` varchar(128) DEFAULT NULL COMMENT 'GLUE notes',
`glue_updatetime` datetime DEFAULT NULL COMMENT 'GLUE update time ',
`child_jobid` varchar(255) DEFAULT NULL COMMENT 'Subtask ID, separated by multiple commas',
`trigger_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT 'Scheduling status: 0- Stopped, 1- Running',
`trigger_last_time` bigint(13) NOT NULL DEFAULT '0' COMMENT 'Last dispatch time',
`trigger_next_time` bigint(13) NOT NULL DEFAULT '0' COMMENT 'Next dispatch time',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- job Execution logs
CREATE TABLE `xxl_job_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`job_group` int(11) NOT NULL COMMENT 'Primary key ID of actuator',
`job_id` int(11) NOT NULL COMMENT 'Task, primary key ID',
`executor_address` varchar(255) DEFAULT NULL COMMENT 'Executor address, address of this execution',
`executor_handler` varchar(255) DEFAULT NULL COMMENT 'Executor task Handler',
`executor_param` varchar(512) DEFAULT NULL COMMENT 'Executor Task Parameters',
`executor_sharding_param` varchar(20) DEFAULT NULL COMMENT 'Executor task fragment parameter of the form 1/2',
`executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT 'Failed retry times',
`trigger_time` datetime DEFAULT NULL COMMENT 'Schedule - Time',
`trigger_code` int(11) NOT NULL COMMENT 'Schedule - Result',
`trigger_msg` text COMMENT 'Schedule - Log',
`handle_time` datetime DEFAULT NULL COMMENT 'Execution-time',
`handle_code` int(11) NOT NULL COMMENT 'Execution-status',
`handle_msg` text COMMENT 'Execution-Log',
`alarm_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT 'Alarm status: 0 - default, -1 - locked, 1 - no alarm needed, 2 - alarm succeeded, 3 - alarm failed',
PRIMARY KEY (`id`),
KEY `I_trigger_time` (`trigger_time`),
KEY `I_handle_code` (`handle_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Job Execution report
CREATE TABLE `xxl_job_log_report` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`trigger_day` datetime DEFAULT NULL COMMENT 'Schedule - Time',
`running_count` int(11) NOT NULL DEFAULT '0' COMMENT 'Running - Log Quantity',
`suc_count` int(11) NOT NULL DEFAULT '0' COMMENT 'Execution succeeded - Number of logs',
`fail_count` int(11) NOT NULL DEFAULT '0' COMMENT 'Failed execution - Number of logs',
PRIMARY KEY (`id`),
UNIQUE KEY `i_trigger_day` (`trigger_day`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `xxl_job_logglue` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`job_id` int(11) NOT NULL COMMENT 'Task, primary key ID',
`glue_type` varchar(50) DEFAULT NULL COMMENT 'GLUE type',
`glue_source` mediumtext COMMENT 'GLUE source code ',
`glue_remark` varchar(128) NOT NULL COMMENT 'GLUE notes',
`add_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Job executor registry, when the client starts, registered to the corresponding executor group, group, application name, execution server list (IP :port, domain name)
CREATE TABLE `xxl_job_registry` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`registry_group` varchar(50) NOT NULL,
`registry_key` varchar(255) NOT NULL,
`registry_value` varchar(255) NOT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `i_g_k_v` (`registry_group`,`registry_key`,`registry_value`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Job executor
CREATE TABLE `xxl_job_group` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`app_name` varchar(64) NOT NULL COMMENT 'Executor AppName',
`title` varchar(12) NOT NULL COMMENT 'Executor name',
`address_type` tinyint(4) NOT NULL DEFAULT '0' COMMENT 'Executor address type: 0 = automatic registration, 1 = manual entry',
`address_list` varchar(512) DEFAULT NULL COMMENT 'List of executor addresses, separated by commas',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- user table
CREATE TABLE `xxl_job_user` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`username` varchar(50) NOT NULL COMMENT 'account',
`password` varchar(50) NOT NULL COMMENT 'password',
`role` tinyint(4) NOT NULL COMMENT 'Roles: 0- Common user, 1- Administrator',
`permission` varchar(255) DEFAULT NULL COMMENT 'Permissions: list of executor ids, separated by commas',
PRIMARY KEY (`id`),
UNIQUE KEY `i_username` (`username`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- job lock table
CREATE TABLE `xxl_job_lock` (
`lock_name` varchar(50) NOT NULL COMMENT 'Lock name',
PRIMARY KEY (`lock_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
INSERT INTO `xxl_job_group`(`id`, `app_name`, `title`, `address_type`, `address_list`) VALUES (1, 'xxl-job-executor-sample', 'Sample executor', 0, NULL);
INSERT INTO `xxl_job_info`(`id`, `job_group`, `job_cron`, `job_desc`, `add_time`, `update_time`, `author`, `alarm_email`, `executor_route_strategy`, `executor_handler`, `executor_param`, `executor_block_strategy`, `executor_timeout`, `executor_fail_retry_count`, `glue_type`, `glue_source`, `glue_remark`, `glue_updatetime`, `child_jobid`) VALUES (1, 1, '0 0 0 * * ? *', 'Test Task 1', '2018-11-03 22:21:31', '2018-11-03 22:21:31', 'XXL', '', 'FIRST', 'demoJobHandler', '', 'SERIAL_EXECUTION', 0, 0, 'BEAN', '', 'GLUE code initialization', '2018-11-03 22:21:31', '');
INSERT INTO `xxl_job_user`(`id`, `username`, `password`, `role`, `permission`) VALUES (1, 'admin', 'e10adc3949ba59abbe56e057f20f883e', 1, NULL);
INSERT INTO `xxl_job_lock` (`lock_name`) VALUES ('schedule_lock');
commit;
The main models include job grouping, job registry, job, job log, job execution report, and job scheduling lock models.
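As a rough illustration of how the main table maps onto the scheduler's model, here is a trimmed sketch of the job entity implied by xxl_job_info (field names follow the columns; this is not the full class from the source):

```java
// Trimmed sketch of the job model implied by the xxl_job_info table.
public class XxlJobInfo {
    private int id;
    private int jobGroup;                  // executor (xxl_job_group) primary key
    private String jobCron;                // CRON expression
    private String jobDesc;
    private String executorRouteStrategy;  // e.g. FIRST, SHARDING_BROADCAST
    private String executorHandler;        // handler name registered on the executor
    private String executorParam;
    private String executorBlockStrategy;  // e.g. SERIAL_EXECUTION, DISCARD_LATER, COVER_EARLY
    private int executorTimeout;           // seconds
    private int executorFailRetryCount;
    private String glueType;               // BEAN, GLUE_GROOVY, script types, ...
    private int triggerStatus;             // 0 = stopped, 1 = running
    private long triggerLastTime;          // last trigger time (ms)
    private long triggerNextTime;          // next trigger time (ms), used heavily by the scheduler below
    // getters and setters omitted
}
```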
Let’s look at the scheduling of specific tasks.
Task Scheduling (Management Console)
Let's start with the task scheduler, JobScheduleHelper.
//JobScheduleHelper
public void start() {
// schedule thread
scheduleThread = new Thread(new Runnable() {
...
});
scheduleThread.setDaemon(true);
scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
scheduleThread.start();
// Ring thread polls the tasks that need to be triggered within 5 seconds
ringThread = new Thread(new Runnable() {
});
ringThread.setDaemon(true);
ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
ringThread.start();
}
The job scheduler runs two threads. The scheduleThread selects from the database the jobs that need to be triggered within the next five seconds; jobs that are already due are handed to the trigger pool, which notifies the job executor to run them, while jobs not yet due are parked in the ringData map. The ringThread then checks the ringData map every second and notifies the job executor to run whatever tasks are due in that second.
/** Tasks waiting in the current time ring. Key is the second (0-59), value is the List<jobId> due in that second. */
private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();
//JobScheduleHelper
public void start() {
// schedule thread
scheduleThread = new Thread(new Runnable() {
@Override
public void run() {
try {
// Schedule the thread to sleep 5s
TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
} catch (InterruptedException e) {
...
}
// Number of prefetch thread pools
// pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
while (!scheduleThreadToStop) {
    // Scan jobs
long start = System.currentTimeMillis();
Connection conn = null;
Boolean connAutoCommit = null;
PreparedStatement preparedStatement = null;
boolean preReadSuc = true;
try {
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
// Get the scheduling lock
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
// tx start
// 1, pre read
long nowTime = System.currentTimeMillis();
// Query tasks whose next trigger time falls within the next 5 seconds (pre-read)
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
if (scheduleList != null && scheduleList.size() > 0) {
// 2
for (XxlJobInfo jobInfo: scheduleList) {
// time-ring jump
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
// 2.1, trigger-expire > 5s: pass && make next-trigger-time
logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
// Fresh Next, job missed triggering event, refresh the execution time of job triggering next time
refreshNextValidTime(jobInfo, new Date());
} else if (nowTime > jobInfo.getTriggerNextTime()) {
// 2.2, trigger-expire < 5s: direct-trigger && make next-trigger-time
// 1
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
// 2. Fresh Next Refreshes the execution time when the job is triggered next time
refreshNextValidTime(jobInfo, new Date());
// next-trigger-time in 5s, pre-read again, the next trigger event needs to be triggered within 5s
if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
// 1, make ring second, get the number of seconds to wait for the next trigger
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000) %60);
// 2
pushTimeRing(ringSecond, jobInfo.getId());
// 3
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
} else {
// 2.3, trigger-pre-read: time-ring trigger && make next-trigger-time
// make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000) %60);
// 2, push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
}
// 3. Update trigger info: persist the refreshed trigger times
for (XxlJobInfo jobInfo : scheduleList) {
    XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
}
} else {
preReadSuc = false;
}
// tx stop
} catch (Exception e) {
...
} finally {
// commit
if (conn != null) {
try {
conn.commit();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
conn.setAutoCommit(connAutoCommit);
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
conn.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); }
    }
}
// close PreparedStatement
if (null != preparedStatement) {
    try {
        preparedStatement.close();
    } catch (SQLException e) {
        if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); }
    }
}
}
long cost = System.currentTimeMillis() - start;
// Wait seconds, align second
if (cost < 1000) { // scan-overtime, not wait
try {
// pre-read period: success > scan each second; fail > skip this period;
TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
} catch (InterruptedException e) {
if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); }
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
}
});
scheduleThread.setDaemon(true);
scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
scheduleThread.start();
...
As can be seen from the above, the scheduleThread first obtains the scheduling lock and selects the jobs that need to be scheduled in the next five seconds from the database. If the jobs are triggered, the job executor is notified to execute the task.
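Distilled from the loop above: what serializes multiple admin nodes is simply holding a row lock on xxl_job_lock for the whole pre-read transaction. A minimal sketch of that pattern in plain JDBC (simplified error handling, not the framework's actual code):

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import javax.sql.DataSource;

// Sketch of the "schedule_lock" pattern: the admin node that acquires the row lock
// does the pre-read and triggering; other nodes block on the SELECT ... FOR UPDATE.
public class ScheduleLockSketch {
    public static void scheduleOnce(DataSource dataSource) throws Exception {
        try (Connection conn = dataSource.getConnection()) {
            conn.setAutoCommit(false);                 // open an explicit transaction
            try (PreparedStatement ps = conn.prepareStatement(
                    "select * from xxl_job_lock where lock_name = 'schedule_lock' for update")) {
                ps.execute();                          // row lock held until commit
                // ... pre-read jobs due within 5s, trigger or push to the time ring,
                // and update trigger_next_time (see the full loop above) ...
            } finally {
                conn.commit();                         // releases the row lock
            }
        }
    }
}
```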
Now let's look at how tasks whose trigger time has not yet arrived are handled.
/**
 * Push a task into the time ring, to be polled in a later tick.
 * @param ringSecond
 * @param jobId
 */
private void pushTimeRing(int ringSecond, int jobId){
// push async ring
List<Integer> ringItemData = ringData.get(ringSecond);
if (ringItemData == null) {
ringItemData = new ArrayList<Integer>();
ringData.put(ringSecond, ringItemData);
}
ringItemData.add(jobId);
logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + "=" + Arrays.asList(ringItemData) );
}
pushTimeRing simply puts the job ID into ringData, the set of tasks waiting to be scheduled; the ring thread then schedules them.
Let’s look at the ring thread
//JobScheduleHelper
public void start() {
    ...
    // Ring thread: polls the tasks that need to be triggered within the 5-second pre-read window
ringThread = new Thread(new Runnable() {
@Override
public void run() {
// align second
try {
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );
} catch (InterruptedException e) {
if (!ringThreadToStop) {
logger.error(e.getMessage(), e);
}
}
while (!ringThreadToStop) {
    try {
// second data
List<Integer> ringItemData = new ArrayList<>();
int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
// Also check the previous slot, in case a slow iteration skipped over a tick
for (int i = 0; i < 2; i++) {
// Retrieve the tasks to be scheduled at the current time
List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
if (tmpData != null) { ringItemData.addAll(tmpData); }
}
// ring trigger
logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + "=" + Arrays.asList(ringItemData) );
if (ringItemData.size() > 0) {
// do trigger
for (int jobId: ringItemData) {
// do trigger the task
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
}
// clear
ringItemData.clear();
}
} catch (Exception e) {
if (!ringThreadToStop) { logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e); }
}
// next second, align second
try {
// Sleep for 1 second
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);
} catch (InterruptedException e) {
if (!ringThreadToStop) { logger.error(e.getMessage(), e); }
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
}
});
ringThread.setDaemon(true);
ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
ringThread.start();
}
As can be seen from the above, the ring thread checks the ringData map every second for tasks due in the current (and previous) second and notifies the job executor to run them.
Let’s look at the key steps, notifying the Job executor to execute the task.
// do trigger the task
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
//JobTriggerPoolHelper
/**
 * Trigger a job.
 * @param jobId
 * @param triggerType
 * @param failRetryCount
 *          >=0: use this param
 *          <0: use param from job info config
 * @param executorShardingParam sharding parameter of the form "index/total"
 * @param executorParam
 *          null: use job param
 *          not null: cover job param
 */
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
}
/** * add trigger */
public void addTrigger(final int jobId,
final TriggerTypeEnum triggerType,
final int failRetryCount,
final String executorShardingParam,
final String executorParam,
final String addressList) {
// choose thread pool
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
if (jobTimeoutCount != null && jobTimeoutCount.get() > 10) { // job timed out 10+ times in the last minute
triggerPool_ = slowTriggerPool;
}
// trigger
triggerPool_.execute(new Runnable() {
@Override
public void run() {
long start = System.currentTimeMillis();
try {
// do trigger: call the remote executor to run the job
XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
}
}
});
}
//XxlJobTrigger
/**
 * Trigger job.
 * @param jobId
 * @param triggerType
 * @param failRetryCount
 *          >=0: use this param
 *          <0: use param from job info config
 * @param executorShardingParam
 * @param executorParam
 *          null: use job param
 *          not null: cover job param
 * @param addressList
 *          null: use executor addressList
 *          not null: cover
 */
public static void trigger(int jobId,
TriggerTypeEnum triggerType,
int failRetryCount,
String executorShardingParam,
String executorParam,
String addressList) {
// Load data Loads task data
XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
if (jobInfo == null) {
logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid, jobId = {}", jobId);
return;
}
if (executorParam != null) {
    jobInfo.setExecutorParam(executorParam);
}
...
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
        && group.getRegistryList() != null && !group.getRegistryList().isEmpty()
        && shardingParam == null) {
    // Sharding broadcast mode with no sharding parameter: trigger once per registered executor
    for (int i = 0; i < group.getRegistryList().size(); i++) {
        processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
    }
} else {
    if (shardingParam == null) {
        shardingParam = new int[]{0, 1};
    }
    processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
}
}

/**
 * Process the task trigger.
 * @param group job group; registry list may be empty
 * @param jobInfo
 * @param finalFailRetryCount
 * @param triggerType
 * @param index sharding index
 * @param total sharding total
 */
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total) {
    ...
    // 4, trigger remote executor
    ReturnT<String> triggerResult = null;
    if (address != null) {
        triggerResult = runExecutor(triggerParam, address);
    } else {
        triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
    }
    ...
}

/**
 * run executor
 * @param triggerParam
 * @param address
 * @return
 */
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
ReturnT<String> runResult = null;
try {
// Get the executor client
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
// Call the executor
runResult = executorBiz.run(triggerParam);
}
...
}
//XxlJobScheduler
/**
 * Get the executor client for the given address: load it from the cache if present, otherwise create it and put it into the client pool.
 * @param address
 * @return
 * @throws Exception
 */
public static ExecutorBiz getExecutorBiz(String address) throws Exception {
// valid
if (address==null || address.trim().length()==0) {
return null;
}
// load-cache
address = address.trim();
ExecutorBiz executorBiz = executorBizRepository.get(address);
if(executorBiz ! =null) {
return executorBiz;
}
// set-cache
executorBiz = new ExecutorBizClient(address, XxlJobAdminConfig.getAdminConfig().getAccessToken());
executorBizRepository.put(address, executorBiz);
return executorBiz;
}
//ExecutorBizClient
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}
As you can see from the above, triggering a task really means choosing an execution server from the task group according to the routing strategy and notifying that server to run the job. For sharding-broadcast tasks, every server in the group is notified.
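XXL-JOB ships several routing strategies (e.g. first, round, random, consistent hash, failover, sharding broadcast, among others). Purely as an illustration of the idea, not the framework's routing code, a round-robin style choice over a group's registered address list might look like this:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of round-robin routing over an executor group's registry list;
// the framework encapsulates this behind its executor_route_strategy implementations.
public class RoundRobinRouteSketch {
    private final AtomicInteger counter = new AtomicInteger(0);

    public String route(List<String> registryList) {
        if (registryList == null || registryList.isEmpty()) {
            return null;  // no live executor registered for this group
        }
        int index = (counter.getAndIncrement() & Integer.MAX_VALUE) % registryList.size();
        return registryList.get(index);  // e.g. "http://10.0.0.12:9999/"
    }
}
```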
Now let's take a look at the task executor.
Executor
On the client side, we usually enable the following configuration:
@Configuration
public class XxlJobConfig {
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.address}")
private String address;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
...
}
Look at the initialization of XxlJobSpringExecutor
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {
private static final Logger logger = LoggerFactory.getLogger(XxlJobSpringExecutor.class);
// start
@Override
public void afterSingletonsInstantiated() {
// init JobHandler Repository for older versions
/*initJobHandlerRepository(applicationContext); * /
// init JobHandler Repository (for method)
initJobHandlerMethodRepository(applicationContext);
// refresh GlueFactory to refresh the GLUE factory type
GlueFactory.refreshInstance(1);
// super start
try {
super.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
...
}
XxlJobSpringExecutor initializes the JobHandler annotation method level job into the executor.
//XxlJobSpringExecutor
/**
 * Initialize method-level jobs.
 * @param applicationContext
 */
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
if (applicationContext == null) {
return;
}
// init job handler from method
String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
for (String beanDefinitionName : beanDefinitionNames) {
Object bean = applicationContext.getBean(beanDefinitionName);
// refer to: org.springframework.context.event.EventListenerMethodProcessor.processBean
// Scan the XxlJob annotation method for the bean in the application context
Map<Method, XxlJob> annotatedMethods = null;
try {
annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
new MethodIntrospector.MetadataLookup<XxlJob>() {
@Override
public XxlJob inspect(Method method) {
return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
}
});
} catch (Throwable ex) {
logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
}
if (annotatedMethods==null || annotatedMethods.isEmpty()) {
continue;
}
for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
Method method = methodXxlJobEntry.getKey();
XxlJob xxlJob = methodXxlJobEntry.getValue();
if (xxlJob == null) {
continue;
}
/ / the name of the job
String name = xxlJob.value();
if (name.trim().length() == 0) {
throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + method.getName() + "].");
}
//check Whether there is a job with the same name
if (loadJobHandler(name) != null) {
throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
}
// The job method must take exactly one parameter, of type String
if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {
throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "]." +
"The correct method format like \" public ReturnT<String> execute(String param) \" .");
}
// Check the return value type
if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {
    throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "]." +
"The correct method format like \" public ReturnT<String> execute(String param) \" .");
}
method.setAccessible(true);
// resolve the optional init and destroy methods
Method initMethod = null;
Method destroyMethod = null;
if (xxlJob.init().trim().length() > 0) {
try {
initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
initMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "].");
}
}
if (xxlJob.destroy().trim().length() > 0) {
try {
destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
destroyMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "].");
}
}
// registry jobHandler: register the job handler
registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));
}
}
}

// ---------------------- job handler repository ----------------------
private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
/**
 * Register a job handler.
 * @param name
 * @param jobHandler
 * @return
 */
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
return jobHandlerRepository.put(name, jobHandler);
}
The job handler registry is simply a ConcurrentMap<String, IJobHandler>.
Take a quick look at method-level tasks
public class MethodJobHandler extends IJobHandler {
/** * job */
private final Object target;
/** * job method */
private final Method method;
/** * Job initialization method */
private Method initMethod;
/** * job destruction method */
private Method destroyMethod;
...
}
You can see above that the MethodJobHandler wraps the task object, method-level job, and job initialization and destruction.
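Its execute/init/destroy are then, in essence, reflective calls on those wrapped members. A sketch of how that plausibly looks (continuing the class above; not copied verbatim from the source):

```java
// Sketch: MethodJobHandler delegating to the wrapped bean method via reflection.
@Override
public ReturnT<String> execute(String param) throws Exception {
    // invoke the @XxlJob-annotated method on the target bean with the job parameter
    return (ReturnT<String>) method.invoke(target, new Object[]{param});
}

@Override
public void init() throws Exception {
    if (initMethod != null) {
        initMethod.invoke(target);      // optional hook named in @XxlJob(init = "...")
    }
}

@Override
public void destroy() throws Exception {
    if (destroyMethod != null) {
        destroyMethod.invoke(target);   // optional hook named in @XxlJob(destroy = "...")
    }
}
```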
Next comes the actual startup of the executor.
//XxlJobExecutor
public class XxlJobExecutor {
/**
 * Start the job executor.
 * @throws Exception
 */
public void start() throws Exception {
// init logpath
XxlJobFileAppender.initLogPath(logPath);
// init invoker, admin-client: initialize the admin (console) client
initAdminBizList(adminAddresses, accessToken);
// init JobLogFileCleanThread Start the log clearing thread
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// init TriggerCallbackThread starts the TriggerCallbackThread
TriggerCallbackThread.getInstance().start();
// init executor-server: start the embedded executor server
initEmbedServer(address, ip, port, appname, accessToken);
}
...
}
Starting XxlJobExecutor initializes the admin (console) client, starts the log-cleaning thread, starts the trigger callback thread (when a task finishes, its result is pushed to a callback queue; the callback thread polls that queue and notifies the console), and starts a Netty-based embedded HTTP server to receive scheduling requests from the console.
//XxlJobExecutor
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
// fill ip port
port = port > 0 ? port : NetUtil.findAvailablePort(9999);
ip = (ip != null && ip.trim().length() > 0) ? ip : IpUtil.getIp();
// generate address
if (address==null || address.trim().length()==0) {
String ip_port_address = IpUtil.getIpPort(ip, port); // registry-address: default use address to registry, otherwise use IP :port if address is null
address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
}
// start
embedServer = new EmbedServer();
embedServer.start(address, port, appname, accessToken);
}
//EmbedServer
/**
 * Copy from: https://github.com/xuxueli/xxl-rpc
 *
 * Netty-based embedded server
 * @author xuxueli 2020-04-11 21:25
 */
public class EmbedServer {
private static final Logger logger = LoggerFactory.getLogger(EmbedServer.class);
private ExecutorBiz executorBiz;
private Thread thread;
/**
 * Start the executor server.
 * @param address
* @param port
* @param appname
* @param accessToken
*/
public void start(final String address, final int port, final String appname, final String accessToken) {
executorBiz = new ExecutorBizImpl();
thread = new Thread(new Runnable() {
@Override
public void run() {
// param
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
0, 200, 60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
}
});
try {
// start server Starts the Netty HTTP server
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL
.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true);
// bind
ChannelFuture future = bootstrap.bind(port).sync();
logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);
// start registry, start the task execution registry thread
startRegistry(appname, address);
// wait util stop
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
if (e instanceof InterruptedException) {
logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
} else {
logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
}
} finally {
// stop
try {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
} catch (Exception e) { logger.error(e.getMessage(), e); }
}
}
});
thread.setDaemon(true); // daemon thread: when user threads exit, the daemon exits and the JVM can shut down
thread.start();
}
The key class is EmbedHttpServerHandler.
//EmbedHttpServerHandler
/**
* netty_http
*
* Copy from : https://github.com/xuxueli/xxl-rpc
*
* @author xuxueli 2015-11-24 22:25:15
*/
public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private static final Logger logger = LoggerFactory.getLogger(EmbedHttpServerHandler.class);
private ExecutorBiz executorBiz;
private String accessToken;
private ThreadPoolExecutor bizThreadPool;
public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ThreadPoolExecutor bizThreadPool) {
this.executorBiz = executorBiz;
this.accessToken = accessToken;
this.bizThreadPool = bizThreadPool;
}
@Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
// request parse
//final byte[] requestBytes = ByteBufUtil.getBytes(msg.content()); // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);
String requestData = msg.content().toString(CharsetUtil.UTF_8);
String uri = msg.uri();
HttpMethod httpMethod = msg.method();
boolean keepAlive = HttpUtil.isKeepAlive(msg);
String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);
// invoke
bizThreadPool.execute(new Runnable() {
@Override
public void run(a) {
// Do invoke handles HTTP requests
Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
// to json
String responseJson = GsonTool.toJson(responseObj);
// write response
writeResponse(ctx, keepAlive, responseJson);
}
});
}

/**
 * Handle an HTTP request from the admin console.
 * @param httpMethod
 * @param uri
 * @param requestData
 * @param accessTokenReq
 * @return
 */
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
// valid
if (HttpMethod.POST != httpMethod) {
    return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
}
if (uri==null || uri.trim().length()==0) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
}
if(accessToken! =null
&& accessToken.trim().length()>0
&& !accessToken.equals(accessTokenReq)) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
}
// services mapping
try {
if ("/beat".equals(uri)) {
// heartbeat
return executorBiz.beat();
} else if ("/idleBeat".equals(uri)) {
// Idle heartbeat
IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
return executorBiz.idleBeat(idleBeatParam);
} else if ("/run".equals(uri)) {
// run the job
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
return executorBiz.run(triggerParam);
} else if ("/kill".equals(uri)) {
//kill job
KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
return executorBiz.kill(killParam);
} else if ("/log".equals(uri)) {
LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
return executorBiz.log(logParam);
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
}
}
...
}
As can be seen from the above, the embedded HTTP server mainly provides heartbeat, idle heartbeat, run job, killJob, log, and other REST operations.
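From the admin side these endpoints are plain JSON-over-HTTP POSTs carrying the access token in the XXL-JOB-ACCESS-TOKEN header (the XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN constant seen above). For illustration only, using the JDK HttpClient instead of the framework's ExecutorBizClient, a raw heartbeat could look like this (address and token are placeholders):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Illustration only: a raw /beat heartbeat against an executor's embedded server.
// The framework itself goes through ExecutorBizClient / XxlJobRemotingUtil.postBody.
public class BeatSketch {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://127.0.0.1:9999/beat"))     // executor address + "/beat"
                .header("XXL-JOB-ACCESS-TOKEN", "my-access-token") // must match the executor's accessToken, if configured
                .POST(HttpRequest.BodyPublishers.ofString(""))     // beat carries no body
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());  // a serialized ReturnT, e.g. {"code":200}
    }
}
```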
Let's look at how ExecutorBizImpl runs a job.
//ExecutorBizImpl
public ReturnT<String> run(TriggerParam triggerParam) {
// Load old jobHandler + jobThread: load the existing job thread from the executor
JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
IJobHandler jobHandler = jobThread != null ? jobThread.getHandler() : null;
String removeOldReason = null;
// Valid: jobHandler + jobThread Verifies the job thread and job processor
GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
if (GlueTypeEnum.BEAN == glueTypeEnum) {
// new jobHandler creates an executor handler
IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
// valid old jobThread
if (jobThread != null && jobHandler != newJobHandler) {
    // change handler, need kill old thread
removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
// valid handler
if (jobHandler == null) {
jobHandler = newJobHandler;
if (jobHandler == null) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
}
}
} else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
// valid old jobThread
if(jobThread ! =null &&
!(jobThread.getHandler() instanceof GlueJobHandler
&& ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
// change handler or gluesource updated, need kill old thread
removeOldReason = "change job source or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
// valid handler
if (jobHandler == null) {
try {
IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
}
}
} else if (glueTypeEnum != null && glueTypeEnum.isScript()) {
// valid old jobThread
if(jobThread ! =null &&
!(jobThread.getHandler() instanceof ScriptJobHandler
&& ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
// change script or gluesource updated, need kill old thread
removeOldReason = "change job source or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
// valid handler
if (jobHandler == null) {
jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
}
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
}
// executor block strategy
if(jobThread ! =null) {
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
// discard when running
if (jobThread.isRunningOrHasQueue()) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:" + ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
}
} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
// kill running jobThread
if (jobThread.isRunningOrHasQueue()) {
removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
jobThread = null;
}
} else {
    // just queue trigger
}
}
// replace thread (new or exists invalid)
if (jobThread == null) {
// Register the job to the job executor
jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
}
// Push data to queue, push execution parameters to trigger queue
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
return pushResult;
}
As you can see, running a job means loading (or creating) the corresponding JobThread in the executor and pushing the trigger parameters into that thread's trigger queue.
Now look at the execution parameter TriggerParam
public class TriggerParam implements Serializable{
private static final long serialVersionUID = 42L;
private int jobId;
/** ** execute processor */
private String executorHandler;
/** * Execute the argument */
private String executorParams;
private String executorBlockStrategy;
/** * Executor timeout */
private int executorTimeout;
/** * Job log ID */
private long logId;
private long logDateTime;
private String glueType;
private String glueSource;
private long glueUpdatetime;
/** * Shard task broadcast index */
private int broadcastIndex;
/** * The number of shard tasks broadcast */
private int broadcastTotal;
...
}
The trigger parameters (TriggerParam) include the executor handler name and parameters, the block strategy, the glue mode, and the sharding broadcast index and total.
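On the handler side, JobThread (shown below) publishes the broadcast index and total through ShardingUtil before calling the handler, so a sharding-broadcast job can split its workload. A rough sketch under that assumption (package paths and the getShardingVo accessor are assumed from the xxl-job core module):

```java
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.util.ShardingUtil;

// Sketch of a sharding-broadcast handler: every registered executor receives the trigger,
// and each instance only processes the slice of data matching its shard index.
public class ShardingJobHandlerSketch {

    @XxlJob("shardingJobHandler")
    public ReturnT<String> shardingJobHandler(String param) throws Exception {
        ShardingUtil.ShardingVO shard = ShardingUtil.getShardingVo();
        int index = shard.getIndex();  // this executor's position in the group, 0-based
        int total = shard.getTotal();  // how many executors received the broadcast
        // e.g. process only the records where (recordId % total == index)
        return ReturnT.SUCCESS;
    }
}
```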
Let’s go to the Job thread
public class JobThread extends Thread{
private static Logger logger = LoggerFactory.getLogger(JobThread.class);
private int jobId;
/** * job Processor */
private IJobHandler handler;
/** * triggers the task queue */
private LinkedBlockingQueue<TriggerParam> triggerQueue;
private Set<Long> triggerLogIdSet; // avoid repeat trigger for the same TRIGGER_LOG_ID
private volatile boolean toStop = false;
private String stopReason;
private boolean running = false; // if running job
private int idleTimes = 0; // idel times
public JobThread(int jobId, IJobHandler handler) {
this.jobId = jobId;
this.handler = handler;
this.triggerQueue = new LinkedBlockingQueue<TriggerParam>();
this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Long>());
}

/**
 * new trigger to queue
 * @param triggerParam
 * @return
 */
public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
// avoid repeat
if (triggerLogIdSet.contains(triggerParam.getLogId())) {
logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
}
triggerLogIdSet.add(triggerParam.getLogId());
triggerQueue.add(triggerParam);
return ReturnT.SUCCESS;
}
...
}
Where does the job thread actually get started? Back in XxlJobExecutor:
//XxlJobExecutor
/**
 * Register the job thread for a job and start it.
 * @param jobId
 * @param handler
 * @param removeOldReason
 * @return
 */
public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason) {
JobThread newJobThread = new JobThread(jobId, handler);
newJobThread.start();
logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});
JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, map's put method return the old value!!!
if(oldJobThread ! =null) {
oldJobThread.toStop(removeOldReason);
oldJobThread.interrupt();
}
return newJobThread;
}
Let’s look at starting the Job thread
//JobThread
@Override
public void run() {
// init
try {
handler.init();
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
// execute
while (!toStop) {
    running = false;
idleTimes++;
TriggerParam triggerParam = null;
ReturnT<String> executeResult = null;
try {
// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
// Pull the trigger parameter from the job thread trigger queue
triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
if(triggerParam! =null) {
running = true;
idleTimes = 0;
// Remove the corresponding log ID from the log ID set
triggerLogIdSet.remove(triggerParam.getLogId());
// log filename, like "logPath/yyyy-MM-dd/9999.log"
String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
XxlJobFileAppender.contextHolder.set(logFileName);
ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));
// execute
XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams());
if (triggerParam.getExecutorTimeout() > 0) {
// A timeout is configured: run the handler in a separate thread and wait with a time limit
Thread futureThread = null;
try {
final TriggerParam triggerParamTmp = triggerParam;
FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {
@Override
public ReturnT<String> call(a) throws Exception {
return handler.execute(triggerParamTmp.getExecutorParams());
}
});
futureThread = new Thread(futureTask);
futureThread.start();
// Time out to wait for execution results
executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
} catch (TimeoutException e) {
XxlJobLogger.log("<br>----------- xxl-job job execute timeout");
XxlJobLogger.log(e);
executeResult = new ReturnT<String>(IJobHandler.FAIL_TIMEOUT.getCode(), "job execute timeout ");
} finally {
    futureThread.interrupt();
}
} else {
// No timeout configured: just execute the handler directly
executeResult = handler.execute(triggerParam.getExecutorParams());
}
if (executeResult == null) {
executeResult = IJobHandler.FAIL;
} else {
    executeResult.setMsg(
        (executeResult != null && executeResult.getMsg() != null && executeResult.getMsg().length() > 50000)
            ? executeResult.getMsg().substring(0, 50000).concat("...")
            : executeResult.getMsg());
executeResult.setContent(null); // limit obj size
}
XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);
} else {
// The job thread has been idle too long and its trigger queue is empty: remove it from the executor
if (idleTimes > 30) {
    if (triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost
        XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
    }
}
}
} catch (Throwable e) {
if (toStop) {
XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
}
StringWriter stringWriter = new StringWriter();
e.printStackTrace(new PrintWriter(stringWriter));
String errorMsg = stringWriter.toString();
executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg);
XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
} finally {
if(triggerParam ! =null) {
// callback handler info
if (!toStop) {
    // normal case: push the execution result callback notification
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), executeResult));
} else {
// is killed
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running, killed]");
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
}
}
}
}
// Call back the trigger requests still in the queue: the thread is stopping, so each queued job is reported as killed
while(triggerQueue ! =null && triggerQueue.size()>0){
TriggerParam triggerParam = triggerQueue.poll();
if(triggerParam! =null) {
// is killed
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]");
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
}
}
// destroy
try {
handler.destroy();
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
}
As shown above, JobThread's run loop keeps pulling trigger parameters from its trigger queue. If no timeout is configured, the handler is executed directly; otherwise a FutureTask is created in a separate thread and the result is awaited up to the timeout. That completes the analysis of both console-side scheduling and client-side execution; let's sum up.
conclusion
Task model
The main models include job grouping, job registry, job, job log, job execution report, and job scheduling lock models.
The process of task scheduling
The job scheduler runs two threads. The scheduleThread selects from the database the jobs due within the next five seconds; due jobs are handed to the trigger pool, which notifies the job executor, while not-yet-due jobs are parked in the ringData map. The ringThread checks the ringData map every second and notifies the job executor to run whatever is due. Triggering a task means choosing an execution server from the task group according to the routing strategy and notifying it to run the job; for sharding-broadcast tasks, all servers in the group are notified.
XxlJobSpringExecutor registers the @XxlJob-annotated (method-level) jobs into the executor. The job handler registry is actually a ConcurrentMap<String, IJobHandler>; MethodJobHandler wraps the target bean, the job method, and its init and destroy methods.
Starting XxlJobExecutor initializes the admin (console) client, starts the log-cleaning thread, starts the trigger callback thread (after a task finishes, its result is pushed to a callback queue that the callback thread polls to notify the console), and starts a Netty-based embedded HTTP server to receive scheduling requests from the console.
The embedded HTTP server provides heartbeat, idle heartbeat, run job, killJob, log, and other REST operations.
To execute a job, the executor loads (or creates) the corresponding JobThread and pushes the trigger parameters into its trigger queue.
The trigger parameters (TriggerParam) include the executor parameters, block strategy, glue mode, and the sharding broadcast index and total.
JobThread keeps pulling trigger parameters from its trigger queue; without a configured timeout the task is executed directly, otherwise a FutureTask is created and the result is awaited with a time limit.
Advantages and disadvantages of xxl-job
Let's analyze the advantages and disadvantages of xxl-job.
advantages
- Based on HTTP protocol, with cross-platform characteristics;
disadvantages
- Task scheduling is centralized in the console, and since it goes over HTTP there may be timing differences;
- All tasks are scheduled by a single polling thread;
- Logs are rolled every hour, which makes troubleshooting difficult.
Because xxl-job is lightweight, we should not expect too much from it; for ordinary applications with modest volume it is sufficient.
Is there anything that could be improved? Yes. The console could be responsible only for managing jobs, keeping executor statistics, and collecting logs and execution data, while the clients pull tasks from the console and schedule them locally (for example with Quartz). When several machines could run the same task, one of them has to be chosen, and a distributed lock can control that choice. With finer-grained mutual exclusion, the lock can be split into per-task scheduling locks (generated from the schedule timestamp on each round): whichever node acquires the lock for a task schedules and runs it.
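As a rough sketch of that idea (the table and helper below are hypothetical, simply mirroring the xxl_job_lock pattern but keyed per job), each scheduling node could try to take a per-job row lock before scheduling that job, so different jobs can be scheduled by different nodes in parallel:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Hypothetical per-job lock: one row per job in a lock table (here "job_schedule_lock"),
// taken with SELECT ... FOR UPDATE NOWAIT so only one node schedules a given job at a time.
// NOWAIT needs MySQL 8.0+; on older versions a short innodb_lock_wait_timeout behaves similarly.
public class PerJobLockSketch {

    /** Returns true if this node acquired the lock for jobId inside the caller's transaction. */
    public static boolean tryLock(Connection conn, int jobId) {
        try (PreparedStatement ps = conn.prepareStatement(
                "select job_id from job_schedule_lock where job_id = ? for update nowait")) {
            ps.setInt(1, jobId);
            try (ResultSet rs = ps.executeQuery()) {
                return rs.next();  // row locked by us: safe to schedule and trigger this job
            }
        } catch (SQLException lockBusy) {
            return false;          // another node holds the lock: skip this job for now
        }
    }
}
```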
Links
- xxl-job on GitHub: https://github.com/xuxueli/xxl-job - a distributed task scheduling framework