Introduction

When it comes to scheduled tasks, the first things that come to mind are the JDK’s [Timer][] and the [Quartz][] scheduling framework, which is commonly used together with the Spring family of frameworks. Quartz can store tasks in two ways: in memory, where scheduled tasks are lost when the application restarts, or in a database, which solves the problem of tasks being lost on restart. Whether Quartz is used alone or integrated with Spring, scheduled tasks cannot be modified once the application has started, and Quartz’s support for distributed scheduling is limited. Taobao’s distributed scheduling framework [TBschedule][] solved the distributed scheduling problem, but it shares a common weakness of Alibaba open-source projects: after being open-sourced it was left unmaintained, with no updates and rough documentation. Today we look at XXL-JOB, a lightweight distributed task scheduling framework.

Contents

  • usage
  • Xxl-job architecture design
  • Task Scheduling (Management Console)
  • executor
  • conclusion

usage

  1. Configure an executor (application). If a domain name is configured, the host list is registered automatically; otherwise the executor registers its host’s IP address and port directly. Executors support cluster deployment: all nodes of one executor share the same name (`app_name`).

  2. Add a task (belonging to an executor).

Xxl-job architecture design

The scheduling center (admin) consists of the following components:

  • JobRegistryMonitorHelper: monitors executor registrations and writes the live servers of each application into the corresponding executor group
  • JobFailMonitorHelper: retries failed jobs and sends an alarm when required
  • JobLosedMonitorHelper: monitors lost (missing) jobs
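The registry monitor’s core step can be pictured as a pure function. This is a minimal sketch, not XXL-JOB’s code: it assumes each registry row carries an app name (`registry_key`), an address (`registry_value`) and its last heartbeat time (`update_time`), and that a row not refreshed within 90 seconds counts as dead. The `RegistryRow` record, the `RegistryMonitorSketch` class and the timeout value are hypothetical. For each app name it builds the comma-separated list that would end up in `xxl_job_group.address_list`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical registry row: appName = registry_key, address = registry_value. */
record RegistryRow(String appName, String address, long updateTimeMillis) {}

final class RegistryMonitorSketch {
    /** Assumed dead timeout: a row whose heartbeat is older than this is treated as offline. */
    static final long DEAD_TIMEOUT_MS = 90_000;

    /** Returns app name -> sorted, comma-separated list of live, de-duplicated addresses. */
    static Map<String, String> liveAddressLists(List<RegistryRow> rows, long nowMillis) {
        Map<String, List<String>> byApp = new TreeMap<>();
        for (RegistryRow row : rows) {
            if (nowMillis - row.updateTimeMillis() > DEAD_TIMEOUT_MS) {
                continue; // stale heartbeat: leave this address out
            }
            List<String> addresses = byApp.computeIfAbsent(row.appName(), k -> new ArrayList<>());
            if (!addresses.contains(row.address())) {
                addresses.add(row.address());
            }
        }
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, List<String>> entry : byApp.entrySet()) {
            List<String> addresses = entry.getValue();
            Collections.sort(addresses); // stable order for order-sensitive routing strategies
            result.put(entry.getKey(), String.join(",", addresses));
        }
        return result;
    }
}
```

Sorting is a choice made in this sketch so that a strategy such as "always pick the first address" sees the same list on every refresh.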

The executor client consists of the following components:

  • Log cleaning
  • Task callback notification
  • Embedded HTTP server: provides heartbeat (beat), idle heartbeat (idleBeat), run, kill, and other operations.

All communication between the admin and the executors, in both directions, goes over HTTP.
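To make the HTTP wiring concrete, here is a minimal executor-style endpoint built only on the JDK (`com.sun.net.httpserver.HttpServer` and `java.net.http.HttpClient`, Java 11+). It is a sketch, not XXL-JOB’s embedded server: the `/beat` and `/run` paths, the `XXL-JOB-ACCESS-TOKEN` header name, the plain-text responses and the `MiniExecutorServer` class are assumptions for illustration.

```java
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.function.UnaryOperator;

/** Hypothetical executor endpoint: POST /beat and POST /run, guarded by an access token. */
final class MiniExecutorServer implements AutoCloseable {
    static final String TOKEN_HEADER = "XXL-JOB-ACCESS-TOKEN"; // assumed header name
    private final HttpServer server;

    MiniExecutorServer(String accessToken) {
        try {
            server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0); // port 0 = any free port
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        server.createContext("/beat", ex -> handle(ex, accessToken, body -> "beat ok"));
        // a real executor would queue the job for its job thread instead of echoing the body
        server.createContext("/run", ex -> handle(ex, accessToken, body -> "accepted: " + body));
        server.start();
    }

    int port() {
        return server.getAddress().getPort();
    }

    /** Reads the body, checks the token, and replies with the action's result (or 403). */
    private static void handle(HttpExchange ex, String token, UnaryOperator<String> action) throws IOException {
        String body = new String(ex.getRequestBody().readAllBytes(), StandardCharsets.UTF_8);
        boolean authorized = token.equals(ex.getRequestHeaders().getFirst(TOKEN_HEADER));
        byte[] out = (authorized ? action.apply(body) : "token invalid").getBytes(StandardCharsets.UTF_8);
        ex.sendResponseHeaders(authorized ? 200 : 403, out.length);
        try (OutputStream os = ex.getResponseBody()) {
            os.write(out);
        }
    }

    /** Admin-side call: POST a body with the token header and return the response body. */
    static String post(String url, String token, String body) {
        HttpClient client = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .header(TOKEN_HEADER, token)
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        try {
            return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    @Override
    public void close() {
        server.stop(0);
    }
}
```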

First, let’s look at the console’s data model.

Console model

#
# XXL-JOB v2.2.0
# Copyright (c) 2015-present, xuxueli.

CREATE database if NOT EXISTS `xxl_job` default character set utf8mb4 collate utf8mb4_unicode_ci;
use `xxl_job`;

SET NAMES utf8mb4;
-- job
CREATE TABLE `xxl_job_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `job_group` int(11) NOT NULL COMMENT 'Executor primary key ID',
  `job_cron` varchar(128) NOT NULL COMMENT 'Task execution CRON',
  `job_desc` varchar(255) NOT NULL,
  `add_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  `author` varchar(64) DEFAULT NULL COMMENT 'Author',
  `alarm_email` varchar(255) DEFAULT NULL COMMENT 'Alarm email',
  `executor_route_strategy` varchar(50) DEFAULT NULL COMMENT 'Executor routing strategy',
  `executor_handler` varchar(255) DEFAULT NULL COMMENT 'Executor task Handler',
  `executor_param` varchar(512) DEFAULT NULL COMMENT 'Executor Task Parameters',
  `executor_block_strategy` varchar(50) DEFAULT NULL COMMENT 'Blocking Handling Strategy',
  `executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT 'Task execution timeout in seconds',
  `executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT 'Failed retry times',
  `glue_type` varchar(50) NOT NULL COMMENT 'GLUE type',
  `glue_source` mediumtext COMMENT 'GLUE source code ',
  `glue_remark` varchar(128) DEFAULT NULL COMMENT 'GLUE notes',
  `glue_updatetime` datetime DEFAULT NULL COMMENT 'GLUE update time ',
  `child_jobid` varchar(255) DEFAULT NULL COMMENT 'Child job IDs, comma-separated',
  `trigger_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT 'Scheduling status: 0- Stopped, 1- Running',
  `trigger_last_time` bigint(13) NOT NULL DEFAULT '0' COMMENT 'Last trigger time',
  `trigger_next_time` bigint(13) NOT NULL DEFAULT '0' COMMENT 'Next trigger time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- job Execution logs
CREATE TABLE `xxl_job_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `job_group` int(11) NOT NULL COMMENT 'Executor primary key ID',
  `job_id` int(11) NOT NULL COMMENT 'Job primary key ID',
  `executor_address` varchar(255) DEFAULT NULL COMMENT 'Executor address, address of this execution',
  `executor_handler` varchar(255) DEFAULT NULL COMMENT 'Executor task Handler',
  `executor_param` varchar(512) DEFAULT NULL COMMENT 'Executor Task Parameters',
  `executor_sharding_param` varchar(20) DEFAULT NULL COMMENT 'Executor task fragment parameter of the form 1/2',
  `executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT 'Failed retry times',
  `trigger_time` datetime DEFAULT NULL COMMENT 'Schedule - Time',
  `trigger_code` int(11) NOT NULL COMMENT 'Schedule - Result',
  `trigger_msg` text COMMENT 'Schedule - Log',
  `handle_time` datetime DEFAULT NULL COMMENT 'Execution-time',
  `handle_code` int(11) NOT NULL COMMENT 'Execution-status',
  `handle_msg` text COMMENT 'Execution-Log',
  `alarm_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT 'Alarm status: 0-default, -1-locked, 1-no alarm, 2-alarm succeeded, 3-alarm failed',
  PRIMARY KEY (`id`),
  KEY `I_trigger_time` (`trigger_time`),
  KEY `I_handle_code` (`handle_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Job Execution report
CREATE TABLE `xxl_job_log_report` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `trigger_day` datetime DEFAULT NULL COMMENT 'Schedule - Time',
  `running_count` int(11) NOT NULL DEFAULT '0' COMMENT 'Running - Log Quantity',
  `suc_count` int(11) NOT NULL DEFAULT '0' COMMENT 'Execution succeeded - Number of logs',
  `fail_count` int(11) NOT NULL DEFAULT '0' COMMENT 'Failed execution - Number of logs',
  PRIMARY KEY (`id`),
  UNIQUE KEY `i_trigger_day` (`trigger_day`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `xxl_job_logglue` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `job_id` int(11) NOT NULL COMMENT 'Job primary key ID',
  `glue_type` varchar(50) DEFAULT NULL COMMENT 'GLUE type',
  `glue_source` mediumtext COMMENT 'GLUE source code ',
  `glue_remark` varchar(128) NOT NULL COMMENT 'GLUE notes',
  `add_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Executor registry: when a client starts, it registers a row with its group, application name, and server address (IP:port or domain name)
CREATE TABLE `xxl_job_registry` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `registry_group` varchar(50) NOT NULL,
  `registry_key` varchar(255) NOT NULL,
  `registry_value` varchar(255) NOT NULL,
  `update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `i_g_k_v` (`registry_group`,`registry_key`,`registry_value`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Job executor
CREATE TABLE `xxl_job_group` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `app_name` varchar(64) NOT NULL COMMENT 'Executor AppName',
  `title` varchar(12) NOT NULL COMMENT 'Executor name',
  `address_type` tinyint(4) NOT NULL DEFAULT '0' COMMENT 'Executor address type: 0 = automatic registration, 1 = manual entry',
  `address_list` varchar(512) DEFAULT NULL COMMENT 'Executor address list, comma-separated',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- user table
CREATE TABLE `xxl_job_user` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `username` varchar(50) NOT NULL COMMENT 'account',
  `password` varchar(50) NOT NULL COMMENT 'password',
  `role` tinyint(4) NOT NULL COMMENT 'Roles: 0- Common user, 1- Administrator',
  `permission` varchar(255) DEFAULT NULL COMMENT 'Permissions: executor IDs, comma-separated',
  PRIMARY KEY (`id`),
  UNIQUE KEY `i_username` (`username`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- job lock table
CREATE TABLE `xxl_job_lock` (
  `lock_name` varchar(50) NOT NULL COMMENT 'Lock name',
  PRIMARY KEY (`lock_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;


INSERT INTO `xxl_job_group`(`id`, `app_name`, `title`, `address_type`, `address_list`) VALUES (1, 'xxl-job-executor-sample', 'Sample executor', 0, NULL);
INSERT INTO `xxl_job_info`(`id`, `job_group`, `job_cron`, `job_desc`, `add_time`, `update_time`, `author`, `alarm_email`, `executor_route_strategy`, `executor_handler`, `executor_param`, `executor_block_strategy`, `executor_timeout`, `executor_fail_retry_count`, `glue_type`, `glue_source`, `glue_remark`, `glue_updatetime`, `child_jobid`) VALUES (1, 1, '0 0 0 * * ? *', 'Test Task 1', '2018-11-03 22:21:31', '2018-11-03 22:21:31', 'XXL', '', 'FIRST', 'demoJobHandler', '', 'SERIAL_EXECUTION', 0, 0, 'BEAN', '', 'GLUE code initialization', '2018-11-03 22:21:31', '');
INSERT INTO `xxl_job_user`(`id`, `username`, `password`, `role`, `permission`) VALUES (1, 'admin', 'e10adc3949ba59abbe56e057f20f883e', 1, NULL);
INSERT INTO `xxl_job_lock` ( `lock_name`) VALUES ( 'schedule_lock');

commit;



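The main tables cover executor groups (`xxl_job_group`), the executor registry (`xxl_job_registry`), jobs (`xxl_job_info`), execution logs (`xxl_job_log`), execution reports (`xxl_job_log_report`), and the scheduling lock (`xxl_job_lock`); `xxl_job_logglue` keeps GLUE source history and `xxl_job_user` holds console users.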

Let’s look at the scheduling of specific tasks.

Task Scheduling (Management Console)

Starting the task scheduler (JobScheduleHelper):

// JobScheduleHelper
    public void start() {

        // schedule thread
        scheduleThread = new Thread(new Runnable() {
        ...
        });
        scheduleThread.setDaemon(true);
        scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
        scheduleThread.start();

        // Ring thread polls the tasks that need to be triggered within 5 seconds
        ringThread = new Thread(new Runnable() {
        ...
        });
        ringThread.setDaemon(true);
        ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
        ringThread.start();
    }

The job scheduler runs two threads. The scheduleThread reads from the database the jobs due within the next five seconds; jobs that are already due are sent to an executor immediately, and jobs due later in that window are put into the ringData map. The ringThread checks the ringData map once per second and notifies the executors to run every job whose second has arrived.
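Both threads pace themselves against whole-second boundaries with a little millisecond arithmetic, which appears in the scheduler code below as `1000 - System.currentTimeMillis()%1000`, `(preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000`, and the pre-read batch size. The helpers here reproduce those expressions in a hypothetical `ScheduleArithmetic` class, assuming `PRE_READ_MS` is 5000 ms to match the five-second window.

```java
/** Hypothetical helpers mirroring the timing arithmetic in JobScheduleHelper. */
final class ScheduleArithmetic {
    static final long PRE_READ_MS = 5000; // the five-second pre-read window

    /** Milliseconds until the next whole-second boundary (the ring thread's sleep). */
    static long untilNextSecond(long nowMillis) {
        return 1000 - nowMillis % 1000;
    }

    /**
     * Schedule thread's sleep after a scan that took under a second: wake on the next
     * second if jobs were found, otherwise skip ahead by roughly one pre-read window.
     */
    static long scheduleSleep(boolean preReadSucceeded, long nowMillis) {
        return (preReadSucceeded ? 1000 : PRE_READ_MS) - nowMillis % 1000;
    }

    /** Jobs fetched per scan: (fast max + slow max) threads, about 20 triggers per second each. */
    static int preReadCount(int fastPoolMax, int slowPoolMax) {
        return (fastPoolMax + slowPoolMax) * 20;
    }
}
```

For example, at `now = 12_345` ms the ring thread sleeps 655 ms and wakes at exactly 13,000 ms, while a scan that found nothing sleeps 4,655 ms and wakes at 17,000 ms, again on a second boundary.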

/**
 * Jobs to trigger in the current polling batch: key = second of the minute (0-59), value = list of job IDs
 */
private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

// JobScheduleHelper

    public void start() {

        // schedule thread
        scheduleThread = new Thread(new Runnable() {
            @Override
            public void run() {

                try {
                    // sleep 4-5 s so that the first scan starts on a whole-second boundary
                    TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
                } catch (InterruptedException e) {
                    
                    ...
                }
                // pre-read count = (fast pool max + slow pool max) * trigger QPS per thread
                // (each trigger costs about 50 ms, so QPS = 1000 / 50 = 20)
                int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;

                while (!scheduleThreadToStop) { // scan jobs
                    long start = System.currentTimeMillis();

                    Connection conn = null;
                    Boolean connAutoCommit = null;
                    PreparedStatement preparedStatement = null;

                    boolean preReadSuc = true;
                    try {

                        conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                        connAutoCommit = conn.getAutoCommit();
                        conn.setAutoCommit(false);
                        // Get the scheduling lock
                        preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                        preparedStatement.execute();

                        // tx start

                        // 1, pre read
                        long nowTime = System.currentTimeMillis();
                        // query the jobs due within the next PRE_READ_MS (5 s), at most preReadCount of them
                        List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                        if (scheduleList != null && scheduleList.size() > 0) {
                            // 2. push time-ring
                            for (XxlJobInfo jobInfo: scheduleList) {

                                // time-ring jump
                                if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                                    // 2.1, trigger-expire > 5s: pass && make next-trigger-time
                                    logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

                                    // fresh next: the job missed its trigger, so only compute its next trigger time
                                    refreshNextValidTime(jobInfo, new Date());

                                } else if (nowTime > jobInfo.getTriggerNextTime()) {
                                    // 2.2, trigger-expire < 5s: direct-trigger && make next-trigger-time

                                    // 1. trigger
                                    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                                    logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );

                                    // 2. fresh next: compute the next trigger time
                                    refreshNextValidTime(jobInfo, new Date());

                                    // next-trigger-time in 5s, pre-read again: the next trigger also falls within the window
                                    if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {

                                        // 1. make ring second: the second of the minute (0-59) at which the job is due
                                        int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000) %60);

                                        // 2. push time ring
                                        pushTimeRing(ringSecond, jobInfo.getId());

                                        // 3. fresh next
                                        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
                                    }
                                } else {
                                    // 2.3, trigger-pre-read: time-ring trigger && make next-trigger-time

                                    // 1. make ring second
                                    int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000) %60);

                                    // 2. push time ring
                                    pushTimeRing(ringSecond, jobInfo.getId());

                                    // 3. fresh next
                                    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
                                }
                            }

                            // 3. update trigger info
                            for (XxlJobInfo jobInfo: scheduleList) {
                                XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                            }
                        } else {
                            preReadSuc = false;
                        }

                        // tx stop


                    } catch (Exception e) {
                      ...
                    } finally {

                        // commit
                        if (conn != null) {
                            try {
                                conn.commit();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                            try {
                                conn.setAutoCommit(connAutoCommit);
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                            try {
                                conn.close();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                        }

                        // close PreparedStatement
                        if (null != preparedStatement) {
                            try {
                                preparedStatement.close();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                        }
                    }
                    long cost = System.currentTimeMillis()-start;


                    // Wait seconds, align second
                    if (cost < 1000) {  // scan-overtime, not wait
                        try {
                            // pre-read period: success > scan each second; fail > skip this period;
                            TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
                        } catch (InterruptedException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
            }
        });
        scheduleThread.setDaemon(true);
        scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
        scheduleThread.start();
        ...
    }

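As the code shows, the scheduleThread first acquires the scheduling lock (a `select ... for update` on the `schedule_lock` row of `xxl_job_lock`, so that only one admin node schedules at a time) and then reads from the database the jobs due within the next five seconds. Each job falls into one of three cases: if its trigger time passed more than five seconds ago, the trigger is skipped and only the next trigger time is computed; if it passed less than five seconds ago, the job is triggered immediately; otherwise the job is pushed into the time ring, to be triggered by the ringThread when its second arrives.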
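The three-way decision in the loop can be isolated into a pure function. This is a minimal sketch with hypothetical names (`PreReadClassifier`, `Action`), assuming `PRE_READ_MS` is 5000 ms; it leaves out the extra re-check in the immediate-trigger branch, where a job whose next fire time also falls within five seconds is pushed into the ring as well.

```java
/** Hypothetical sketch of the three cases in scheduleThread's pre-read loop. */
final class PreReadClassifier {
    static final long PRE_READ_MS = 5000;

    enum Action { MISFIRE_SKIP, TRIGGER_NOW, PUSH_TO_RING }

    /**
     * nowMillis: time of the scan; nextTriggerMillis: the job's stored next trigger time,
     * which the query guarantees is at most nowMillis + PRE_READ_MS.
     */
    static Action classify(long nowMillis, long nextTriggerMillis) {
        if (nowMillis > nextTriggerMillis + PRE_READ_MS) {
            return Action.MISFIRE_SKIP;  // overdue by more than 5 s: skip, only compute the next time
        } else if (nowMillis > nextTriggerMillis) {
            return Action.TRIGGER_NOW;   // overdue by at most 5 s: trigger immediately
        } else {
            return Action.PUSH_TO_RING;  // due within the next 5 s: hand over to the ring thread
        }
    }

    /** Second-of-minute slot (0-59) used as the ringData key. */
    static int ringSecond(long triggerTimeMillis) {
        return (int) ((triggerTimeMillis / 1000) % 60);
    }
}
```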

Next, let’s look at how jobs whose trigger time has not yet arrived are handled:

    /**
     * Push a job into the time ring, to be triggered by the ring thread
     * @param ringSecond
     * @param jobId
     */
    private void pushTimeRing(int ringSecond, int jobId){
        // push async ring
        List<Integer> ringItemData = ringData.get(ringSecond);
        if (ringItemData == null) {
            ringItemData = new ArrayList<Integer>();
            ringData.put(ringSecond, ringItemData);
        }
        ringItemData.add(jobId);

        logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + "=" + Arrays.asList(ringItemData) );
    }

`pushTimeRing` appends the job ID to the ringData slot for the job’s trigger second; the ring thread later removes that slot and triggers its jobs.

Let’s look at the ring thread

    public void start() {
        ...
        // ring thread: once per second, triggers the jobs whose ring second has arrived
        ringThread = new Thread(new Runnable() {
            @Override
            public void run() {

                // align second
                try {
                    TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );
                } catch (InterruptedException e) {
                    if (!ringThreadToStop) {
                        logger.error(e.getMessage(), e);
                    }
                }

                while (!ringThreadToStop) {
                    try {
                        // second data
                        List<Integer> ringItemData = new ArrayList<>();
                        int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
                        // in case processing overran a tick, also check the previous second's slot
                        for (int i = 0; i < 2; i++) {
                            // Retrieve the tasks to be scheduled at the current time
                            List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                            if (tmpData != null) {
                                ringItemData.addAll(tmpData);
                            }
                        }

                        // ring trigger
                        logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + "=" + Arrays.asList(ringItemData) );
                        if (ringItemData.size() > 0) {
                            // do trigger
                            for (int jobId: ringItemData) {
                                // do trigger the task
                                JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1.null.null.null);
                            }
                            // clear
                            ringItemData.clear();
                        }
                    } catch (Exception e) {
                        if (!ringThreadToStop) {
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                        }
                    }

                    // next second, align second
                    try {
                        // Sleep for 1 second
                        TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);
                    } catch (InterruptedException e) {
                        if (!ringThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
            }
        });
        ringThread.setDaemon(true);
        ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
        ringThread.start();
    }


As the code shows, every second the ring thread removes the current second’s slot from the ringData map, plus the previous second’s slot in case processing overran a tick, and triggers every job found there.
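Together, ringData, `pushTimeRing` and the ring thread’s loop form a simple 60-slot time ring. Below is a self-contained sketch with hypothetical names (`TimeRing`, `push`, `drain`). Unlike the excerpt, it appends with `ConcurrentHashMap.compute`, a design choice made here so that adding a job ID and removing a slot can never interleave.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical time ring: slots keyed by second of the minute (0-59), each holding job IDs. */
final class TimeRing {
    private final Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

    /** Producer side (schedule thread): append a job ID to its slot. */
    void push(int ringSecond, int jobId) {
        // compute() is atomic per key, so a push cannot slip into a list that drain() already removed
        ringData.compute(ringSecond, (key, list) -> {
            List<Integer> jobs = (list == null) ? new ArrayList<>() : list;
            jobs.add(jobId);
            return jobs;
        });
    }

    /** Consumer side (ring thread): take the current slot plus the previous one. */
    List<Integer> drain(int nowSecond) {
        List<Integer> due = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            List<Integer> slot = ringData.remove((nowSecond + 60 - i) % 60);
            if (slot != null) {
                due.addAll(slot);
            }
        }
        return due;
    }
}
```

Checking the previous slot too means a tick that runs late still picks up the jobs it would otherwise skip; `(nowSecond + 60 - i) % 60` makes second 0 look back at slot 59.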

Now for the key step: notifying the job executor to run the task.

// do trigger the task
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);

//JobTriggerPoolHelper

    /**
     * trigger job
     * @param jobId
     * @param triggerType
     * @param failRetryCount
     * 			>=0: use this param
     * 			<0: use param from job info config
     * @param executorShardingParam sharding parameter "index/total"
     * @param executorParam
     *          null: use job param
     *          not null: cover job param
     */
    public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
        helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
    }
    /**
     * add trigger
     */
    public void addTrigger(final int jobId,
                           final TriggerTypeEnum triggerType,
                           final int failRetryCount,
                           final String executorShardingParam,
                           final String executorParam,
                           final String addressList) {

        // choose thread pool
        ThreadPoolExecutor triggerPool_ = fastTriggerPool;
        AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
        if (jobTimeoutCount != null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
            triggerPool_ = slowTriggerPool;
        }

        // trigger
        triggerPool_.execute(new Runnable() {
            @Override
            public void run() {

                long start = System.currentTimeMillis();

                try {
                    // do trigger: ask a remote executor to run the job
                    XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
                } catch (Exception e) {
                    ...
                }
            }
        });
    }
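The pool choice in `addTrigger` depends on `jobTimeoutCountMap`, whose updates are not shown in the excerpt. A minimal sketch of the idea follows, with the hypothetical class `PoolSelector`. It assumes that a trigger slower than 500 ms counts as slow and that counts reset at each wall-clock minute; both are assumptions, and only the "more than 10 times in 1 min" rule comes from the code comment above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical fast/slow pool chooser; thresholds are assumptions, see lead-in. */
final class PoolSelector {
    static final long SLOW_TRIGGER_MS = 500;  // assumed "slow trigger" threshold
    static final int SLOW_COUNT_LIMIT = 10;   // "job-timeout 10 times in 1 min"

    private final Map<Integer, Integer> slowCounts = new HashMap<>();
    private long currentMinute = Long.MIN_VALUE;

    /** True if the job was slow more than SLOW_COUNT_LIMIT times in the current minute. */
    synchronized boolean useSlowPool(int jobId, long nowMillis) {
        resetIfNewMinute(nowMillis);
        return slowCounts.getOrDefault(jobId, 0) > SLOW_COUNT_LIMIT;
    }

    /** Called after each trigger with its duration; only slow triggers are counted. */
    synchronized void recordCost(int jobId, long costMillis, long nowMillis) {
        resetIfNewMinute(nowMillis);
        if (costMillis > SLOW_TRIGGER_MS) {
            slowCounts.merge(jobId, 1, Integer::sum);
        }
    }

    /** Counts only cover the current minute, so clear them when the minute changes. */
    private void resetIfNewMinute(long nowMillis) {
        long minute = nowMillis / 60_000;
        if (minute != currentMinute) {
            currentMinute = minute;
            slowCounts.clear();
        }
    }
}
```

Using `synchronized` keeps the sketch short; the excerpt’s `AtomicInteger` values point toward a lock-free variant instead, which would work equally well.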

//XxlJobTrigger

    /**
     * trigger job
     *
     * @param jobId
     * @param triggerType
     * @param failRetryCount
     *          >=0: use this param
     *          <0: use param from job info config
     * @param executorShardingParam
     * @param executorParam
     *          null: use job param
     *          not null: cover job param
     * @param addressList
     *          null: use executor addressList
     *          not null: cover
     */
    public static void trigger(int jobId,
                               TriggerTypeEnum triggerType,
                               int failRetryCount,
                               String executorShardingParam,
                               String executorParam,
                               String addressList) {

        // Load data Loads task data
        XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
        if (jobInfo == null) {
            logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid, jobId={}", jobId);
            return;
        }
        if (executorParam != null) {
            jobInfo.setExecutorParam(executorParam);
        }
        ...
        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
                && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
                && shardingParam==null) {
            // sharding broadcast with no explicit shard parameter: trigger every registered executor
            for (int i = 0; i < group.getRegistryList().size(); i++) {
                processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
            }
        } else {
            if (shardingParam == null) {
                shardingParam = new int[]{0, 1};
            }
            processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
        }
    }

    /**
     * process the task trigger
     * @param group                     job group, registry list may be empty
     * @param jobInfo
     * @param finalFailRetryCount
     * @param triggerType
     * @param index                     sharding index
     * @param total                     sharding total
     */
    private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
        ...
        // 4. trigger remote executor
        ReturnT<String> triggerResult = null;
        if (address != null) {
            triggerResult = runExecutor(triggerParam, address);
        } else {
            triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
        }
        ...
    }

    /**
     * run executor
     * @param triggerParam
     * @param address
     * @return
     */
    public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
        ReturnT<String> runResult = null;
        try {
            // get (or create) the client for this executor address
            ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
            // ask the executor to run the job
            runResult = executorBiz.run(triggerParam);
        } catch (Exception e) {
            ...
        }
        ...
        return runResult;
    }

//XxlJobScheduler

    /**
     * Get the executor client for an address: return the cached client if present, otherwise create one and cache it
     * @param address
     * @return
     * @throws Exception
     */
    public static ExecutorBiz getExecutorBiz(String address) throws Exception {
        // valid
        if (address==null || address.trim().length()==0) {
            return null;
        }
        // load-cache
        address = address.trim();
        ExecutorBiz executorBiz = executorBizRepository.get(address);
        if (executorBiz != null) {
            return executorBiz;
        }
        // set-cache
        executorBiz = new ExecutorBizClient(address, XxlJobAdminConfig.getAdminConfig().getAccessToken());
        executorBizRepository.put(address, executorBiz);
        return executorBiz;
    }

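`getExecutorBiz` is a per-address client cache. A minimal generic sketch follows, with the hypothetical class `ClientCache` and a caller-supplied factory. It uses `ConcurrentHashMap.computeIfAbsent`, so each address gets exactly one client even when two triggers miss the cache at the same moment; the get-then-put in the excerpt could create two clients in that race, with the later `put` winning.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical per-address client cache in the spirit of XxlJobScheduler.getExecutorBiz. */
final class ClientCache<C> {
    private final Map<String, C> clients = new ConcurrentHashMap<>();
    private final Function<String, C> factory;

    ClientCache(Function<String, C> factory) {
        this.factory = factory;
    }

    /** Returns the cached client for the trimmed address, creating it at most once; null for blank input. */
    C get(String address) {
        if (address == null || address.isBlank()) {
            return null;
        }
        return clients.computeIfAbsent(address.trim(), factory);
    }

    int size() {
        return clients.size();
    }
}
```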

//ExecutorBizClient

    @Override
    public ReturnT<String> run(TriggerParam triggerParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
    }

In short, triggering a job means choosing an executor address from the job’s executor group according to the routing strategy and sending that executor a `run` request over HTTP. For sharding-broadcast jobs, every registered executor is notified, each with its own shard index.
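Address selection and sharding can be sketched as a few small functions. Everything here is hypothetical (`RouteSketch`, `Dispatch`). `first` and `round` are plain versions of the FIRST and ROUND ideas, and XXL-JOB’s own strategies may differ in detail. `broadcast` gives each of the N registered executors the shard `i/N`, and `parseShard` reads an explicit `"index/total"` parameter (the `1/2` form from the `executor_sharding_param` column), defaulting to `0/1`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: choose target addresses and shard indexes for one trigger. */
final class RouteSketch {
    /** One call to an executor: which address, and which shard "index/total" it processes. */
    record Dispatch(String address, int index, int total) {}

    private final AtomicInteger roundCounter = new AtomicInteger();

    /** FIRST-style: always the first registered address. */
    static String first(List<String> addresses) {
        return addresses.get(0);
    }

    /** ROUND-style: rotate through the addresses on successive triggers. */
    String round(List<String> addresses) {
        int n = Math.floorMod(roundCounter.getAndIncrement(), addresses.size());
        return addresses.get(n);
    }

    /** Sharding broadcast: every executor gets the call, each with its own shard index. */
    static List<Dispatch> broadcast(List<String> addresses) {
        List<Dispatch> calls = new ArrayList<>();
        for (int i = 0; i < addresses.size(); i++) {
            calls.add(new Dispatch(addresses.get(i), i, addresses.size()));
        }
        return calls;
    }

    /** Parses "index/total"; null means the default single shard 0/1. */
    static int[] parseShard(String shardingParam) {
        if (shardingParam == null) {
            return new int[] {0, 1};
        }
        String[] parts = shardingParam.split("/");
        return new int[] {Integer.parseInt(parts[0].trim()), Integer.parseInt(parts[1].trim())};
    }
}
```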

Now let’s look at the task executor.

executor

On the client (executor) side, we usually enable the following configuration:

@Configuration
public class XxlJobConfig {
    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.accessToken}")
    private String accessToken;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.address}")
    private String address;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

    @Value("${xxl.job.executor.logpath}")
    private String logPath;

    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;


    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }
    ...
}
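The configuration class simply injects a handful of properties. As a plain-Java sketch of the same settings, the hypothetical `ExecutorSettings` record below reads those property keys with `java.util.Properties`. Two things are assumptions rather than facts from the text: that `xxl.job.admin.addresses` may hold several comma-separated admin URLs, and the fallback values (port 9999, 30 log-retention days). The Spring `@Value` injection above has no defaults at all.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/** Hypothetical holder for the executor settings that XxlJobConfig injects with @Value. */
record ExecutorSettings(List<String> adminAddresses, String accessToken, String appname,
                        int port, String logPath, int logRetentionDays) {

    /** Reads the same property keys as XxlJobConfig from Properties-format text. */
    static ExecutorSettings parse(String propertiesText) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
        // assumed: several admin addresses may be given, separated by commas
        List<String> admins = Arrays.stream(p.getProperty("xxl.job.admin.addresses", "").split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .toList();
        return new ExecutorSettings(
                admins,
                p.getProperty("xxl.job.accessToken", ""),
                p.getProperty("xxl.job.executor.appname"),
                Integer.parseInt(p.getProperty("xxl.job.executor.port", "9999")),            // assumed default
                p.getProperty("xxl.job.executor.logpath", ""),
                Integer.parseInt(p.getProperty("xxl.job.executor.logretentiondays", "30"))); // assumed default
    }
}
```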

Next, the initialization of XxlJobSpringExecutor:

public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {
    private static final Logger logger = LoggerFactory.getLogger(XxlJobSpringExecutor.class);


    // start
    @Override
    public void afterSingletonsInstantiated() {

        // init JobHandler Repository (for older versions)
        /*initJobHandlerRepository(applicationContext);*/

        // init JobHandler Repository (for method)
        initJobHandlerMethodRepository(applicationContext);

        // refresh GlueFactory to refresh the GLUE factory type
        GlueFactory.refreshInstance(1);

        // super start
        try {
            super.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    ...
}

XxlJobSpringExecutor registers every method annotated with `@XxlJob` as a method-level JobHandler in the executor, refreshes the GLUE factory, and then calls `super.start()`.
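The scanning that `initJobHandlerMethodRepository` performs with Spring’s `MethodIntrospector` can be approximated with plain reflection. This is a minimal sketch with several assumptions. `@JobHandler`, `HandlerRegistry` and `DemoJobs` are hypothetical. Beans are passed in directly rather than looked up in an `ApplicationContext`. Handlers return `String` instead of XXL-JOB’s `ReturnT<String>`. The checks mirror the ones in the code that follows: a non-blank name, no duplicate names, and exactly one `String` parameter.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for @XxlJob: marks a method as a named job handler. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface JobHandler {
    String value();
}

/** Hypothetical sample bean with one handler method. */
final class DemoJobs {
    @JobHandler("demoJobHandler")
    String execute(String param) {
        return "ran with " + param;
    }
}

/** Hypothetical registry: scans beans for @JobHandler methods and invokes them by name. */
final class HandlerRegistry {
    private record Registered(Object bean, Method method) {}

    private final Map<String, Registered> handlers = new HashMap<>();

    /** Registers each @JobHandler method; rejects blank names, duplicates and wrong signatures. */
    void scan(Object bean) {
        for (Method method : bean.getClass().getDeclaredMethods()) {
            JobHandler annotation = method.getAnnotation(JobHandler.class);
            if (annotation == null) {
                continue;
            }
            String name = annotation.value().trim();
            if (name.isEmpty()) {
                throw new IllegalStateException("blank handler name on " + method);
            }
            if (handlers.containsKey(name)) {
                throw new IllegalStateException("handler name conflict: " + name);
            }
            boolean oneStringParam = method.getParameterCount() == 1
                    && method.getParameterTypes()[0] == String.class;
            if (!oneStringParam || method.getReturnType() != String.class) {
                throw new IllegalStateException("expected String " + method.getName() + "(String)");
            }
            method.setAccessible(true);
            handlers.put(name, new Registered(bean, method));
        }
    }

    /** Invokes the handler registered under name with the trigger's parameter. */
    String run(String name, String param) {
        Registered target = handlers.get(name);
        if (target == null) {
            throw new IllegalArgumentException("no handler named " + name);
        }
        try {
            return (String) target.method().invoke(target.bean(), param);
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new IllegalStateException(e);
        }
    }
}
```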

//XxlJobSpringExecutor

    /**
     * Initialize method-level job handlers
     * @param applicationContext
     */
    private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
        if (applicationContext == null) {
            return;
        }
        // init job handler from method
        String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
        for (String beanDefinitionName : beanDefinitionNames) {
            Object bean = applicationContext.getBean(beanDefinitionName);
            // referred to: org.springframework.context.event.EventListenerMethodProcessor.processBean
            // Scan the XxlJob annotation method for the bean in the application context
            Map<Method, XxlJob> annotatedMethods = null;
            try {
                annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                        new MethodIntrospector.MetadataLookup<XxlJob>() {
                            @Override
                            public XxlJob inspect(Method method) {
                                return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
                            }
                        });
            } catch (Throwable ex) {
                logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
            }
            if (annotatedMethods==null || annotatedMethods.isEmpty()) {
                continue;
            }

            for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
                Method method = methodXxlJobEntry.getKey();
                XxlJob xxlJob = methodXxlJobEntry.getValue();
                if (xxlJob == null) {
                    continue;
                }
                // job name
                String name = xxlJob.value();
                if (name.trim().length() == 0) {
                    throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + method.getName() + "].");
                }
                // check whether a job with the same name already exists
                if (loadJobHandler(name) != null) {
                    throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
                }

                // the job method must take exactly one parameter, of type String
                if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {
                    throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "]." +
                            "The correct method format like \" public ReturnT<String> execute(String param) \" .");
                }
                // Check the return value type
                if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {
                    throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "]." +
                            "The correct method format like \" public ReturnT<String> execute(String param) \" .");
                }
                method.setAccessible(true);

                // init and destroy methods
                Method initMethod = null;
                Method destroyMethod = null;
                if (xxlJob.init().trim().length() > 0) {
                    try {
                        initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
                        initMethod.setAccessible(true);
                    } catch (NoSuchMethodException e) {
                        throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "].");
                    }
                }
                if (xxlJob.destroy().trim().length() > 0) {
                    try {
                        destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
                        destroyMethod.setAccessible(true);
                    } catch (NoSuchMethodException e) {
                        throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "].");
                    }
                }
                // registry jobHandler: register the job handler
                registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));
            }
        }
    }

    // ---------------------- job handler repository ----------------------
    private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();

    /**
     * Register a job handler
     * @param name
     * @param jobHandler
     * @return the handler previously registered under this name, or null
     */
    public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
        logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
        return jobHandlerRepository.put(name, jobHandler);
    }


As shown above, the executor’s job handler repository is simply a ConcurrentMap<String, IJobHandler>, keyed by job handler name.
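To make the repository idea concrete, here is a minimal sketch outside Spring. `HandlerRegistry` and the `Function<String, String>` handler type are hypothetical stand-ins for IJobHandler, not XXL-JOB’s API. It applies the same name checks as initJobHandlerMethodRepository, but uses `putIfAbsent` so the duplicate check and the insert happen in one atomic step.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical stand-in for the job handler repository.
// A "handler" here is just a function from the job parameter to a result message.
class HandlerRegistry {
    private final ConcurrentMap<String, Function<String, String>> handlers = new ConcurrentHashMap<>();

    // Registers a handler under a non-blank name and rejects duplicate names.
    void register(String name, Function<String, String> handler) {
        if (name == null || name.trim().isEmpty()) {
            throw new IllegalArgumentException("job handler name invalid");
        }
        // putIfAbsent returns the existing value when the name is taken,
        // so checking and inserting is a single atomic call.
        if (handlers.putIfAbsent(name, handler) != null) {
            throw new IllegalStateException("job handler [" + name + "] naming conflicts");
        }
    }

    // Returns the handler, or null when nothing is registered under that name.
    Function<String, String> load(String name) {
        return handlers.get(name);
    }
}
```

The source calls loadJobHandler(name) and then put separately; that is fine while registration runs on a single thread at startup, and putIfAbsent only matters if handlers could be registered concurrently.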

Let’s take a quick look at the method-level job handler, MethodJobHandler:

public class MethodJobHandler extends IJobHandler {

    /** target object (bean) that owns the job method */
    private final Object target;
    /** job method */
    private final Method method;
    /** job initialization method */
    private Method initMethod;
    /** job destruction method */
    private Method destroyMethod;

    ...
}

As shown above, MethodJobHandler wraps the target bean, the job method, and the job’s optional init and destroy methods.
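The wrapping can be sketched with plain reflection. In this hypothetical `SimpleMethodJobHandler`, the job method is looked up as `name(String)` and, as a simplification, must return String instead of ReturnT<String>; init and destroy are optional no-argument methods, mirroring the `init`/`destroy` attributes of @XxlJob used above.

```java
import java.lang.reflect.Method;

// Hypothetical, simplified MethodJobHandler: wraps a bean, its job method,
// and optional no-arg init/destroy methods, all invoked via reflection.
class SimpleMethodJobHandler {
    private final Object target;
    private final Method method;
    private final Method initMethod;    // null when no init method is configured
    private final Method destroyMethod; // null when no destroy method is configured

    SimpleMethodJobHandler(Object target, String methodName, String init, String destroy)
            throws NoSuchMethodException {
        this.target = target;
        // The job method must take exactly one String parameter, like execute(String param).
        this.method = target.getClass().getDeclaredMethod(methodName, String.class);
        if (method.getReturnType() != String.class) {
            throw new IllegalArgumentException("job method must return String in this sketch");
        }
        method.setAccessible(true);
        this.initMethod = lookup(init);
        this.destroyMethod = lookup(destroy);
    }

    // Resolves an optional no-arg method; a blank name means "not configured".
    private Method lookup(String name) throws NoSuchMethodException {
        if (name == null || name.trim().isEmpty()) {
            return null;
        }
        Method m = target.getClass().getDeclaredMethod(name);
        m.setAccessible(true);
        return m;
    }

    void init() throws ReflectiveOperationException {
        if (initMethod != null) initMethod.invoke(target);
    }

    String execute(String param) throws ReflectiveOperationException {
        return (String) method.invoke(target, param);
    }

    void destroy() throws ReflectiveOperationException {
        if (destroyMethod != null) destroyMethod.invoke(target);
    }
}

// Example bean with a job method plus init/destroy hooks; records the call order.
class DemoJobBean {
    final StringBuilder events = new StringBuilder();
    private void open() { events.append("init;"); }
    private String demoJob(String param) { events.append("run;"); return "ok:" + param; }
    private void close() { events.append("destroy;"); }
}
```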

Next comes the actual startup of the executor:

//XxlJobExecutor

public class XxlJobExecutor  {
    /**
     * Start the job executor
     * @throws Exception
     */
    public void start() throws Exception {

        // init logpath
        XxlJobFileAppender.initLogPath(logPath);

        // init invoker, admin-client: initialize the job console (admin) client
        initAdminBizList(adminAddresses, accessToken);


        // init JobLogFileCleanThread Start the log clearing thread
        JobLogFileCleanThread.getInstance().start(logRetentionDays);

        // init TriggerCallbackThread starts the TriggerCallbackThread
        TriggerCallbackThread.getInstance().start();

        // init executor-server: start the executor server
        initEmbedServer(address, ip, port, appname, accessToken);
    }

    ...
}

Starting XxlJobExecutor initializes the admin (console) client, starts the log cleaning thread, starts the trigger callback thread (when a job finishes, its result is pushed onto a callback queue; the callback thread polls that queue and reports the results to the admin console), and starts a Netty-based embedded HTTP server that receives scheduling requests from the console.

//XxlJobExecutor

    private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {

        // fill ip port
        port = port > 0 ? port : NetUtil.findAvailablePort(9999);
        ip = (ip != null && ip.trim().length() > 0) ? ip : IpUtil.getIp();

        // generate address
        if (address==null || address.trim().length()==0) {
            String ip_port_address = IpUtil.getIpPort(ip, port);   // registry-address: default use address to registry, otherwise use ip:port if address is null
            address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
        }

        // start
        embedServer = new EmbedServer();
        embedServer.start(address, port, appname, accessToken);
    }
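The fallback rules in initEmbedServer can be sketched without NetUtil or IpUtil. `ExecutorAddress` is a hypothetical helper, and the port probe below (try to bind a ServerSocket, moving upward from the start port) is an assumption about how such a check can be done, not a copy of NetUtil.findAvailablePort.

```java
import java.io.IOException;
import java.net.ServerSocket;

// Hypothetical sketch of the executor's address fallback logic.
class ExecutorAddress {

    // Returns the first port >= start that can currently be bound, or -1 if none is found.
    static int findAvailablePort(int start) {
        for (int port = start; port <= 65535; port++) {
            try (ServerSocket probe = new ServerSocket(port)) {
                return port; // bind succeeded; the socket is closed again by try-with-resources
            } catch (IOException busy) {
                // port in use or not permitted: try the next one
            }
        }
        return -1;
    }

    // An explicitly configured address wins; otherwise build http://{ip}:{port}/.
    static String resolve(String address, String ip, int port) {
        if (address != null && !address.trim().isEmpty()) {
            return address;
        }
        return "http://" + ip + ":" + port + "/";
    }
}
```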

//EmbedServer

/**
 * Copy from: https://github.com/xuxueli/xxl-rpc
 *
 * netty-based embedded server
 * @author xuxueli 2020-04-11 21:25
 */
public class EmbedServer {
    private static final Logger logger = LoggerFactory.getLogger(EmbedServer.class);

    private ExecutorBiz executorBiz;
    private Thread thread;

    /**
     * Start the executor server
     * @param address
     * @param port
     * @param appname
     * @param accessToken
     */
    public void start(final String address, final int port, final String appname, final String accessToken) {
        executorBiz = new ExecutorBizImpl();
        thread = new Thread(new Runnable() {

            @Override
            public void run() {

                // param
                EventLoopGroup bossGroup = new NioEventLoopGroup();
                EventLoopGroup workerGroup = new NioEventLoopGroup();
                ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                        0, 200, 60L,
                        TimeUnit.SECONDS,
                        new LinkedBlockingQueue<Runnable>(2000),
                        new ThreadFactory() {
                            @Override
                            public Thread newThread(Runnable r) {
                                return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
                            }
                        },
                        new RejectedExecutionHandler() {
                            @Override
                            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                                throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
                            }
                        });
                try {
                    // start server: starts the Netty HTTP server
                    ServerBootstrap bootstrap = new ServerBootstrap();
                    bootstrap.group(bossGroup, workerGroup)
                            .channel(NioServerSocketChannel.class)
                            .childHandler(new ChannelInitializer<SocketChannel>() {
                                @Override
                                public void initChannel(SocketChannel channel) throws Exception {
                                    channel.pipeline()
                                            .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                            .addLast(new HttpServerCodec())
                                            .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & response to FULL
                                            .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                                }
                            })
                            .childOption(ChannelOption.SO_KEEPALIVE, true);

                    // bind
                    ChannelFuture future = bootstrap.bind(port).sync();

                    logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

                    // start registry, start the task execution registry thread
                    startRegistry(appname, address);

                    // wait until stop
                    future.channel().closeFuture().sync();

                } catch (InterruptedException e) {
                    if (e instanceof InterruptedException) {
                        logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
                    } else {
                        logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
                    }
                } finally {
                    // stop
                    try {
                        workerGroup.shutdownGracefully();
                        bossGroup.shutdownGracefully();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        });
        thread.setDaemon(true);	// daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
        thread.start();
    }

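The bizThreadPool configuration is worth a closer look. The sketch below rebuilds the same ThreadPoolExecutor settings (core size 0, maximum 200, 60-second keep-alive, a bounded queue of 2000, and a rejection handler that throws); `BizPoolDemo` is a hypothetical name. Under the JDK’s documented rules, a pool prefers queuing over creating threads beyond the core size, and with a core size of 0 `execute` starts a single worker only when none exists, so this pool grows past one thread only once its queue is full.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Rebuilds the bizThreadPool settings shown above, outside of Netty.
class BizPoolDemo {
    static ThreadPoolExecutor newBizPool() {
        return new ThreadPoolExecutor(
                0, 200, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2000),
                r -> new Thread(r, "biz-pool-" + r.hashCode()),
                (r, executor) -> {
                    // Reached only when 200 threads are busy AND 2000 tasks are queued.
                    throw new RuntimeException("bizThreadPool is EXHAUSTED!");
                });
    }

    // Submits n tasks that all block on a latch and reports how many worker threads exist.
    static int poolSizeWhileBlocked(int n) throws InterruptedException {
        ThreadPoolExecutor pool = newBizPool();
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < n; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int size = pool.getPoolSize();
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return size;
    }
}
```

For ten blocked tasks this reports a single worker, with the other nine waiting in the queue. For the executor this is usually acceptable because the pooled tasks are short (a /run request only ends with a push onto a job thread’s queue), but it is worth knowing before relying on the maximum of 200.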

The key part is EmbedHttpServerHandler:

//EmbedHttpServerHandler

/**
     * netty_http
     *
     * Copy from : https://github.com/xuxueli/xxl-rpc
     *
     * @author xuxueli 2015-11-24 22:25:15
     */
    public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
        private static final Logger logger = LoggerFactory.getLogger(EmbedHttpServerHandler.class);

        private ExecutorBiz executorBiz;
        private String accessToken;
        private ThreadPoolExecutor bizThreadPool;
        public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ThreadPoolExecutor bizThreadPool) {
            this.executorBiz = executorBiz;
            this.accessToken = accessToken;
            this.bizThreadPool = bizThreadPool;
        }

        @Override
        protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {

            // request parse
            //final byte[] requestBytes = ByteBufUtil.getBytes(msg.content()); // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);
            String requestData = msg.content().toString(CharsetUtil.UTF_8);
            String uri = msg.uri();
            HttpMethod httpMethod = msg.method();
            boolean keepAlive = HttpUtil.isKeepAlive(msg);
            String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);

            // invoke
            bizThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    // do invoke: handle the HTTP request
                    Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);

                    // to json
                    String responseJson = GsonTool.toJson(responseObj);

                    // write response
                    writeResponse(ctx, keepAlive, responseJson);
                }
            });
        }

        /**
         * Handle admin HTTP requests
         * @param httpMethod
         * @param uri
         * @param requestData
         * @param accessTokenReq
         * @return
         */
        private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {

            // valid
            if (HttpMethod.POST != httpMethod) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
            }
            if (uri==null || uri.trim().length()==0) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
            }
            if (accessToken != null
                    && accessToken.trim().length()>0
                    && !accessToken.equals(accessTokenReq)) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
            }

            // services mapping
            try {
                if ("/beat".equals(uri)) {
                    // heartbeat
                    return executorBiz.beat();
                } else if ("/idleBeat".equals(uri)) {
                    // Idle heartbeat
                    IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
                    return executorBiz.idleBeat(idleBeatParam);
                } else if ("/run".equals(uri)) {
                    // run the job
                    TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
                    return executorBiz.run(triggerParam);
                } else if ("/kill".equals(uri)) {
                    //kill job
                    KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
                    return executorBiz.kill(killParam);
                } else if ("/log".equals(uri)) {
                    LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
                    return executorBiz.log(logParam);
                } else {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
            }
        }

        ...
    }

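As shown above, the embedded HTTP server accepts only POST requests, checks the access token when one is configured, and maps five URIs to executor operations: /beat (heartbeat), /idleBeat (idle heartbeat), /run (run a job), /kill (kill a job), and /log (read a job log).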
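Stripped of Netty, the validation and dispatch in process() reduce to a few checks and a lookup table. The following hypothetical `ExecutorRouter` returns plain strings instead of ReturnT and takes the endpoint handlers as functions; it sketches the control flow only and is not XXL-JOB code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, transport-free sketch of EmbedHttpServerHandler.process():
// validate method, URI, and access token, then dispatch by URI.
class ExecutorRouter {
    private final String accessToken; // null or blank disables the token check
    private final Map<String, Function<String, String>> routes = new HashMap<>();

    ExecutorRouter(String accessToken) {
        this.accessToken = accessToken;
    }

    ExecutorRouter route(String uri, Function<String, String> handler) {
        routes.put(uri, handler);
        return this;
    }

    String process(String httpMethod, String uri, String body, String tokenHeader) {
        if (!"POST".equals(httpMethod)) {
            return "FAIL: invalid request, HttpMethod not support.";
        }
        if (uri == null || uri.trim().isEmpty()) {
            return "FAIL: invalid request, uri-mapping empty.";
        }
        if (accessToken != null && !accessToken.trim().isEmpty() && !accessToken.equals(tokenHeader)) {
            return "FAIL: The access token is wrong.";
        }
        Function<String, String> handler = routes.get(uri);
        if (handler == null) {
            return "FAIL: invalid request, uri-mapping(" + uri + ") not found.";
        }
        try {
            return handler.apply(body);
        } catch (RuntimeException e) {
            return "FAIL: request error:" + e.getMessage();
        }
    }
}
```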

Next, look at ExecutorBizImpl, which handles the /run request:

//ExecutorBizImpl

    public ReturnT<String> run(TriggerParam triggerParam) {
        // load old: jobHandler + jobThread (load this job's thread from the job executor)
        JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
        IJobHandler jobHandler = jobThread != null ? jobThread.getHandler() : null;
        String removeOldReason = null;

        // Valid: jobHandler + jobThread Verifies the job thread and job processor
        GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
        if (GlueTypeEnum.BEAN == glueTypeEnum) {

            // new jobHandler: load the handler registered under this name
            IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

            // valid old jobThread
            if (jobThread != null && jobHandler != newJobHandler) {
                // change handler, need kill old thread
                removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

                jobThread = null;
                jobHandler = null;
            }

            // valid handler
            if (jobHandler == null) {
                jobHandler = newJobHandler;
                if (jobHandler == null) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
                }
            }
        } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {

            // valid old jobThread
            if (jobThread != null &&
                    !(jobThread.getHandler() instanceof GlueJobHandler
                        && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
                // change handler or gluesource updated, need kill old thread
                removeOldReason = "change job source or glue type, and terminate the old job thread.";

                jobThread = null;
                jobHandler = null;
            }

            // valid handler
            if (jobHandler == null) {
                try {
                    IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
                    jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                    return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
                }
            }
        } else if (glueTypeEnum != null && glueTypeEnum.isScript()) {

            // valid old jobThread
            if (jobThread != null &&
                    !(jobThread.getHandler() instanceof ScriptJobHandler
                            && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
                // change script or gluesource updated, need kill old thread
                removeOldReason = "change job source or glue type, and terminate the old job thread.";

                jobThread = null;
                jobHandler = null;
            }

            // valid handler
            if (jobHandler == null) {
                jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
            }
        } else {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
        }

        // executor block strategy
        if (jobThread != null) {
            ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
            if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
                // discard when running
                if (jobThread.isRunningOrHasQueue()) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:" + ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
                }
            } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
                // kill running jobThread
                if (jobThread.isRunningOrHasQueue()) {
                    removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                    jobThread = null;
                }
            } else {
                // just queue trigger
            }
        }

        // replace thread (new or exists invalid)
        if (jobThread == null) {
            // register a new job thread with the job executor
            jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
        }

        // Push data to queue, push execution parameters to trigger queue
        ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
        return pushResult;
    }

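As shown above, running a job means loading the job’s JobThread from the executor, discarding it if the handler, glue type, or glue source has changed, applying the block strategy (DISCARD_LATER rejects the new trigger while the thread is busy; COVER_EARLY replaces the busy thread), registering a new JobThread when needed, and finally pushing the trigger parameters onto that thread’s trigger queue.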
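The block-strategy branch can be isolated as a pure decision function. In this sketch, `BlockPolicy` and `Decision` are hypothetical; SERIAL_EXECUTION stands for the default "just queue trigger" path (it is the name XXL-JOB’s ExecutorBlockStrategyEnum uses for that option), and a changed handler or glue source is modeled simply as "no thread", since run() discards the old thread in that case.

```java
// Hypothetical sketch of the block-strategy branch in ExecutorBizImpl.run().
enum BlockStrategy { SERIAL_EXECUTION, DISCARD_LATER, COVER_EARLY }

enum Decision { QUEUE_ON_EXISTING_THREAD, REJECT, REPLACE_THREAD, START_NEW_THREAD }

class BlockPolicy {
    // hasThread: a usable JobThread is already registered for this job.
    // busy: that thread is running a trigger or has triggers queued.
    static Decision decide(boolean hasThread, boolean busy, BlockStrategy strategy) {
        if (!hasThread) {
            return Decision.START_NEW_THREAD;         // registJobThread(...) creates one
        }
        if (busy && strategy == BlockStrategy.DISCARD_LATER) {
            return Decision.REJECT;                   // fail the new trigger immediately
        }
        if (busy && strategy == BlockStrategy.COVER_EARLY) {
            return Decision.REPLACE_THREAD;           // stop the old thread, start a new one
        }
        return Decision.QUEUE_ON_EXISTING_THREAD;     // serial: push onto the trigger queue
    }
}
```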

Now look at the trigger parameters, TriggerParam:

public class TriggerParam implements Serializable{
    private static final long serialVersionUID = 42L;

    private int jobId;

    /** executor handler name */
    private String executorHandler;
    /** executor parameters */
    private String executorParams;
    private String executorBlockStrategy;
    /** executor timeout (seconds) */
    private int executorTimeout;

    /** * Job log ID */
    private long logId;
    private long logDateTime;

    private String glueType;
    private String glueSource;
    private long glueUpdatetime;

    /** shard broadcast index */
    private int broadcastIndex;
    /** total number of shards in the broadcast */
    private int broadcastTotal;

    ...
}

TriggerParam carries the executor handler name and parameters, the block strategy, the timeout, the log ID and time, the glue type, source, and update time, and the sharding broadcast index and total.
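JobThread passes broadcastIndex and broadcastTotal to the handler through ShardingUtil (visible in its run loop). A common way for a handler to use them is to partition records by ID modulo the shard total; the sketch below shows that pattern with a hypothetical `ShardedJob` helper. The modulo rule is an assumption about how a job might split its work, not something XXL-JOB enforces.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a sharded job body: each executor in a broadcast gets its own
// 0-based index plus the total, and processes only the records that map to its index.
class ShardedJob {
    static List<Long> selectShard(List<Long> recordIds, int broadcastIndex, int broadcastTotal) {
        if (broadcastTotal <= 0 || broadcastIndex < 0 || broadcastIndex >= broadcastTotal) {
            throw new IllegalArgumentException("invalid shard " + broadcastIndex + "/" + broadcastTotal);
        }
        List<Long> mine = new ArrayList<>();
        for (long id : recordIds) {
            // floorMod keeps the result non-negative even for negative ids
            if (Math.floorMod(id, broadcastTotal) == broadcastIndex) {
                mine.add(id);
            }
        }
        return mine;
    }
}
```

With three executors and records 1 to 6, index 0 takes 3 and 6, index 1 takes 1 and 4, and index 2 takes 2 and 5, so every record is handled exactly once.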

Now let’s look at the job thread, JobThread:

public class JobThread extends Thread{
	private static Logger logger = LoggerFactory.getLogger(JobThread.class);

	private int jobId;
	/** job handler */
	private IJobHandler handler;
	/** trigger queue */
	private LinkedBlockingQueue<TriggerParam> triggerQueue;
	private Set<Long> triggerLogIdSet;		// avoid repeat trigger for the same TRIGGER_LOG_ID

	private volatile boolean toStop = false;
	private String stopReason;

    private boolean running = false;    // if running job
	private int idleTimes = 0;			// idle times


	public JobThread(int jobId, IJobHandler handler) {
		this.jobId = jobId;
		this.handler = handler;
		this.triggerQueue = new LinkedBlockingQueue<TriggerParam>();
		this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Long>());
	}

    /**
     * new trigger to queue
     *
     * @param triggerParam
     * @return
     */
	public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
		// avoid repeat
		if (triggerLogIdSet.contains(triggerParam.getLogId())) {
			logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
			return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
		}

		triggerLogIdSet.add(triggerParam.getLogId());
		triggerQueue.add(triggerParam);
        return ReturnT.SUCCESS;
	}

	...
}
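The duplicate-trigger guard in pushTriggerQueue can be sketched independently. `TriggerQueue` is hypothetical and stores only log IDs. One design difference from the code above: it relies on the boolean result of `Set.add` instead of a separate `contains` check, so two concurrent pushes of the same log ID cannot both pass.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of JobThread's trigger queue with duplicate-logId protection.
class TriggerQueue {
    private final LinkedBlockingQueue<Long> queue = new LinkedBlockingQueue<>();
    private final Set<Long> pendingLogIds = Collections.synchronizedSet(new HashSet<Long>());

    // Returns false if a trigger with the same logId is already waiting.
    boolean push(long logId) {
        // Set.add returns false when the id is present, so check-and-insert is one atomic call.
        if (!pendingLogIds.add(logId)) {
            return false;
        }
        queue.add(logId);
        return true;
    }

    // Takes the next trigger (or null after the timeout) and clears its logId,
    // so the same log ID may be pushed again once it has been taken.
    Long poll(long timeout, TimeUnit unit) throws InterruptedException {
        Long logId = queue.poll(timeout, unit);
        if (logId != null) {
            pendingLogIds.remove(logId);
        }
        return logId;
    }
}
```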

Where is the job thread started? Back to XxlJobExecutor:

//XxlJobExecutor

    /**
     * Register and start a job thread for the job
     * @param jobId
     * @param handler
     * @param removeOldReason
     * @return
     */
    public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
        JobThread newJobThread = new JobThread(jobId, handler);
        newJobThread.start();
        logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});

        JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);	// putIfAbsent | oh my god, map's put method return the old value!!!
        if (oldJobThread != null) {
            oldJobThread.toStop(removeOldReason);
            oldJobThread.interrupt();
        }

        return newJobThread;
    }
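The key trick in registJobThread is that `Map.put` returns the previous value for the key. The hypothetical `WorkerRegistry` below uses plain Threads; unlike XXL-JOB, which calls toStop(reason) and then interrupt(), it only interrupts the displaced worker, so the worker body must exit when interrupted.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of registJobThread(): start the new worker, swap it into
// the map, and stop whatever worker the swap displaced.
class WorkerRegistry {
    private final ConcurrentMap<Integer, Thread> workers = new ConcurrentHashMap<>();

    Thread register(int jobId, Runnable body) {
        Thread fresh = new Thread(body, "job-" + jobId);
        fresh.start();
        // put() returns the previous value for this key, or null if there was none.
        Thread old = workers.put(jobId, fresh);
        if (old != null) {
            old.interrupt(); // the old worker is expected to exit when interrupted
        }
        return fresh;
    }

    Thread get(int jobId) {
        return workers.get(jobId);
    }
}
```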

Now let’s look at the JobThread run loop:

//JobThread

 @Override
	public void run() {

    	// init
    	try {
			handler.init();
		} catch (Throwable e) {
    		logger.error(e.getMessage(), e);
		}

		// execute
		while (!toStop) {
			running = false;
			idleTimes++;

            TriggerParam triggerParam = null;
            ReturnT<String> executeResult = null;
            try {
				// to check the toStop signal we need a loop, so we cannot use queue.take(); use poll(timeout) instead
				// Pull the trigger parameter from the job thread trigger queue
				triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
				if (triggerParam != null) {
					running = true;
					idleTimes = 0;
					// Remove the corresponding log ID from the log ID set
					triggerLogIdSet.remove(triggerParam.getLogId());

					// log filename, like "logPath/yyyy-MM-dd/9999.log"
					String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
					XxlJobFileAppender.contextHolder.set(logFileName);
					ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));

					// execute
					XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams());

					if (triggerParam.getExecutorTimeout() > 0) {
						// limit timeout: a timeout is configured, so run the handler in a FutureTask and wait at most executorTimeout seconds
						Thread futureThread = null;
						try {
							final TriggerParam triggerParamTmp = triggerParam;
							FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {
								@Override
								public ReturnT<String> call() throws Exception {
									return handler.execute(triggerParamTmp.getExecutorParams());
								}
							});
							futureThread = new Thread(futureTask);
							futureThread.start();
							// wait for the result for at most executorTimeout seconds
							executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
						} catch (TimeoutException e) {

							XxlJobLogger.log("<br>----------- xxl-job job execute timeout");
							XxlJobLogger.log(e);

							executeResult = new ReturnT<String>(IJobHandler.FAIL_TIMEOUT.getCode(), "job execute timeout ");
						} finally {
							futureThread.interrupt();
						}
					} else {
						// just execute: no timeout is configured, so run the handler directly on this thread
						executeResult = handler.execute(triggerParam.getExecutorParams());
					}

					if (executeResult == null) {
						executeResult = IJobHandler.FAIL;
					} else {
						executeResult.setMsg((executeResult != null && executeResult.getMsg() != null && executeResult.getMsg().length() > 50000)
								? executeResult.getMsg().substring(0, 50000).concat("...")
								: executeResult.getMsg());
						executeResult.setContent(null);	// limit obj size
					}
					XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);

				} else {
					// idle for more than 30 consecutive polls (roughly 90 seconds): if nothing is queued, remove this job thread from the executor
					if (idleTimes > 30) {
						if (triggerQueue.size() == 0) {	// avoid concurrent trigger causes jobId-lost
							XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
						}
					}
				}
			} catch (Throwable e) {
				if (toStop) {
					XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
				}

				StringWriter stringWriter = new StringWriter();
				e.printStackTrace(new PrintWriter(stringWriter));
				String errorMsg = stringWriter.toString();
				executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg);

				XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
			} finally {
                if (triggerParam != null) {
                    // callback handler info
                    if (!toStop) {
                        // common task execution callback notification
                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), executeResult));
                    } else {
                        // is killed
                        ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running, killed]");
                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
                    }
                }
            }
		}

		// callback trigger requests still in the queue: the thread is stopping, so each one is reported as killed without running
		while (triggerQueue != null && triggerQueue.size() > 0) {
			TriggerParam triggerParam = triggerQueue.poll();
			if (triggerParam != null) {
				// is killed
				ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]");
				TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
			}
		}

		// destroy
		try {
			handler.destroy();
		} catch (Throwable e) {
			logger.error(e.getMessage(), e);
		}

		logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
	}

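As shown above, JobThread first calls the handler’s init method and then loops, polling its trigger queue with a 3-second timeout. For each trigger, if a timeout is configured (executorTimeout > 0), the handler runs in a FutureTask on a separate thread and the job thread waits at most that many seconds for the result; otherwise, the handler is executed directly. Every result is pushed to the callback thread. After more than 30 consecutive empty polls with nothing queued, the thread removes itself; when it stops, any triggers still queued are reported as killed and the handler’s destroy method is called. This completes our analysis of task scheduling in the admin console and task execution in the client executor.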
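The timeout branch can be reproduced with the JDK alone. In this sketch, `TimedExecution` is hypothetical and the `"FAIL_TIMEOUT"` string stands in for IJobHandler.FAIL_TIMEOUT. As in JobThread, a non-positive timeout means the job runs directly on the calling thread, and only TimeoutException is handled here; other failures propagate to the caller.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of JobThread's timeout branch: run the job on a helper
// thread and wait at most timeoutSeconds; with no timeout, run it inline.
class TimedExecution {
    static String run(Callable<String> job, int timeoutSeconds) throws Exception {
        if (timeoutSeconds <= 0) {
            return job.call();                   // no timeout configured: run directly
        }
        FutureTask<String> task = new FutureTask<>(job);
        Thread helper = new Thread(task);
        helper.start();
        try {
            return task.get(timeoutSeconds, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            return "FAIL_TIMEOUT";               // stand-in for IJobHandler.FAIL_TIMEOUT
        } finally {
            helper.interrupt();                  // ask an overrunning job to stop
        }
    }
}
```

Note that interrupt() only asks the job to stop: a job that ignores interruption keeps running on its helper thread even after the timeout result has been returned.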

conclusion

Task model

The main models include job grouping, job registry, job, job log, job execution report, and job scheduling lock models.

The process of task scheduling

The job scheduler uses two threads. The scheduleThread reads from the database the jobs due within the next five seconds; jobs whose trigger time has arrived are triggered right away (the scheduleThread notifies the job executor to run them), and jobs due later in the window are put into the ringData map. The ringThread checks the ringData map every second and notifies the job executors to run any tasks due at that second. Triggering a task means selecting an executor server from the job group according to the routing policy and notifying that server to run the task; for sharding tasks, every server in the group is notified.
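The ringData idea can be sketched as a 60-slot map keyed by the second of each job’s trigger time. `TimeRing` and its methods are hypothetical, and the sketch leaves out database access and misfire handling. Because slots repeat every 60 seconds, this only works because the pre-read window (five seconds) is much shorter than the ring.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the ringData idea: jobs due within the pre-read window are
// bucketed by the second (0-59) of their trigger time; a thread that wakes up every
// second pops the bucket for the current second and triggers those jobs.
class TimeRing {
    private final Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

    static int slot(long triggerTimeMillis) {
        return (int) ((triggerTimeMillis / 1000) % 60);
    }

    // Called by the schedule thread for a job due later within the window.
    void push(long triggerTimeMillis, int jobId) {
        ringData.computeIfAbsent(slot(triggerTimeMillis),
                k -> Collections.synchronizedList(new ArrayList<Integer>())).add(jobId);
    }

    // Called by the ring thread once per second; removes and returns the due job ids.
    List<Integer> pop(long nowMillis) {
        List<Integer> due = ringData.remove(slot(nowMillis));
        return due == null ? Collections.<Integer>emptyList() : new ArrayList<Integer>(due);
    }
}
```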

XxlJobSpringExecutor scans the Spring context for @XxlJob-annotated methods and registers them with the executor as method-level jobs. The executor’s job handler repository is a ConcurrentMap<String, IJobHandler>, and each MethodJobHandler wraps the target bean, the job method, and its optional init and destroy methods.

Starting XxlJobExecutor initializes the admin (console) client, starts the log cleaning thread, starts the trigger callback thread (finished jobs push their results onto a callback queue, which the callback thread polls and reports to the admin console), and starts a Netty-based embedded HTTP server that receives scheduling requests from the console.
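The trigger callback thread is a producer/consumer queue: job threads push results, and one background thread drains them in batches. The sketch below uses a hypothetical `CallbackThread` with a `Consumer` standing in for the call to the admin console; it omits any retry handling for failed reports.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch of the trigger-callback pattern: job threads push results
// onto a queue, and one background thread drains it in batches and reports them.
class CallbackThread {
    private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Consumer<List<String>> reporter; // stands in for the admin console call
    private final Thread worker;
    private volatile boolean toStop = false;

    CallbackThread(Consumer<List<String>> reporter) {
        this.reporter = reporter;
        this.worker = new Thread(this::loop, "callback-thread");
        this.worker.setDaemon(true);
    }

    void start() { worker.start(); }

    // Called by job threads when a job finishes.
    void push(String result) { queue.add(result); }

    private void loop() {
        // Keep going until stopped AND everything already queued has been reported.
        while (!toStop || !queue.isEmpty()) {
            try {
                // Wait briefly for the first item so the stop flag is rechecked regularly.
                String first = queue.poll(100, TimeUnit.MILLISECONDS);
                if (first == null) continue;
                List<String> batch = new ArrayList<>();
                batch.add(first);
                queue.drainTo(batch); // take everything else that is already waiting
                reporter.accept(batch);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Stops the thread after the remaining queued results have been reported.
    void stop() throws InterruptedException {
        toStop = true;
        worker.join();
    }
}
```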

The embedded HTTP server provides heartbeat, idle heartbeat, run, kill, and log operations.

To run a job, the executor loads (or creates) the job’s JobThread and pushes the trigger parameters onto its trigger queue.

TriggerParam carries the handler name and parameters, the block strategy, the timeout, the glue type, and the sharding broadcast index and total.

JobThread keeps polling its trigger queue. If a trigger has a timeout configured, the handler runs in a FutureTask and the thread waits for the result up to that timeout; otherwise, the handler is executed directly.

XXL-JOB advantages and disadvantages

Let’s analyze the advantages and disadvantages of XXL-JOB.

advantages

  • Based on HTTP, so it is cross-platform.

disadvantages

  • Scheduling is centrally controlled by the admin console, and triggering over HTTP can introduce timing delays;
  • Each job is executed by a single job thread that polls its trigger queue;
  • Logs are split every hour, which makes troubleshooting difficult.

Because XXL-JOB is lightweight and undemanding, it is good enough for ordinary applications with a small workload.

Can it be improved? Yes. The console could be responsible only for managing jobs, collecting executor-server statistics, storing logs, and reporting execution statistics. Each client would pull its jobs from the console and schedule them locally with Quartz. When a job fires, the client checks the list of executor machines, the execution mode, and the current execution strategy. If only one of several machines should run the job, a distributed lock can decide which one: the lock is a fine-grained mutex, split into a per-job lock and a per-scheduling lock (keyed by the timestamp generated for each scheduled firing), and only the machine that obtains the lock runs the task.
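The per-firing lock from this proposal can be sketched as an insert-if-absent on a key built from the job ID and the scheduled fire time. `FiringLock` is hypothetical, and the in-memory ConcurrentHashMap only simulates a shared store; across machines the same role would be played by something with an atomic insert-if-absent, such as a database table with a unique key or Redis `SET` with the `NX` option.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of "one machine per scheduled firing".
// The lock key combines the job id and the scheduled fire time, so every firing
// gets its own fine-grained mutex. The in-memory map stands in for a shared store.
class FiringLock {
    private final ConcurrentMap<String, String> locks = new ConcurrentHashMap<>();

    static String key(int jobId, long scheduledFireTimeMillis) {
        return jobId + "@" + scheduledFireTimeMillis;
    }

    // Returns true only for the first node that claims this firing.
    boolean tryAcquire(int jobId, long scheduledFireTimeMillis, String nodeId) {
        return locks.putIfAbsent(key(jobId, scheduledFireTimeMillis), nodeId) == null;
    }

    // Which node won a given firing, or null if nobody has claimed it yet.
    String owner(int jobId, long scheduledFireTimeMillis) {
        return locks.get(key(jobId, scheduledFireTimeMillis));
    }
}
```

Because the scheduled fire time is part of the key, each firing is decided independently: losing the lock for one firing says nothing about the next. The sketch never releases keys; a real store would need an expiry or cleanup policy.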

Appendix

xxl-job on GitHub: a distributed task scheduling framework