Only the spring moon over the courtyard still cares, shining on the fallen blossoms for the one who has left.

https://p1-jj.byteimg.com/tos-cn-i-t2oaga2asx/gold-user-assets/2019/9/19/16d48304045990f4~tplv-t2oaga2asx-image.image

Overview

Scheduled tasks are used in the project and multiple service instances are deployed, so duplicate execution of scheduled tasks has to be prevented: each scheduled task should run on only one node at a time. Common open-source solutions include Elastic-Job, XXL-Job, Quartz, Saturn, Opencron, and Antares. We finally chose Elastic-Job. Its highlights are as follows:

  1. It is built on the Quartz scheduling framework and therefore has most of the features of Quartz
  2. Uses ZooKeeper for coordination and as the scheduling center, which keeps it lightweight
  3. Supports task sharding
  4. Supports elastic scaling and horizontal expansion. When a task runs again, the system checks the current number of servers and re-shards the task; once re-sharding is complete, the task continues to run
  5. Failover and fault tolerance: when a scheduling server crashes or loses its connection to ZooKeeper, it stops its work immediately, and other idle scheduling servers are found to run the remaining tasks
  6. Provides an operations and maintenance (O&M) console to manage jobs and registry centers
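To make items 3 to 5 concrete, here is a minimal sketch of how shard items could be spread across instances and recomputed when an instance disappears. It is an illustration only, not Elastic-Job's own code: the class name `ShardingSketch`, the method `allocate`, and the node names are hypothetical, and the even-split rule is an assumption modelled on an average-allocation strategy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative average-allocation sharding; not Elastic-Job's own code. */
final class ShardingSketch {

    /**
     * Splits shard items 0..shardingTotalCount-1 evenly across the servers.
     * Leftover items are handed out one by one, starting from the first server.
     */
    static Map<String, List<Integer>> allocate(List<String> servers, int shardingTotalCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (servers.isEmpty()) {
            return result;
        }
        int perServer = shardingTotalCount / servers.size();
        int next = 0;
        for (String server : servers) {
            List<Integer> items = new ArrayList<>();
            for (int i = 0; i < perServer; i++) {
                items.add(next++);
            }
            result.put(server, items);
        }
        // Hand the remainder to the first servers, in order.
        for (String server : servers) {
            if (next >= shardingTotalCount) {
                break;
            }
            result.get(server).add(next++);
        }
        return result;
    }

    public static void main(String[] args) {
        // Two instances share two shards: {node-8081=[0], node-8082=[1]}
        System.out.println(allocate(Arrays.asList("node-8081", "node-8082"), 2));
        // One instance goes away, the survivor takes both shards: {node-8082=[0, 1]}
        System.out.println(allocate(Collections.singletonList("node-8082"), 2));
    }
}
```

Calling `allocate` again with the surviving servers is the sketch's stand-in for re-sharding after a scale-out, scale-in, or failover.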

However, in actual development we found that Elastic-Job does not support sharding for dynamically added scheduled tasks. In a multi-instance deployment, a task that is dynamically added on one instance always runs on that node only. To run it on another instance, you have to call the same interface on that instance with the same parameters. See the reference "Elastic-Job dynamically adds tasks". Searching Baidu and Google confirms that dynamically added tasks are a known problem. In the original poster's tests, sharding of dynamically added tasks worked only intermittently, and as long as a task is registered in ZooKeeper, it is initialized automatically on restart. (For a description of dynamic tasks, please refer to the reference above; it is not repeated here.)

Solution

Following Yin's approach, the task nodes are managed centrally: no matter which node a dynamic task is registered on, the request must be forwarded to the other nodes so that they initialize the task too. This ensures that sharded tasks execute normally across multiple nodes.

The code is as follows:

    /**
     * Enable task listening. When a task is added, its data is written to ZooKeeper,
     * and this listener initializes the task on the other nodes.
     */
    public void monitorJobRegister() {
        CuratorFramework client = zookeeperRegistryCenter.getClient();
        @SuppressWarnings("resource")
        PathChildrenCache childrenCache = new PathChildrenCache(client, "/", true);
        PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                ChildData data = event.getData();
                switch (event.getType()) {
                    case CHILD_ADDED:
                        String config = new String(client.getData().forPath(data.getPath() + "/config"));
                        Job job = JsonUtils.toBean(Job.class, config);
                        Object bean = null;
                        // If the bean fails to get, add the task
                        try {
                            bean = ctx.getBean("SpringJobScheduler" + job.getJobName());
                        } catch (BeansException e) {
                            logger.error("ERROR NO BEAN,CREATE BEAN SpringJobScheduler" + job.getJobName());
                        }
                        if (Objects.isNull(bean)) {
                            addJob(job);
                        }
                        break;
                    default:
                        break;
                }
            }
        };
        childrenCache.getListenable().addListener(childrenCacheListener);
        try {
            // https://blog.csdn.net/u010402202/article/details/79581575
            childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
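The idea behind the listener can also be shown without ZooKeeper. The following in-memory simulation is only a sketch under stated assumptions: `FakeRegistry` stands in for the ZooKeeper namespace, `JobNode` stands in for one service instance, and the `localJobs` set plays the role of the Spring bean lookup. None of these names come from Elastic-Job or Curator.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical in-memory stand-in for the ZooKeeper namespace holding job configs. */
final class FakeRegistry {
    private final Map<String, String> configs = new ConcurrentHashMap<>();
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    /** Stores a new config and notifies every listener, like a CHILD_ADDED event. */
    void register(String jobName, String config) {
        if (configs.putIfAbsent(jobName, config) == null) {
            listeners.forEach(listener -> listener.accept(jobName));
        }
    }
}

/** Hypothetical service instance; localJobs plays the role of the bean lookup. */
final class JobNode {
    private final String name;
    private final FakeRegistry registry;
    private final Set<String> localJobs = ConcurrentHashMap.newKeySet();
    private int initCount = 0;

    JobNode(String name, FakeRegistry registry) {
        this.name = name;
        this.registry = registry;
        registry.addListener(this::initialize);
    }

    /** Called by the REST endpoint on whichever node receives the request. */
    void addJob(String jobName, String config) {
        initialize(jobName);
        registry.register(jobName, config);
    }

    /** Initializes the job locally only if this node has not done so yet. */
    private void initialize(String jobName) {
        if (localJobs.add(jobName)) {
            initCount++;
            System.out.println(name + " initialized " + jobName);
        }
    }

    boolean hasJob(String jobName) {
        return localJobs.contains(jobName);
    }

    int initCount() {
        return initCount;
    }
}

final class RegistrationDemo {
    public static void main(String[] args) {
        FakeRegistry registry = new FakeRegistry();
        JobNode first = new JobNode("node-8081", registry);
        JobNode second = new JobNode("node-8082", registry);
        // The job is added on one node only, yet both nodes end up initialized.
        first.addJob("DynamicJob01", "{}");
        System.out.println(first.hasJob("DynamicJob01") && second.hasJob("DynamicJob01"));
    }
}
```

The "already initialized" check matters: the node that handled the request also receives the event, so without the check it would create the job twice.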

Test

Test that scheduled tasks can be added dynamically and that shard failover works.

  1. Download elastic-job-spring-boot-starter and use the Maven install command to install it into the local repository
  2. Create the demo-elastic-job project. The directory structure is as follows:

demo-elastic-job
├── mvnw
├── mvnw.cmd
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com
│   │   │       └── example
│   │   │           └── demo
│   │   │               ├── job
│   │   │               │   ├── DynamicJob.java
│   │   │               │   └── TestJob.java
│   │   │               └── DemoApplication.java
│   │   └── resources
│   │       ├── application.yml
│   │       └── application-dev.yml
│   └── test
│       └── java
│           └── com
│               └── example
│                   └── demo
│                       └── DemoApplicationTests.java
├── pom.xml
└── demo.iml

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.10.0</version>
        </dependency>

        <dependency>
            <groupId>com.cxytiandi</groupId>
            <artifactId>elastic-job-spring-boot-starter</artifactId>
            <version>1.0.4</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>


DemoApplication.java

package com.example.demo;

import com.cxytiandi.elasticjob.annotation.EnableElasticJob;
import com.cxytiandi.elasticjob.dynamic.service.JobService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

@SpringBootApplication
@EnableElasticJob
@ComponentScan(basePackages = {"com.cxytiandi", "com.example.demo"})
public class DemoApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Autowired
    private JobService jobService;

    @Override
    public void run(String... args) throws Exception {
        // Simulate the initial read database add task
        // Job job1 = new Job();
        // job1.setJobName("job1");
        // job1.setCron("0/10 * * * * ?");
        // job1.setJobType("SIMPLE");
        // job1.setJobClass("com.example.demo.job.DynamicJob");
        // job1.setShardingItemParameters("");
        // job1.setShardingTotalCount(2);
        // jobService.addJob(job1);
        // Job job2 = new Job();
        // job2.setJobName("job2");
        // job2.setCron("0/10 * * * * ?");
        // job2.setJobType("SIMPLE");
        // job2.setJobClass("com.example.demo.job.DynamicJob");
        // job2.setShardingItemParameters("0=A,1=B");
        // job2.setShardingTotalCount(2);
        // jobService.addJob(job2);
    }
}

TestJob.java

package com.example.demo.job;

import com.cxytiandi.elasticjob.annotation.ElasticJobConf;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

/**
 * Created by zhenglongfei on 2019/7/22
 *
 * @version 1.0
 */
@Component
@Slf4j
@ElasticJobConf(name = "dayJob", cron = "0/10 * * * * ?", shardingTotalCount = 2,
        shardingItemParameters = "0=AAAA,1=BBBB", description = "Simple task", failover = true)
public class TestJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        log.info("TestJob name: [{}], sharding total count: [{}], param: [{}]",
                shardingContext.getJobName(), shardingContext.getShardingTotalCount(),
                shardingContext.getShardingParameter());
    }
}
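The shardingItemParameters value used by TestJob ("0=AAAA,1=BBBB") pairs each shard index with the parameter that shard receives. As a rough illustration of that format, the sketch below parses such a string into a map. `ShardingParamsSketch` and `parse` are hypothetical names, and the parsing rules are an assumption based only on the examples in this article, not on Elastic-Job's implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical parser for strings such as "0=AAAA,1=BBBB". */
final class ShardingParamsSketch {

    /** Returns shard index to parameter, in the order the pairs were written. */
    static Map<Integer, String> parse(String shardingItemParameters) {
        Map<Integer, String> result = new LinkedHashMap<>();
        if (shardingItemParameters == null || shardingItemParameters.trim().isEmpty()) {
            return result; // an empty string means no per-shard parameters
        }
        for (String pair : shardingItemParameters.split(",")) {
            String[] keyValue = pair.trim().split("=", 2);
            if (keyValue.length != 2) {
                throw new IllegalArgumentException("Expected index=value but got: " + pair);
            }
            result.put(Integer.parseInt(keyValue[0].trim()), keyValue[1].trim());
        }
        return result;
    }

    public static void main(String[] args) {
        // prints {0=AAAA, 1=BBBB}
        System.out.println(parse("0=AAAA,1=BBBB"));
    }
}
```

With shardingTotalCount = 2, the instance that owns shard 0 would see "AAAA" from getShardingParameter() and the owner of shard 1 would see "BBBB".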

DynamicJob.java

package com.example.demo.job;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * Created by zhenglongfei on 2019/7/24
 *
 * @version 1.0
 */
@Component
@Slf4j
public class DynamicJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {


        switch (shardingContext.getShardingItem()) {
            case 0:
                log.info("【0】 is running");
                break;
            case 1:
                log.info("【1】 is running");
                break;
        }
    }
}

application.yml

elastic:
  job:
    zk:
      serverLists: 172.25.66.137:2181
      namespace: demo_test
server:
  port: 8082
spring:
  redis:
    host: 127.0.0.1
    port: 6379

Test results

Start two instances of the project on ports 8081 and 8082, then use the REST API to register tasks dynamically.

  • job
    https://p1-jj.byteimg.com/tos-cn-i-t2oaga2asx/gold-user-assets/2019/9/19/16d48304044a93fb~tplv-t2oaga2asx-image.image

Send a POST request to http://localhost:8081/job with the following parameters:

{
  "jobName": "DynamicJob01",
  "cron": "0/3 * * * * ?",
  "jobType": "SIMPLE",
  "jobClass": "com.example.demo.job.DynamicJob",
  "jobParameter": "test",
  "shardingTotalCount": 2,
  "shardingItemParameters": "0=AAAA,1=BBBB"
}
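For readers who prefer code to a REST client, the request could be sent roughly as follows. This is a sketch, not part of the demo project: it assumes Java 11 or later for `java.net.http` (the demo itself targets Java 1.8), the `Content-Type` header is an assumption, and `RegisterJobRequest` is a hypothetical class name. The URL and the JSON fields are the ones shown above.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical helper that registers DynamicJob01 through the /job endpoint. */
final class RegisterJobRequest {

    // Same parameters as the JSON body shown above.
    static final String BODY = "{"
            + "\"jobName\": \"DynamicJob01\", "
            + "\"cron\": \"0/3 * * * * ?\", "
            + "\"jobType\": \"SIMPLE\", "
            + "\"jobClass\": \"com.example.demo.job.DynamicJob\", "
            + "\"jobParameter\": \"test\", "
            + "\"shardingTotalCount\": 2, "
            + "\"shardingItemParameters\": \"0=AAAA,1=BBBB\""
            + "}";

    /** Builds the POST request without sending it. */
    static HttpRequest build(String baseUrl) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/job"))
                .header("Content-Type", "application/json") // assumed content type
                .POST(HttpRequest.BodyPublishers.ofString(BODY))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Requires one of the demo instances to be listening on port 8081.
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(build("http://localhost:8081"), HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Sending the request to only one instance is the point of the test: the other instance should pick the job up through the ZooKeeper listener.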

https://p1-jj.byteimg.com/tos-cn-i-t2oaga2asx/gold-user-assets/2019/9/19/16d483a9c926830e~tplv-t2oaga2asx-image.image

Code download

  • GitHub: demo-elastic-job
  • GitHub: elastic-job-spring-boot-starter


Reference links:

  • Elastic-Job dynamically adds tasks
  • elastic-job: dynamically adding tasks
  • Pitfalls of Curator's PathChildrenCache