Affectionate only spring court month, especially for leaving according to falling flowers.
An overview of the
Scheduled tasks are used in the project and multiple service instances are deployed. Therefore, repeated scheduled tasks need to be solved. That is, each scheduled task is executed on only one node at a time. Common open source solutions, such as Elastic-Job, XXl-Job, Quartz, Saturn, Opencron, Antares, etc. The final decision is to use elastic- Job. The highlights of Elastice-Job are as follows:
- It is based on the Quartz timed task framework and therefore has most of the features of Quartz
- Use ZooKeeper for coordination, scheduling center, and more lightweight
- Sharding of tasks is supported
- Supports flexible capacity expansion and horizontal expansion. When a task is running again, the system checks the current number of servers and shards the task again. After the sharding is complete, the task is continued
- Failover and fault-tolerant processing: When a scheduling server breaks down or disconnects from ZooKeeper, it immediately stops operations and then searches for other idle scheduling servers to run the remaining tasks
- Provides an o&M interface to manage jobs and registries
However, in actual development, it is found that elastice-Job does not support sharding for dynamically added scheduled tasks. That is, in the multi-instance scenario, if a task is dynamically added to an instance, the task will always run on this node. If you need to run on another instance, you need to call the other instance interface with the same parameters. Reference :elastic-job: dynamically adds tasks. There is a problem with adding tasks dynamically under Baidu + Google. However, the fragments dynamically added by the main test of the building are sometimes good and sometimes bad, and as long as the task is registered in ZooKeeper, the task will be automatically initialized when restarting. (For a description of dynamic tasks, please refer to the description linked above, not to explain too much here).
To solve
Along the idea of Yin da, the task nodes are centrally managed, no matter which node dynamic task is registered, it is necessary to forward the request to other nodes for initialization operation, so as to ensure the normal execution of multi-node sharding tasks.
The code is as follows:
/** * Enable task listening. When a task is added, the data in zK will be added and the task will be initialized on other nodes
public void monitorJobRegister(a) {
CuratorFramework client = zookeeperRegistryCenter.getClient();
@SuppressWarnings("resource")
PathChildrenCache childrenCache = new PathChildrenCache(client, "/".true);
PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() {
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
ChildData data = event.getData();
switch (event.getType()) {
case CHILD_ADDED:
String config = new String(client.getData().forPath(data.getPath() + "/config"));
Job job = JsonUtils.toBean(Job.class, config);
Object bean = null;
// If the bean fails to get, add the task
try {
bean = ctx.getBean("SpringJobScheduler" + job.getJobName());
} catch (BeansException e) {
logger.error("ERROR NO BEAN,CREATE BEAN SpringJobScheduler" + job.getJobName());
}
if (Objects.isNull(bean)) {
addJob(job);
}
break;
default:
break; }}}; childrenCache.getListenable().addListener(childrenCacheListener);try {
// https://blog.csdn.net/u010402202/article/details/79581575
childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
} catch(Exception e) { e.printStackTrace(); }}Copy the code
test
Test dynamically adds scheduled tasks and supports fragment failover.
- downloadelastic-job-spring-boot-starteruse
maven
The commandinstall
To the local - create
demo-elastic-job
The project directory structure is as follows:
Demo - elastic - job ├ ─ ─ MVNW ├ ─ ─ MVNW. CMD ├ ─ ─ the SRC │ ├ ─ ─ the main │ │ ├ ─ ─ Java │ │ │ └ ─ ─ com │ │ │ └ ─ ─ example │ │ │ └ ─ ─ demo │ │ │ ├ ─ ─ job │ │ │ │ ├ ─ ─ DynamicJob. Java │ │ │ │ └ ─ ─ TestJob. Java │ │ │ └ ─ ─ DemoApplication. Java │ │ └ ─ ─ resources │ │ ├ ─ ─ application. Yml │ │ └ ─ ─ application - dev. Yml │ └ ─ ─ the test │ └ ─ ─ Java │ └ ─ ─ com │ └ ─ ─ example │ └ ─ ─ demo │ └ ─ ─ DemoApplicationTests. Java ├ ─ ─ pom. XML └ ─ ─ demo. On imlCopy the code
pom.xml
<?xml version="1.0" encoding="UTF-8"? >
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.1. RELEASE</version>
<relativePath/> <! -- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>demo</artifactId>
<version>0.0.1 - the SNAPSHOT</version>
<name>demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.10.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.cxytiandi</groupId>
<artifactId>elastic-job-spring-boot-starter</artifactId>
<version>1.0.4</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Copy the code
DemoApplication.java
package com.example.demo;
import com.cxytiandi.elasticjob.annotation.EnableElasticJob;
import com.cxytiandi.elasticjob.dynamic.service.JobService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
@SpringBootApplication
@EnableElasticJob
@ComponentScan(basePackages = {"com.cxytiandi"."com.example.demo"})
public class DemoApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Autowired
private JobService jobService;
@Override
public void run(String... args) throws Exception {
// Simulate the initial read database add task
// Job job1 = new Job();
// job1.setJobName("job1");
// job1.setCron("0/10 * * * * ? ");
// job1.setJobType("SIMPLE");
// job1.setJobClass("com.example.demo.job.DynamicJob");
// job1.setShardingItemParameters("");
// job1.setShardingTotalCount(2);
// jobService.addJob(job1);
// Job job2 = new Job();
// job2.setJobName("job2");
// job2.setCron("0/10 * * * * ? ");
// job2.setJobType("SIMPLE");
// job2.setJobClass("com.example.demo.job.DynamicJob");
// job2.setShardingItemParameters("0=A,1=B");
// job2.setShardingTotalCount(2);
// jobService.addJob(job2);}}Copy the code
TestJob.java
package com.example.demo.job;
import com.cxytiandi.elasticjob.annotation.ElasticJobConf;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
/**
* Created by zhenglongfei on 2019/7/22
*
* @VERSION1.0 * /
@Component
@Slf4j
@ElasticJobConf(name = "dayJob", cron = "0/10 * * * *?", shardingTotalCount = 2,
shardingItemParameters = "0=AAAA,1=BBBB", description = "Simple task", failover = true)
public class TestJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
log.info("TestJob name: [{}], number of slices: [{}], param= [{}]", shardingContext.getJobName(), shardingContext.getShardingTotalCount(), shardingContext.getShardingParameter()); }}Copy the code
DynamicJob.java
package com.example.demo.job;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* Created by zhenglongfei on 2019/7/24
*
* @VERSION1.0 * /
@Component
@Slf4j
public class DynamicJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
switch (shardingContext.getShardingItem()) {
case 0:
log.info("【0】 is running");
break;
case 1:
log.info("【1】 is running");
break; }}}Copy the code
application.yml
elastic:
job:
zk:
serverLists: 172.2566.137.: 2181
namespace: demo_test
server:
port: 8082
spring:
redis:
host: 127.0. 01.
port: 6379
Copy the code
The test results
Start two projects with ports 8081 and 8082, using the REST API to dynamically register tasks.
- job
http://localhost:8081/job post parameters are as follows:
{
"jobName": "DynamicJob01"."cron": "0/3 * * * *?"."jobType": "SIMPLE"."jobClass": "com.example.demo.job.DynamicJob"."jobParameter": "test"."shardingTotalCount": 2."shardingItemParameters": "0=AAAA,1=BBBB"
}
Copy the code
The code download
- github:demo-elastic-job
- github:elastic-job-spring-boot-starter
🙂🙂🙂 Follow wechat public account Java dry goods irregularly share dry goods information
Reference links:
- Elastic-Job Dynamically adds tasks
- Extice-job: dynamically adds tasks
- Curator’s PathChildrenCache those pits