
XXL-JOB is a popular distributed task scheduling framework, and distributed systems often rely on it for scheduled tasks. It is easy to use, supports distributed deployments natively, offers a variety of routing strategies, and lets executors scale out and in flexibly. This article analyzes xxl-job-core at the source level.

1 XXL-JOB overview

XXL-JOB is a distributed task scheduling platform whose core design goals are rapid development, ease of learning, a lightweight footprint, and easy extension.

Its source code is divided into three parts:

  • xxl-job-admin: the scheduling center
  • xxl-job-core: the core dependency
  • xxl-job-executor-samples: sample executors

2 xxl-job-core

This article looks at the xxl-job-core package.

2.1 XxlJobExecutor

The heart of xxl-job-core is the executor class XxlJobExecutor. How it gets started is implemented in its subclasses, so let's look at those first.

It has two main subclasses, each suited to a different integration scenario:

  • XxlJobSimpleExecutor (simple): the task objects must be passed to the executor explicitly. We won't focus on it here.

  • XxlJobSpringExecutor (Spring): integrates with the Spring framework. Once the container has instantiated its singletons, the executor scans the Spring context for methods annotated with @XxlJob, registers them as job handlers, and then starts itself.

    public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {
        private static final Logger logger = LoggerFactory.getLogger(XxlJobSpringExecutor.class);

        // start the executor
        @Override
        public void afterSingletonsInstantiated() {

            // register the job handlers found on annotated methods
            initJobHandlerMethodRepository(applicationContext);

            // refresh GlueFactory so that it uses SpringGlueFactory
            GlueFactory.refreshInstance(1);

            // start the parent executor
            try {
                super.start();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void destroy() {
            super.destroy();
        }

        private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
            if (applicationContext == null) {
                return;
            }
            // init job handler from method
            String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
            for (String beanDefinitionName : beanDefinitionNames) {
                Object bean = applicationContext.getBean(beanDefinitionName);

                // collect every method of the bean that carries @XxlJob
                Map<Method, XxlJob> annotatedMethods = null;
                try {
                    annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                            new MethodIntrospector.MetadataLookup<XxlJob>() {
                                @Override
                                public XxlJob inspect(Method method) {
                                    return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
                                }
                            });
                } catch (Throwable ex) {
                    logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
                }
                if (annotatedMethods == null || annotatedMethods.isEmpty()) {
                    continue;
                }

                for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
                    Method executeMethod = methodXxlJobEntry.getKey();
                    XxlJob xxlJob = methodXxlJobEntry.getValue();
                    if (xxlJob == null) {
                        continue;
                    }

                    // the handler name must be non-empty and unique
                    String name = xxlJob.value();
                    if (name.trim().length() == 0) {
                        throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
                    }
                    if (loadJobHandler(name) != null) {
                        throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
                    }

                    executeMethod.setAccessible(true);

                    // init and destroy
                    Method initMethod = null;
                    Method destroyMethod = null;

                    if (xxlJob.init().trim().length() > 0) {
                        try {
                            initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
                            initMethod.setAccessible(true);
                        } catch (NoSuchMethodException e) {
                            throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
                        }
                    }
                    if (xxlJob.destroy().trim().length() > 0) {
                        try {
                            destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
                            destroyMethod.setAccessible(true);
                        } catch (NoSuchMethodException e) {
                            throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
                        }
                    }

                    // registry jobhandler
                    registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
                }
            }
        }

        // ---------------------- applicationContext ----------------------
        private static ApplicationContext applicationContext;

        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = applicationContext;
        }

        public static ApplicationContext getApplicationContext() {
            return applicationContext;
        }
    }
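
To make the scanning step easier to follow, here is a minimal sketch of the same idea in plain Java reflection, without Spring. It is not the library's code: the `DemoJob` annotation, the `DemoBean` class and the `REPOSITORY` map are hypothetical stand-ins for `@XxlJob`, a Spring bean and `jobHandlerRepository`, and `putIfAbsent` is swapped in for the separate load-then-register check.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class HandlerScanSketch {

    // Hypothetical stand-in for @XxlJob; not the real annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface DemoJob {
        String value();
    }

    // Hypothetical bean: one annotated method, one ordinary method.
    static class DemoBean {
        @DemoJob("demoJobHandler")
        public String execute() {
            return "done";
        }

        public void notAJob() {
        }
    }

    // handler name -> {bean, method}; plays the role of jobHandlerRepository.
    static final Map<String, Object[]> REPOSITORY = new ConcurrentHashMap<>();

    // Register every method of the bean that carries the annotation.
    static void scan(Object bean) {
        for (Method method : bean.getClass().getDeclaredMethods()) {
            DemoJob job = method.getAnnotation(DemoJob.class);
            if (job == null) {
                continue; // not a job method
            }
            String name = job.value().trim();
            if (name.isEmpty()) {
                throw new IllegalStateException("handler name invalid for " + method);
            }
            method.setAccessible(true);
            // putIfAbsent returns the previous mapping, so non-null means a naming conflict.
            if (REPOSITORY.putIfAbsent(name, new Object[]{bean, method}) != null) {
                throw new IllegalStateException("handler [" + name + "] naming conflicts");
            }
        }
    }

    // Look a handler up by name and invoke it reflectively.
    static Object invoke(String name) {
        Object[] entry = REPOSITORY.get(name);
        try {
            return ((Method) entry[1]).invoke(entry[0]);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        scan(new DemoBean());
        System.out.println(REPOSITORY.keySet() + " -> " + invoke("demoJobHandler"));
    }
}
```

The real executor relies on Spring's MethodIntrospector so that annotations on proxied or inherited methods are also found; the sketch only inspects the methods declared directly on the class.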

Now let's look at what the base executor XxlJobExecutor does:

  • Startup: start()

    • Initialize the log path

    • Initialize the list of RPC callers: one AdminBiz client is created per admin address, using the access token (both come from the configuration file)

    • Start the job log file cleanup

      The JobLogFileCleanThread thread runs once a day to clean up log files

    • Start the trigger callback thread TriggerCallbackThread (reports execution results back to the scheduling center)

    • Initialize the RPC provider: address, ip, port, appname and accessToken are passed to the embedded server

      • embedServer.start(address, port, appname, accessToken);

        Once the server is bound, it starts ExecutorRegistryThread, which registers the executor's appname and address

        The registration thread iterates over adminBizList, the RPC caller list, and calls admin remotely on every heartbeat. When the thread is stopped, it calls registryRemove to deregister the executor. The heartbeat thread registryThread:

        public class ExecutorRegistryThread {
            private static Logger logger = LoggerFactory.getLogger(ExecutorRegistryThread.class);

            private static ExecutorRegistryThread instance = new ExecutorRegistryThread();
            public static ExecutorRegistryThread getInstance(){
                return instance;
            }

            private Thread registryThread;
            private volatile boolean toStop = false;

            public void start(final String appname, final String address){

                // valid
                if (appname==null || appname.trim().length()==0) {
                    logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appname is null.");
                    return;
                }
                if (XxlJobExecutor.getAdminBizList() == null) {
                    logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");
                    return;
                }

                registryThread = new Thread(new Runnable() {
                    @Override
                    public void run() {

                        // registry
                        while (!toStop) {
                            try {
                                RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                                for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                                    try {
                                        ReturnT<String> registryResult = adminBiz.registry(registryParam);
                                        if (registryResult != null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                                            registryResult = ReturnT.SUCCESS;
                                            logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                            break;
                                        } else {
                                            logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                        }
                                    } catch (Exception e) {
                                        logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
                                    }
                                }
                            } catch (Exception e) {
                                if (!toStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }

                            try {
                                if (!toStop) {
                                    TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                                }
                            } catch (InterruptedException e) {
                                if (!toStop) {
                                    logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
                                }
                            }
                        }

                        // registry remove
                        try {
                            RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                            for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                                try {
                                    ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
                                    if (registryResult != null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                                        registryResult = ReturnT.SUCCESS;
                                        logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                        break;
                                    } else {
                                        logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                    }
                                } catch (Exception e) {
                                    if (!toStop) {
                                        logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
                                    }
                                }
                            }
                        } catch (Exception e) {
                            if (!toStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                        logger.info(">>>>>>>>>>> xxl-job, executor registry thread destroy.");
                    }
                });
                registryThread.setDaemon(true);
                registryThread.setName("xxl-job, executor ExecutorRegistryThread");
                registryThread.start();
            }

            public void toStop() {
                toStop = true;

                // interrupt and wait
                if (registryThread != null) {
                    registryThread.interrupt();
                    try {
                        registryThread.join();
                    } catch (InterruptedException e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        }
  • Destruction: destroy()

  • RPC caller setup: initAdminBizList (called during startup)

  • RPC provider setup: initEmbedServer (called during startup)

  • Job handler registry: register and load job handlers (once the Spring container is initialized, the annotated methods are scanned and registered; this happens before the executor starts)

  • Job thread registry: register and remove job threads (a job thread is started when the executor receives a trigger for that job)
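
The registration heartbeat in the outline above follows a common pattern: a daemon thread loops on a volatile stop flag, sleeps between beats, and runs a cleanup step once the loop exits. The sketch below shows only that pattern under simplifying assumptions. `HeartbeatSketch`, `beatCount` and `isDeregistered` are made-up names, and a counter and a flag stand in for the remote `registry` and `registryRemove` calls.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class HeartbeatSketch {
    private final AtomicInteger beats = new AtomicInteger();
    private volatile boolean toStop = false;
    private volatile boolean deregistered = false;
    private Thread thread;

    public void start(final long intervalMillis) {
        thread = new Thread(() -> {
            while (!toStop) {
                beats.incrementAndGet(); // stands in for the remote registry call
                try {
                    if (!toStop) {
                        TimeUnit.MILLISECONDS.sleep(intervalMillis);
                    }
                } catch (InterruptedException e) {
                    // The interrupt only wakes the sleep; the loop condition decides whether to exit.
                }
            }
            deregistered = true; // stands in for the remote registryRemove call
        });
        thread.setDaemon(true);
        thread.start();
    }

    public void stop() {
        toStop = true;
        if (thread != null) {
            thread.interrupt(); // cut the current sleep short
            try {
                thread.join();  // wait until the cleanup step has run
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public int beatCount() {
        return beats.get();
    }

    public boolean isDeregistered() {
        return deregistered;
    }

    public static void main(String[] args) {
        HeartbeatSketch heartbeat = new HeartbeatSketch();
        heartbeat.start(50);
        heartbeat.stop();
        System.out.println("deregistered=" + heartbeat.isDeregistered());
    }
}
```

Setting the flag before interrupting matters: the interrupt ends the sleep early, and the flag is what makes the loop exit instead of starting another beat.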

The XxlJobExecutor source below covers both the startup and the destruction of the executor:

public class XxlJobExecutor  {
    private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class);
​
    // ---------------------- param ----------------------
    private String adminAddresses;
    private String accessToken;
    private String appname;
    private String address;
    private String ip;
    private int port;
    private String logPath;
    private int logRetentionDays;
​
    public void setAdminAddresses(String adminAddresses) {
        this.adminAddresses = adminAddresses;
    }
    public void setAccessToken(String accessToken) {
        this.accessToken = accessToken;
    }
    public void setAppname(String appname) {
        this.appname = appname;
    }
    public void setAddress(String address) {
        this.address = address;
    }
    public void setIp(String ip) {
        this.ip = ip;
    }
    public void setPort(int port) {
        this.port = port;
    }
    public void setLogPath(String logPath) {
        this.logPath = logPath;
    }
    public void setLogRetentionDays(int logRetentionDays) {
        this.logRetentionDays = logRetentionDays;
    }
​
​
    // ---------------------- start + stop ----------------------
    public void start() throws Exception {
​
        // init log path
        XxlJobFileAppender.initLogPath(logPath);

        // init the RPC callers: build the AdminBiz client list from the admin addresses and token (both read from the configuration file)
        initAdminBizList(adminAddresses, accessToken);

        // start the job log file cleanup thread
        JobLogFileCleanThread.getInstance().start(logRetentionDays);

        // start the trigger callback thread (TriggerCallbackThread)
        TriggerCallbackThread.getInstance().start();
​
        // init executor-server
        initEmbedServer(address, ip, port, appname, accessToken);
    }
    public void destroy(){
        // destory executor-server
        stopEmbedServer();
​
        // destory jobThreadRepository
        if (jobThreadRepository.size() > 0) {
            for (Map.Entry<Integer, JobThread> item: jobThreadRepository.entrySet()) {
                JobThread oldJobThread = removeJobThread(item.getKey(), "web container destroy and kill the job.");
                // wait for job thread push result to callback queue
                if (oldJobThread != null) {
                    try {
                        oldJobThread.join();
                    } catch (InterruptedException e) {
                        logger.error(">>>>>>>>>>> xxl-job, JobThread destroy(join) error, jobId:{}", item.getKey(), e);
                    }
                }
            }
            jobThreadRepository.clear();
        }
        jobHandlerRepository.clear();
​
​
        // destory JobLogFileCleanThread
        JobLogFileCleanThread.getInstance().toStop();
​
        // destory TriggerCallbackThread
        TriggerCallbackThread.getInstance().toStop();
​
    }
​
​
    // ---------------------- admin-client (rpc invoker) ----------------------
    private static List<AdminBiz> adminBizList;
    private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
        if (adminAddresses!=null && adminAddresses.trim().length()>0) {
            for (String address: adminAddresses.trim().split(",")) {
                if (address!=null && address.trim().length()>0) {
​
                    AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);
​
                    if (adminBizList == null) {
                        adminBizList = new ArrayList<AdminBiz>();
                    }
                    adminBizList.add(adminBiz);
                }
            }
        }
    }
    public static List<AdminBiz> getAdminBizList(){
        return adminBizList;
    }
​
    // ---------------------- executor-server (rpc provider) ----------------------
    private EmbedServer embedServer = null;
​
    private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
​
        // fill ip port
        port = port>0?port: NetUtil.findAvailablePort(9999);
        ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
​
        // generate address
        if (address==null || address.trim().length()==0) {
            String ip_port_address = IpUtil.getIpPort(ip, port);   // registry-address:default use address to registry , otherwise use ip:port if address is null
            address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
        }
​
        // accessToken
        if (accessToken==null || accessToken.trim().length()==0) {
            logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
        }
​
        // start
        embedServer = new EmbedServer();
        embedServer.start(address, port, appname, accessToken);
    }
​
    private void stopEmbedServer() {
        // stop provider factory
        if (embedServer != null) {
            try {
                embedServer.stop();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
    }
​
​
    // ---------------------- job handler repository ----------------------
    private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
    public static IJobHandler loadJobHandler(String name){
        return jobHandlerRepository.get(name);
    }
    public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
        logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
        return jobHandlerRepository.put(name, jobHandler);
    }
​
​
    // ---------------------- job thread repository ----------------------
    private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();
    public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
        JobThread newJobThread = new JobThread(jobId, handler);
        newJobThread.start();
        logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});
​
        JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);  // putIfAbsent | oh my god, map's put method return the old value!!!
        if (oldJobThread != null) {
            oldJobThread.toStop(removeOldReason);
            oldJobThread.interrupt();
        }
​
        return newJobThread;
    }
    public static JobThread removeJobThread(int jobId, String removeOldReason){
        JobThread oldJobThread = jobThreadRepository.remove(jobId);
        if (oldJobThread != null) {
            oldJobThread.toStop(removeOldReason);
            oldJobThread.interrupt();
​
            return oldJobThread;
        }
        return null;
    }
    public static JobThread loadJobThread(int jobId){
        JobThread jobThread = jobThreadRepository.get(jobId);
        return jobThread;
    }
​
}
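
The comment in registJobThread about `put` returning the old value points at the trick that keeps one thread per job: the new thread is stored first, and whatever was there before is stopped afterwards. A minimal sketch of that replace-and-stop pattern follows; `Worker` and `ReplaceWorkerSketch` are hypothetical names, and the worker is a plain object rather than a real thread.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ReplaceWorkerSketch {

    // Hypothetical stand-in for JobThread: it only remembers why it was stopped.
    static class Worker {
        final String label;
        volatile String stopReason;

        Worker(String label) {
            this.label = label;
        }

        void toStop(String reason) {
            this.stopReason = reason;
        }
    }

    static final ConcurrentMap<Integer, Worker> WORKERS = new ConcurrentHashMap<>();

    // Store the new worker; Map.put returns the previous value (or null), which is then stopped.
    static Worker register(int jobId, String label, String removeOldReason) {
        Worker fresh = new Worker(label);
        Worker old = WORKERS.put(jobId, fresh);
        if (old != null) {
            old.toStop(removeOldReason);
        }
        return fresh;
    }

    public static void main(String[] args) {
        Worker first = register(1, "v1", null);
        Worker second = register(1, "v2", "handler changed");
        System.out.println(first.stopReason + " / current=" + WORKERS.get(1).label + " / " + (second == WORKERS.get(1)));
    }
}
```

With `putIfAbsent` the old worker would stay in the map and the new one would be discarded, which is the opposite of what the replacement needs.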

2.2 EmbedServer

The embedded HTTP server is built on Netty. It is the core of the RPC communication between admin and the executor, and a classic Netty use case.


public class EmbedServer {
    private static final Logger logger = LoggerFactory.getLogger(EmbedServer.class);

    private ExecutorBiz executorBiz;
    private Thread thread;

    public void start(final String address, final int port, final String appname, final String accessToken) {
        executorBiz = new ExecutorBizImpl();
        thread = new Thread(new Runnable() {

            @Override
            public void run() {

                // param
                EventLoopGroup bossGroup = new NioEventLoopGroup();
                EventLoopGroup workerGroup = new NioEventLoopGroup();
                ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                        0,
                        200,
                        60L,
                        TimeUnit.SECONDS,
                        new LinkedBlockingQueue<Runnable>(2000),
                        new ThreadFactory() {
                            @Override
                            public Thread newThread(Runnable r) {
                                return new Thread(r, "xxl-job, EmbedServer bizThreadPool-" + r.hashCode());
                            }
                        },
                        new RejectedExecutionHandler() {
                            @Override
                            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                                throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
                            }
                        });

                try {
                    // start server
                    ServerBootstrap bootstrap = new ServerBootstrap();
                    bootstrap.group(bossGroup, workerGroup)
                            .channel(NioServerSocketChannel.class)
                            .childHandler(new ChannelInitializer<SocketChannel>() {
                                @Override
                                public void initChannel(SocketChannel channel) throws Exception {
                                    channel.pipeline()
                                            .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))
                                            .addLast(new HttpServerCodec())
                                            .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  
                                            .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                                }
                            })
                            .childOption(ChannelOption.SO_KEEPALIVE, true);

                    // bind
                    ChannelFuture future = bootstrap.bind(port).sync();

                    logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

                    // start registry
                    startRegistry(appname, address);

                    // wait until stop
                    future.channel().closeFuture().sync();

                } catch (InterruptedException e) {
                    if (e instanceof InterruptedException) {
                        logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
                    } else {
                        logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
                    }
                } finally {
                    // stop
                    try {
                        workerGroup.shutdownGracefully();
                        bossGroup.shutdownGracefully();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }

            }

        });
        thread.setDaemon(true);
        thread.start();
    }

    public void stop() throws Exception {
        // destroy server thread
        if (thread != null && thread.isAlive()) {
            thread.interrupt();
        }

        // stop registry
        stopRegistry();
        logger.info(">>>>>>>>>>> xxl-job remoting server destroy success.");
    }


    // ---------------------- registry ----------------------

    /** * netty_http */
    public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
        private static final Logger logger = LoggerFactory.getLogger(EmbedHttpServerHandler.class);

        private ExecutorBiz executorBiz;
        private String accessToken;
        private ThreadPoolExecutor bizThreadPool;
        public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ThreadPoolExecutor bizThreadPool) {
            this.executorBiz = executorBiz;
            this.accessToken = accessToken;
            this.bizThreadPool = bizThreadPool;
        }

        @Override
        protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {

            // request parse
            //final byte[] requestBytes = ByteBufUtil.getBytes(msg.content()); // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);
            String requestData = msg.content().toString(CharsetUtil.UTF_8);
            String uri = msg.uri();
            HttpMethod httpMethod = msg.method();
            boolean keepAlive = HttpUtil.isKeepAlive(msg);
            String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);

            // invoke
            bizThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    // do invoke
                    Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);

                    // to json
                    String responseJson = GsonTool.toJson(responseObj);

                    // write response
                    writeResponse(ctx, keepAlive, responseJson);
                }
            });
        }

        private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {

            // valid
            if (HttpMethod.POST != httpMethod) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
            }
            if (uri==null || uri.trim().length()==0) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
            }
            if (accessToken != null
                    && accessToken.trim().length()>0
                    && !accessToken.equals(accessTokenReq)) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
            }

            // services mapping
            try {
                if ("/beat".equals(uri)) {
                    return executorBiz.beat();
                } else if ("/idleBeat".equals(uri)) {
                    IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
                    return executorBiz.idleBeat(idleBeatParam);
                } else if ("/run".equals(uri)) {
                    TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
                    return executorBiz.run(triggerParam);
                } else if ("/kill".equals(uri)) {
                    KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
                    return executorBiz.kill(killParam);
                } else if ("/log".equals(uri)) {
                    LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
                    return executorBiz.log(logParam);
                } else {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
            }
        }

        /**
         * write response
         */
        private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) {
            // write response
            FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8));   // Unpooled.wrappedBuffer(responseJson)
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8");       // HttpHeaderValues.TEXT_PLAIN.toString()
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
            if (keepAlive) {
                response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
            }
            ctx.writeAndFlush(response);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            logger.error(">>>>>>>>>>> xxl-job provider netty_http server caught exception", cause);
            ctx.close();
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                ctx.channel().close();      // beat 3N, close if idle
                logger.debug(">>>>>>>>>>> xxl-job provider netty_http server close an idle channel.");
            } else {
                super.userEventTriggered(ctx, evt);
            }
        }
    }

    // ---------------------- registry ----------------------

    public void startRegistry(final String appname, final String address) {
        ExecutorRegistryThread.getInstance().start(appname, address);
    }

    public void stopRegistry() {
        ExecutorRegistryThread.getInstance().toStop();
    }
}
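
Stripped of Netty, the `process` method is a short pipeline: validate the HTTP method, the URI and the access token, then route the URI to a service call. The sketch below mirrors that order in plain Java so it can be run on its own. It is an illustration, not the library's code: `DispatchSketch` is a made-up class, the responses are plain strings instead of `ReturnT`, and a map lookup is swapped in for the if/else chain.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

public class DispatchSketch {
    private final String accessToken;
    // uri -> handler taking the request body and returning a response string
    private final Map<String, Function<String, String>> routes = new LinkedHashMap<>();

    public DispatchSketch(String accessToken) {
        this.accessToken = accessToken;
        // Hypothetical handlers; the real server delegates to ExecutorBiz here.
        routes.put("/beat", body -> "SUCCESS");
        routes.put("/run", body -> "SUCCESS: queued " + body);
    }

    public String process(String httpMethod, String uri, String body, String tokenFromRequest) {
        // 1. only POST is accepted
        if (!"POST".equals(httpMethod)) {
            return "FAIL: HttpMethod not support";
        }
        // 2. the uri must be present
        if (uri == null || uri.trim().isEmpty()) {
            return "FAIL: uri-mapping empty";
        }
        // 3. the token is checked only when one is configured on the executor
        if (accessToken != null && !accessToken.trim().isEmpty() && !accessToken.equals(tokenFromRequest)) {
            return "FAIL: the access token is wrong";
        }
        // 4. route to the matching handler
        Function<String, String> handler = routes.get(uri);
        if (handler == null) {
            return "FAIL: uri-mapping(" + uri + ") not found";
        }
        try {
            return handler.apply(body);
        } catch (RuntimeException e) {
            return "FAIL: request error: " + e;
        }
    }

    public static void main(String[] args) {
        DispatchSketch server = new DispatchSketch("secret");
        System.out.println(server.process("POST", "/run", "{jobId:1}", "secret"));
        System.out.println(server.process("POST", "/run", "{jobId:1}", "wrong"));
    }
}
```

Note that an empty configured token disables the check entirely, which is why the executor logs a warning at startup when accessToken is not set.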

2.3 ExecutorBizImpl

ExecutorBizImpl handles the requests sent by the admin scheduling center. When EmbedServer receives a /run request, it calls the run method of this class, which loads (or creates) the job thread for the job, binds it to the matching handler, and pushes the trigger onto that thread's queue.

public class ExecutorBizImpl implements ExecutorBiz {
    private static Logger logger = LoggerFactory.getLogger(ExecutorBizImpl.class);

    @Override
    public ReturnT<String> beat() {
        return ReturnT.SUCCESS;
    }

    @Override
    public ReturnT<String> idleBeat(IdleBeatParam idleBeatParam) {

        // isRunningOrHasQueue
        boolean isRunningOrHasQueue = false;
        JobThread jobThread = XxlJobExecutor.loadJobThread(idleBeatParam.getJobId());
        if (jobThread != null && jobThread.isRunningOrHasQueue()) {
            isRunningOrHasQueue = true;
        }

        if (isRunningOrHasQueue) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "job thread is running or has trigger queue.");
        }
        return ReturnT.SUCCESS;
    }

    @Override
    public ReturnT<String> run(TriggerParam triggerParam) {
        // load old: jobHandler + jobThread
        JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
        IJobHandler jobHandler = jobThread != null ? jobThread.getHandler() : null;
        String removeOldReason = null;

        // valid: jobHandler + jobThread
        GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
        if (GlueTypeEnum.BEAN == glueTypeEnum) {

            // new jobhandler
            IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

            // valid old jobThread
            if (jobThread != null && jobHandler != newJobHandler) {
                // change handler, need kill old thread
                removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

                jobThread = null;
                jobHandler = null;
            }

            // valid handler
            if (jobHandler == null) {
                jobHandler = newJobHandler;
                if (jobHandler == null) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
                }
            }

        } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {

            // valid old jobThread
            if (jobThread != null &&
                    !(jobThread.getHandler() instanceof GlueJobHandler
                        && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime() == triggerParam.getGlueUpdatetime())) {
                // change handler or gluesource updated, need kill old thread
                removeOldReason = "change job source or glue type, and terminate the old job thread.";

                jobThread = null;
                jobHandler = null;
            }

            // valid handler
            if (jobHandler == null) {
                try {
                    IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
                    jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                    return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
                }
            }
        } else if (glueTypeEnum != null && glueTypeEnum.isScript()) {

            // valid old jobThread
            if (jobThread != null &&
                    !(jobThread.getHandler() instanceof ScriptJobHandler
                            && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime() == triggerParam.getGlueUpdatetime())) {
                // change script or gluesource updated, need kill old thread
                removeOldReason = "change job source or glue type, and terminate the old job thread.";

                jobThread = null;
                jobHandler = null;
            }

            // valid handler
            if (jobHandler == null) {
                jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
            }
        } else {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
        }

        // executor block strategy
        if (jobThread != null) {
            ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
            if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
                // discard when running
                if (jobThread.isRunningOrHasQueue()) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect: " + ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
                }
            } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
                // kill running jobThread
                if (jobThread.isRunningOrHasQueue()) {
                    removeOldReason = "block strategy effect: " + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                    jobThread = null;
                }
            } else {
                // just queue trigger
            }
        }

        // replace thread (new or exists invalid)
        if (jobThread == null) {
            jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
        }

        // push data to queue
        ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
        return pushResult;
    }

    @Override
    public ReturnT<String> kill(KillParam killParam) {
        // kill handlerThread, and create new one
        JobThread jobThread = XxlJobExecutor.loadJobThread(killParam.getJobId());
        if (jobThread != null) {
            XxlJobExecutor.removeJobThread(killParam.getJobId(), "scheduling center kill job.");
            return ReturnT.SUCCESS;
        }

        return new ReturnT<String>(ReturnT.SUCCESS_CODE, "job thread already killed.");
    }

    @Override
    public ReturnT<LogResult> log(LogParam logParam) {
        // log filename: logPath/yyyy-MM-dd/9999.log
        String logFileName = XxlJobFileAppender.makeLogFileName(new Date(logParam.getLogDateTim()), logParam.getLogId());

        LogResult logResult = XxlJobFileAppender.readLog(logFileName, logParam.getFromLineNum());
        return new ReturnT<LogResult>(logResult);
    }

}
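
The block-strategy branch at the end of run is easy to lose among the handler checks, so here it is reduced to a pure decision function. This is a sketch under assumptions: the `Decision` enum and `BlockStrategySketch` are invented for illustration, and `SERIAL_EXECUTION` names the default case that the source handles in the final `else` ("just queue trigger").

```java
public class BlockStrategySketch {

    enum Strategy { SERIAL_EXECUTION, DISCARD_LATER, COVER_EARLY }

    // Hypothetical outcome labels; the real method returns ReturnT or registers a thread directly.
    enum Decision { ENQUEUE, REJECT, NEW_THREAD_THEN_ENQUEUE }

    static Decision decide(Strategy strategy, boolean threadExists, boolean runningOrHasQueue) {
        if (!threadExists) {
            // no usable job thread yet: register one, then push the trigger
            return Decision.NEW_THREAD_THEN_ENQUEUE;
        }
        if (strategy == Strategy.DISCARD_LATER && runningOrHasQueue) {
            // the job is busy: drop the new trigger
            return Decision.REJECT;
        }
        if (strategy == Strategy.COVER_EARLY && runningOrHasQueue) {
            // the job is busy: replace the old thread, which stops it, then push the trigger
            return Decision.NEW_THREAD_THEN_ENQUEUE;
        }
        // idle thread, or serial execution: append to the existing queue
        return Decision.ENQUEUE;
    }

    public static void main(String[] args) {
        for (Strategy strategy : Strategy.values()) {
            System.out.println(strategy + " while busy -> " + decide(strategy, true, true));
        }
    }
}
```

When the existing thread is idle, all three strategies behave the same and the trigger is simply queued.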

2.4 JobThread

JobThread is the job thread: every trigger for a job is executed on it.

Steps:

  • Initialize the handler

  • Execute, in a loop:

    • Poll the triggerQueue
    • Remove the entry from triggerLogIdSet
    • Set up the XxlJobContext
    • Run the handler
    • Collect the execution result
    • Remove the job thread when it has stayed idle
  • Hand the result to TriggerCallbackThread, which reports it to the scheduling center

  • Destroy the handler
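
The execute step above depends on one detail: the thread must notice a stop request even when no trigger arrives. A blocking `take()` would wait indefinitely, so the queue is polled with a timeout and the stop flag is re-checked on every pass. The sketch below isolates that loop. `PollLoopSketch`, `shutdown` and the 100 ms timeout are assumptions made for the example, and strings stand in for TriggerParam.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollLoopSketch extends Thread {
    private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final List<String> results = new CopyOnWriteArrayList<>();
    private volatile boolean toStop = false;

    public void push(String task) {
        queue.add(task);
    }

    public List<String> results() {
        return results;
    }

    @Override
    public void run() {
        while (!toStop) {
            try {
                // poll with a timeout so the loop condition is re-checked regularly
                String task = queue.poll(100, TimeUnit.MILLISECONDS);
                if (task != null) {
                    results.add("handled " + task); // stands in for handler.execute()
                }
            } catch (InterruptedException e) {
                // an interrupt only ends the blocking poll; toStop decides whether to exit
            }
        }
    }

    // Set the flag, wake the thread if it is blocked in poll, and wait for it to finish.
    public void shutdown() {
        toStop = true;
        interrupt();
        try {
            join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        PollLoopSketch worker = new PollLoopSketch();
        worker.start();
        worker.push("job-1");
        while (worker.results().isEmpty()) {
            Thread.yield();
        }
        worker.shutdown();
        System.out.println(worker.results());
    }
}
```

The flag and the interrupt work as a pair: the flag alone would leave the thread waiting out the current poll, and the interrupt alone would not end the loop.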

public class JobThread extends Thread{
	private static Logger logger = LoggerFactory.getLogger(JobThread.class);

	private int jobId;
	private IJobHandler handler;
	private LinkedBlockingQueue<TriggerParam> triggerQueue;
	private Set<Long> triggerLogIdSet;		// avoid repeat trigger for the same TRIGGER_LOG_ID

	private volatile boolean toStop = false;
	private String stopReason;

	private boolean running = false;	// whether a job is currently running
	private int idleTimes = 0;			// idle times


	public JobThread(int jobId, IJobHandler handler) {
		this.jobId = jobId;
		this.handler = handler;
		this.triggerQueue = new LinkedBlockingQueue<TriggerParam>();
		this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Long>());

		// assign job thread name
		this.setName("xxl-job, JobThread-"+jobId+"-"+System.currentTimeMillis());
	}
	public IJobHandler getHandler() {
		return handler;
	}

    /**
     * new trigger to queue
     *
     * @param triggerParam
     * @return
     */
	public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
		// avoid repeat
		if (triggerLogIdSet.contains(triggerParam.getLogId())) {
			logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
			return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
		}

		triggerLogIdSet.add(triggerParam.getLogId());
		triggerQueue.add(triggerParam);
        return ReturnT.SUCCESS;
	}

    /**
     * kill job thread
     *
     * @param stopReason
     */
	public void toStop(String stopReason) {
		/**
		 * Thread.interrupt() only ends a thread's blocking state (wait, join, sleep): an
		 * InterruptedException is thrown if the thread is blocked, but a running thread is not terminated.
		 * The thread therefore has to be stopped cooperatively, through a shared variable.
		 */
		this.toStop = true;
		this.stopReason = stopReason;
	}

    /**
     * is running job
     * @return
     */
    public boolean isRunningOrHasQueue() {
        return running || triggerQueue.size()>0;
    }

    @Override
	public void run() {

    	// init
    	try {
			handler.init();
		} catch (Throwable e) {
    		logger.error(e.getMessage(), e);
		}

		// execute
		while (!toStop) {
			running = false;
			idleTimes++;

            TriggerParam triggerParam = null;
            try {
				// to check the toStop signal we need to loop, so we cannot use queue.take(); use poll(timeout) instead
				triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
				if (triggerParam != null) {
					running = true;
					idleTimes = 0;
					triggerLogIdSet.remove(triggerParam.getLogId());

					// log filename, like "logPath/yyyy-MM-dd/9999.log"
					String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
					XxlJobContext xxlJobContext = new XxlJobContext(
							triggerParam.getJobId(),
							triggerParam.getExecutorParams(),
							logFileName,
							triggerParam.getBroadcastIndex(),
							triggerParam.getBroadcastTotal());

					// init job context
					XxlJobContext.setXxlJobContext(xxlJobContext);

					// execute
					XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());

					if (triggerParam.getExecutorTimeout() > 0) {
						// limit timeout
						Thread futureThread = null;
						try {
							FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
								@Override
								public Boolean call() throws Exception {

									// init job context
									XxlJobContext.setXxlJobContext(xxlJobContext);

									handler.execute();
									return true;
								}
							});
							futureThread = new Thread(futureTask);
							futureThread.start();

							Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
						} catch (TimeoutException e) {

							XxlJobHelper.log("<br>----------- xxl-job job execute timeout");
							XxlJobHelper.log(e);

							// handle result
							XxlJobHelper.handleTimeout("job execute timeout ");
						} finally {
							futureThread.interrupt();
						}
					} else {
						// just execute
						handler.execute();
					}

					// valid execute handle data
					if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
						XxlJobHelper.handleFail("job handle result lost.");
					} else {
						String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();
						tempHandleMsg = (tempHandleMsg != null && tempHandleMsg.length() > 50000)
								? tempHandleMsg.substring(0, 50000).concat("...")
								: tempHandleMsg;
						XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);
					}
					XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="
							+ XxlJobContext.getXxlJobContext().getHandleCode()
							+ ", handleMsg = "
							+ XxlJobContext.getXxlJobContext().getHandleMsg()
					);

				} else {
					if (idleTimes > 30) {
						if(triggerQueue.size() == 0) {	// avoid concurrent trigger causes jobId-lost
							XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
						}
					}
				}
			} catch (Throwable e) {
				if (toStop) {
					XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
				}

				// handle result
				StringWriter stringWriter = new StringWriter();
				e.printStackTrace(new PrintWriter(stringWriter));
				String errorMsg = stringWriter.toString();

				XxlJobHelper.handleFail(errorMsg);

				XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
			} finally {
                if (triggerParam != null) {
                    // callback handler info
                    if (!toStop) {
                        // common
                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                        		triggerParam.getLogId(),
								triggerParam.getLogDateTime(),
								XxlJobContext.getXxlJobContext().getHandleCode(),
								XxlJobContext.getXxlJobContext().getHandleMsg() )
						);
                    } else {
                        // is killed
                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                        		triggerParam.getLogId(),
								triggerParam.getLogDateTime(),
								XxlJobContext.HANDLE_CODE_FAIL,
								stopReason + " [job running, killed]"));
                    }
                }
            }
        }

		// callback trigger request in queue
		while (triggerQueue != null && triggerQueue.size() > 0) {
			TriggerParam triggerParam = triggerQueue.poll();
			if (triggerParam != null) {
				// is killed
				TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
						triggerParam.getLogId(),
						triggerParam.getLogDateTime(),
						XxlJobContext.HANDLE_CODE_FAIL,
						stopReason + " [job not executed, in the job queue, killed.]"));
			}
		}

		// destroy
		try {
			handler.destroy();
		} catch (Throwable e) {
			logger.error(e.getMessage(), e);
		}

		logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
	}
}
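The timeout branch in run() wraps the handler call in a FutureTask, runs it on a separate thread, and waits with a bounded get(). A minimal sketch of that pattern follows; TimeoutSketch and runWithTimeout are hypothetical names, the task is a plain Runnable, and milliseconds are used instead of seconds to keep the demo short.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper; not part of xxl-job.
class TimeoutSketch {

    /** Runs the task on its own thread; returns true if it finished within the limit. */
    static boolean runWithTimeout(Runnable task, long timeoutMillis) throws InterruptedException {
        FutureTask<Boolean> futureTask = new FutureTask<>(task, Boolean.TRUE);
        Thread worker = new Thread(futureTask);
        worker.start();
        try {
            return futureTask.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return false;                               // a caller would record a timeout result here
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());   // the task itself threw
        } finally {
            worker.interrupt();                         // only a request: the task must react to it
        }
    }

    public static void main(String[] args) throws InterruptedException {
        boolean fast = runWithTimeout(() -> { }, 1000);
        boolean slow = runWithTimeout(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException ignored) {
                // interrupted by the finally block above; exit quietly
            }
        }, 100);
        System.out.println("fast finished in time: " + fast);
        System.out.println("slow finished in time: " + slow);
    }
}
```

As the comment in toStop() points out, interrupt() only wakes a thread that is blocked. A handler that spins in a CPU-bound loop without checking its interrupt status keeps running after the timeout has been reported.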

3 Summary

The heart of xxl-job-core is the executor. Its overall workflow is summarized below.

In the Spring integration environment:

  1. The Spring container is initialized

  2. After container initialization completes, afterSingletonsInstantiated() is invoked

  3. Job handlers are registered by scanning the Spring container for methods annotated with @XxlJob

  4. The executor is started

    • Initialize the log path

    • Initialize the list of RPC clients for the scheduling centers (admin)

    • Initialize job log file cleanup

    • Start the JobLogFileCleanThread thread, which cleans up log files once a day

    • Initialize the trigger callback thread TriggerCallbackThread (reports execution results)

    • Initialize the RPC provider, passing the address, IP, port, AppName, and accessToken to the executor server

      • Create the executor server (a Netty HTTP service)
      • Create a thread that sends heartbeats to the scheduling center
  5. The scheduling center (admin) triggers a job

  6. The job thread is started

  7. The job thread calls the corresponding handler to execute the job

  8. The execution result is reported back to the scheduling center

  9. The job thread is destroyed
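Step 8 decouples job execution from result reporting: the job thread only pushes a result (TriggerCallbackThread.pushCallBack in the listing) and moves on. One common way to build such a hand-off is a blocking queue that a reporting thread drains in batches. The sketch below illustrates that idea under this assumption; CallbackQueueSketch and its methods are hypothetical, results are plain strings, and the internals of the real TriggerCallbackThread are not shown in this article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch; CallbackQueueSketch is not an xxl-job class.
class CallbackQueueSketch {
    private final LinkedBlockingQueue<String> callbackQueue = new LinkedBlockingQueue<>();

    /** Called by job threads: enqueue a result and return immediately. */
    void pushCallback(String result) {
        callbackQueue.add(result);
    }

    /** Called by the reporting thread: wait for one result, then drain the rest into one batch. */
    List<String> nextBatch() throws InterruptedException {
        List<String> batch = new ArrayList<>();
        batch.add(callbackQueue.take());   // blocks until at least one result exists
        callbackQueue.drainTo(batch);      // appends whatever else is queued, without blocking
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        CallbackQueueSketch reporter = new CallbackQueueSketch();
        reporter.pushCallback("logId=1 code=200");
        reporter.pushCallback("logId=2 code=500");
        // A real reporter would send this batch to the scheduling center.
        System.out.println(reporter.nextBatch());
    }
}
```

Batching keeps the number of calls to the scheduling center low when many jobs finish at about the same time, while the blocking take() avoids busy-waiting when nothing has finished.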