>>>> 😜😜😜 Github: 👉 github.com/black-ant CASE Backup: 👉 gitee.com/antblack/ca…

A preface.

We are going to spend three pages looking at xxL-job concepts. This one will focus on the client-side request process.

Use two.

The usage is mainly divided into two parts:

  • Implement execute by inheriting IJobHandler interface
  • Annotate @jobHandler (value = “testJobHandler”) interface
@JobHandler(value = "testJobHandler")
@Component
@Service
public class TestHandleService extends IJobHandler {


    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public ReturnT<String> execute(String s) throws Exception {
        logger.info("param is :{}", s);
        return null; }}Copy the code

Server Configuration

3. The source code

In the Spring family, XxlJobSpringExecutor is used as its executor:

3.1 Initialization Operations

XxlJobSpringExecutor implements notification through ApplicationContextAware, which starts with a method defined in the configuration class:

@Bean(initMethod = "start", destroyMethod = "destroy")
public XxlJobSpringExecutor xxlJobExecutor(a) {
    // Set AdminAddresses, AppName, IP, etc
}
Copy the code

3.1.1 XxlJobSpringExecutor # start

// C- XxlJobSpringExecutor 
public void start(a) throws Exception {

    // init JobHandler Repository -> 3.2 JobHandler scan
    initJobHandlerRepository(applicationContext);

    // refresh GlueFactory
    // Create a GlueFactory or SpringGlueFactory using type type
    GlueFactory.refreshInstance(1);


    // super start, call xxljobexecutor.start
    super.start();
}


Copy the code

3.1.2 XxlJobExecutor # start section:

// C- XxlJobExecutor
public void start(a) throws Exception {

    // init logpath
    // This includes preparing logBasePath and glueSrcPath and creating the path mkdir
    XxlJobFileAppender.initLogPath(logPath);

    // init invoker, admin-client
    // Step 1: adminaddresses.trim ().split(",") -> Separate clusters with commas
    // Step 2: New XxlRpcReferenceBean to build AdminBiz
    // Step 3: Add to List
      
        adminBizList
      
    initAdminBizList(adminAddresses, accessToken);


    // init JobLogFileCleanThread
    // Create a separate thread to process the log file
    JobLogFileCleanThread.getInstance().start(logRetentionDays);

    // init TriggerCallbackThread -> See below
    TriggerCallbackThread.getInstance().start();

    // init executor-server
    // Provide a default port for 9999 and set the IP
    port = port>0? port: NetUtil.findAvailablePort(9999); ip = (ip! =null&&ip.trim().length()>0)? ip: IpUtil.getIp();// 
    initRpcProvider(ip, port, appName, accessToken);
}
Copy the code

3.1.3 TriggerCallbackThread callback processing

Create: where 2 threads are created:

  • triggerCallbackThread
  • triggerRetryCallbackThread

public void start(a) {


    triggerCallbackThread = new Thread(new Runnable() {

        @Override
        public void run(a) {

            // while loop until end signal
            while(! toStop){// Contains a core object: ReturnT
      
        executeResult
      
                // executeResult = code + msg + content
                HandleCallbackParam callback = getInstance().callBackQueue.take();
                if(callback ! =null) {

                    // callback list param
                    List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();

                    // LinkedBlockingQueue<HandleCallbackParam> callBackQueue
                    // The number of data to be fetched from the queue at one time
                    int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                    callbackParamList.add(callback);

                    // callback, will retry if error
                    if(callbackParamList! =null && callbackParamList.size()>0) {

                        / / call the callbackdoCallback(callbackParamList); }}}// last callback
            List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
            int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
            if(callbackParamList! =null && callbackParamList.size()>0) {
                doCallback(callbackParamList);
            }
            logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory."); }}); triggerCallbackThread.setDaemon(true);
    triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread");
    triggerCallbackThread.start();


    // retry
    triggerRetryCallbackThread = new Thread(new Runnable() {
        @Override
        public void run(a) {
            while(! toStop){ retryFailCallbackFile(); TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); } logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destory."); }}); triggerRetryCallbackThread.setDaemon(true);
    triggerRetryCallbackThread.start();

}
Copy the code

doCallback

private void doCallback(List<HandleCallbackParam> callbackParamList){
    boolean callbackRet = false;
    // callback, will retry if error
    for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
        // Callback initiates an RPC remote call
        ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
        if(callbackResult! =null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
            callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");
            callbackRet = true;
            break;
        } else {
            // Create a callback log
            callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:"+ callbackResult); }}if (!callbackRet) {
        appendFailCallbackFile(callbackParamList);
    }
}
Copy the code

3.1.4 initRpcProvider Registers the service

// C- XxlJobExecutor
private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception {

    // init, provider factory
    String address = IpUtil.getIpPort(ip, port);
    
    // Prepare the remote call properties
    Map<String, String> serviceRegistryParam = new HashMap<String, String>();
    serviceRegistryParam.put("appName", appName);
    serviceRegistryParam.put("address", address);

    
    // initialize configuration information, including using NettyHttpServer, serialization methods, and ExecutorServiceRegistry
    // NetEnum includes four common types:
    // - NettyServer / NettyClient
    // - NettyHttpServer / NettyHttpClient
    // - MinaServer / MinaClient
    // - JettyServer / JettyClient
    xxlRpcProviderFactory = new XxlRpcProviderFactory();
    xxlRpcProviderFactory.initConfig(NetEnum.NETTY_HTTP, Serializer.SerializeEnum.HESSIAN.getSerializer(), ip, port, accessToken, ExecutorServiceRegistry.class, serviceRegistryParam);

    // add services -> Add Map
      
        serviceData
      ,>
    xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null.new ExecutorBizImpl());

    // start -> Service registration
    xxlRpcProviderFactory.start();

}
Copy the code

Remote registration Service

// C- 
public void start(a) throws Exception {
   // start server
   serviceAddress = IpUtil.getIpPort(this.ip, port);
   server = netType.serverClass.newInstance();
   
   server.setStartedCallback(new BaseCallback() {    // serviceRegistry started
      @Override
      public void run(a) throws Exception {
         // start registry
         if(serviceRegistryClass ! =null) {
         
            // Build serviceRegistry for registration
            serviceRegistry = serviceRegistryClass.newInstance();
            
            // ExecutorRegistryThread.getInstance().start(param.get("appName"), param.get("address"));
            ExecutorServiceRegistry -> ExecutorServiceRegistry
            serviceRegistry.start(serviceRegistryParam);
            if (serviceData.size() > 0) { serviceRegistry.registry(serviceData.keySet(), serviceAddress); }}}});// Register to stop the callback
   server.setStopedCallback(new BaseCallback() {     // serviceRegistry stoped
      @Override
      public void run(a) {
         // stop registry
         if(serviceRegistry ! =null) {
            if (serviceData.size() > 0) {
                // Remove and stop
               serviceRegistry.remove(serviceData.keySet(), serviceAddress);
            }
            // registryThread.interrupt();
            // registryThread.join();
            serviceRegistry.stop();
            serviceRegistry = null; }}});// enable thread processing
   server.start(this);
}
Copy the code

Server and ServiceRegistry class structures

3.2 Scanning of annotations

All jobhandlers are scanned in XxlJobSpringExecutor# initJobHandlerRepository

private void initJobHandlerRepository(ApplicationContext applicationContext){

    // Use the applicationContext tool to get all classes annotated with jobHandler
    Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class);

    if(serviceBeanMap! =null && serviceBeanMap.size()>0) {
        for (Object serviceBean : serviceBeanMap.values()) {
            // Why do Job tasks need to implement the IJobHandler interface
            if (serviceBean instanceof IJobHandler){
                // Get the name information carried by the annotation
                String name = serviceBean.getClass().getAnnotation(JobHandler.class).value();
                IJobHandler handler = (IJobHandler) serviceBean;
                
                // jobHandlerRepository cannot contain jobs with the same name
                if(loadJobHandler(name) ! =null) {
                    throw new RuntimeException("xxl-job jobhandler naming conflicts.");
                }
                
                // Add Handler to ConcurrentMapregistJobHandler(name, handler); }}}}JobHandlerRepository is actually a Map
private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
    return jobHandlerRepository.put(name, jobHandler);
}
public static IJobHandler loadJobHandler(String name){
    return jobHandlerRepository.get(name);
}


Copy the code

3.3 Process processing

This section looks at the details of how @JobHandler is currently registered with Admin, and its main processing class is ExecutorRegistryThread

// C- ExecutorServiceRegistry
ExecutorRegistryThread.getInstance().start(param.get("appName"), param.get("address"));


// C- ExecutorRegistryThread
public void start(final String appName, final String address){

    / / if appName and XxlJobExecutor getAdminBizList () is null, the return directly
    
    registryThread = new Thread(new Runnable() {
        @Override
        public void run(a) {

            // registry is an infinite loop
            while(! toStop) {try {
                    
                    // Prepare the RegistryParam object for registration processing
                    RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);
                    for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                        // Initiate process registration
                        // client.asyncSend(finalAddress, xxlRpcRequest)
                        // TODO: the RPC call of xxl-job is also a large module
                        ReturnT<String> registryResult = adminBiz.registry(registryParam);
                        if(registryResult! =null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                            registryResult = ReturnT.SUCCESS;
                            logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}".new Object[]{registryParam, registryResult});
                            break;
                        } else {
                            logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}".newObject[]{registryParam, registryResult}); }}}catch (Exception e) {
                    // Prints exception logs
                }

                // Wait automatically in the while loop
                TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
            }

            // Registry remove, which contains two main things- AdminBiz adminBiz: XxlJobExecutor.getAdminBizList() - ReturnT<String> registryResult = adminBiz.registryRemove(registryParam); - registryResult = ReturnT.SUCCESS; }}); registryThread.setDaemon(true);
    registryThread.setName("xxl-job, executor ExecutorRegistryThread");
    registryThread.start();
}
Copy the code

3.4 Invocation of JobThread

TODO: I’ll leave the logic of client-side callbacks behind for the next article

conclusion

Xxl-job is a very easy to use task scheduling platform. In addition to the comprehensive functions it provides, the code and structure are very clear.

  • The deployment registers the Service to the server. There are several ways to register the service
  • After execution, the server is called back via callback