1. Overview
1.1. What is task scheduling
Consider how you would solve the following business scenarios:
- An e-commerce platform needs to issue a batch of coupons at 10 a.m., 3 p.m., and 8 p.m. every day.
- A banking system needs to send an SMS reminder three days before a credit card payment is due.
- A financial system needs to settle the previous day's financial data at 0:10 a.m. every day and produce a statistical summary.
These are the problems that task scheduling solves: task scheduling is the process of automatically executing a specific task at a specified time.
Spring also provides the scheduled task annotation @Scheduled. All we need to do is put the annotation on the business method and add the @EnableScheduling annotation to the startup class to get task scheduling.
@Scheduled(cron = "0/20 * * * * ?") // Execute every 20 seconds
public void doWork() {
    // doSomething
}
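To make the cron expression above concrete: `0/20` in the seconds field means "start at second 0 and fire every 20 seconds", so the task fires at seconds 0, 20, and 40 of each minute. The following is a minimal plain-JDK sketch of that rule only. The class and method names are hypothetical, and this is not how Spring itself computes trigger times.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

class CronStepSketch {
    // Returns the first time strictly after `now` whose second-of-minute is
    // start, start + step, start + 2 * step, ... (values below 60).
    static LocalDateTime nextFire(LocalDateTime now, int start, int step) {
        if (start < 0 || start > 59 || step <= 0) {
            throw new IllegalArgumentException("start must be 0..59 and step must be positive");
        }
        LocalDateTime minute = now.truncatedTo(ChronoUnit.MINUTES);
        for (int second = start; second < 60; second += step) {
            LocalDateTime candidate = minute.withSecond(second);
            if (candidate.isAfter(now)) {
                return candidate;
            }
        }
        // No slot left in this minute: roll over to the first slot of the next one.
        return minute.plusMinutes(1).withSecond(start);
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.of(2024, 1, 1, 10, 0, 5);
        System.out.println(nextFire(now, 0, 20)); // 2024-01-01T10:00:20
    }
}
```

Note that the fire times are aligned to the clock (seconds 0, 20, 40) rather than to the moment the application started.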
1.2. Why distributed scheduling is needed
Spring already provides this annotation for task scheduling, which seems to solve the problem perfectly, so why do we need distributed scheduling? The main reasons are as follows:
- Machine processing limit: a system that used to process 10,000 orders per minute now has to process 100,000 orders per minute; a report that used to take 1 hour is now expected by the business side within 10 minutes. You might say that a single machine can also use multiple threads or multiple processes. Indeed, parallel processing improves throughput per unit of time, but the capacity of a single machine (mainly CPU, memory, and disk) is limited, and sooner or later there will be a workload that one machine cannot handle.
- High availability: a standalone scheduler runs on only one machine, so if the program or the system fails, the function becomes unavailable. Even if the standalone program itself is stable enough, failures that are not caused by the program can always occur, which is unacceptable for the core functionality of a system.
- Prevent repeated execution: in single-machine mode, scheduled tasks work fine. However, when multiple instances of a service are deployed and each instance contains the scheduled task, the same task fires on every instance at the same time unless it is properly coordinated, although only one instance should execute it. The results may then be chaotic and erroneous.
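The repeated-execution problem above comes down to letting exactly one instance claim each firing of a job. The sketch below is a toy, in-process illustration of that idea only: the class, the method names, and the in-memory map are all hypothetical stand-ins. In a real deployment the claim would have to live in a store shared by all instances (Elastic-Job, introduced next, relies on ZooKeeper for coordination), not in local memory.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class FireOnceGuardSketch {
    // Stand-in for a store shared by all instances (assumption: here it is
    // just local memory, which only works inside a single JVM).
    private final ConcurrentMap<String, String> claims = new ConcurrentHashMap<>();

    // Returns true only for the first instance that claims this job and fire time.
    boolean tryClaim(String jobName, String fireTime, String instanceId) {
        return claims.putIfAbsent(jobName + "@" + fireTime, instanceId) == null;
    }

    // Returns the instance that won the claim, or null if nobody has claimed it.
    String owner(String jobName, String fireTime) {
        return claims.get(jobName + "@" + fireTime);
    }

    public static void main(String[] args) {
        FireOnceGuardSketch guard = new FireOnceGuardSketch();
        // Two instances fire the same job at the same scheduled time.
        System.out.println(guard.tryClaim("coupons", "10:00", "instance-1")); // true
        System.out.println(guard.tryClaim("coupons", "10:00", "instance-2")); // false
    }
}
```

Only the instance for which `tryClaim` returns true would go on to run the task; the others skip that firing.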
1.3. Elastic-Job
Elastic-Job is a distributed scheduling solution open-sourced by Dangdang. It consists of two independent subprojects, Elastic-Job-Lite and Elastic-Job-Cloud. With Elastic-Job you can quickly implement distributed task scheduling. The source code is hosted on GitHub. Its main features are:
- Distributed scheduling coordination: in a distributed environment, tasks are executed according to the specified scheduling policy, and multiple instances are prevented from executing the same task repeatedly.
- Rich scheduling policies: scheduled tasks are driven by cron expressions from the mature scheduling framework Quartz.
- Elastic scale-out and scale-in: when an instance is added to the cluster, it can be elected to execute tasks; when an instance is removed from the cluster, the tasks it was executing are moved to other instances.
- Failover: if a task fails on one instance, it is transferred to another instance for execution.
- Missed job retriggering: if a job misses its scheduled execution for some reason, the framework records the missed execution and triggers it automatically once the previous run has finished.
- Parallel scheduling: task sharding is supported. Sharding means that a task is divided into several smaller tasks that are executed simultaneously on multiple instances.
- Job sharding consistency: after a task is sharded, the same shard has only one executing instance in the distributed environment.
- Job lifecycle operations: tasks can be started and stopped dynamically.
- Rich job types: Simple, DataFlow, and Script jobs are supported.
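The missed-job retriggering feature in the list above can be pictured with a very small state machine: if a trigger arrives while the previous run is still in progress, the miss is recorded, and one extra run starts as soon as the previous run finishes. The sketch below is a toy model under that assumption. The class and method names are hypothetical, several misses collapse into a single retrigger here, and none of this is Elastic-Job's actual implementation.

```java
class MisfireSketch {
    private boolean running;   // a run is currently in progress
    private boolean misfired;  // at least one trigger was missed during that run
    private int completedRuns;

    // Called at each scheduled fire time. Returns true if a run starts now.
    synchronized boolean trigger() {
        if (running) {
            misfired = true;   // remember the missed execution
            return false;
        }
        running = true;
        return true;
    }

    // Called when the current run finishes. Returns true if a missed
    // execution is started immediately afterwards.
    synchronized boolean complete() {
        if (!running) {
            throw new IllegalStateException("no run in progress");
        }
        completedRuns++;
        if (misfired) {
            misfired = false;  // `running` stays true for the retriggered run
            return true;
        }
        running = false;
        return false;
    }

    synchronized int completedRuns() {
        return completedRuns;
    }

    public static void main(String[] args) {
        MisfireSketch job = new MisfireSketch();
        System.out.println(job.trigger());  // true: first run starts
        System.out.println(job.trigger());  // false: previous run still busy, miss recorded
        System.out.println(job.complete()); // true: the missed run starts right away
        System.out.println(job.complete()); // false: nothing else pending
    }
}
```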
1.4. Start ZooKeeper
- Unzip the package.
- Go to the conf directory, copy zoo_sample.cfg, and rename the copy to zoo.cfg.
- Go to the bin directory and start the server from the command line by running the startup .cmd file (zkServer.cmd in a standard ZooKeeper distribution).
1.5. Start the ZooKeeper GUI
- Unzip the package.
- Go to the build directory and find the JAR package.
- Run the java -jar command, specifying the name of the JAR package.
2. Elastic-Job Quickstart
Environment requirements of Elastic-Job:
- JDK 1.7 or later
- Maven 3.0.4 or later
- ZooKeeper 3.4.6 or later
2.1. Environment setup
Install and run ZooKeeper
Create a Maven project and import the dependencies
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
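Write the task class
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;

public class XiaoLinJob implements SimpleJob {
    // Task logic: runs each time the job is triggered
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println("Scheduled task begins");
    }
}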
Write the configuration class
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;

public class JobDemo {
    public static void main(String[] args) {
        new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
    }

    private static CoordinatorRegistryCenter createRegistryCenter() {
        // Set the ZooKeeper address and the namespace (task group name)
        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("127.0.0.1:2181", "elastic-job-demo");
        zookeeperConfiguration.setSessionTimeoutMilliseconds(1000);
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
        regCenter.init();
        return regCenter;
    }

    private static LiteJobConfiguration createJobConfiguration() {
        // Define the job core configuration: job name, cron expression, number of shards
        JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob", "0/1 * * * * ?", 1).build();
        // Define the SIMPLE job configuration
        SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, XiaoLinJob.class.getCanonicalName());
        // Define the Lite job root configuration
        LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
        return simpleJobRootConfig;
    }
}
2.2. Testing
- When only one instance is started, the task is scheduled according to the cron expression.
- When a second instance is started, the new instance takes over the scheduled task and the old one stops executing it.
3. Integrating Elastic-Job with Spring Boot
3.1. Introduce dependencies
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.3.RELEASE</version>
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>com.dangdang</groupId>
        <artifactId>elastic-job-lite-spring</artifactId>
        <version>2.1.5</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>
3.2. Write configuration files
application.yaml
Since the registry center address is not fixed, we should not hard-code it; it belongs in the configuration file. Create a new configuration file:
elasticjob:
  zookeeper-url: localhost:2181
  group-name: elastic-job-group
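ZooKeeper registry configuration class
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Registry configuration class
@Configuration
public class RegistryCenterConfig {
    // Read the URL and namespace of the registry from the configuration file
    @Bean(initMethod = "init")
    public CoordinatorRegistryCenter coordinatorRegistryCenter(
            @Value("${elasticjob.zookeeper-url}") String zookeeperUrl,
            @Value("${elasticjob.group-name}") String namespace) {
        // ZooKeeper configuration
        ZookeeperConfiguration zookeeperConfiguration =
                new ZookeeperConfiguration(zookeeperUrl, namespace);
        // Set the maximum sleep time between connection retries, in milliseconds
        zookeeperConfiguration.setMaxSleepTimeMilliseconds(10000000);
        // Create the registry center
        ZookeeperRegistryCenter zookeeperRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
        return zookeeperRegistryCenter;
    }
}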
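Configuration class for task scheduling
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;

@Configuration
public class JobConfig {
    @Autowired
    XiaoLinJob xiaoLinJob;
    @Autowired
    private CoordinatorRegistryCenter registryCenter;

    private static LiteJobConfiguration createJobConfiguration(
            final Class<? extends SimpleJob> jobClass,  // job class
            final String cron,                           // cron expression
            final int shardingTotalCount,                // number of shards
            final String shardingItemParameters) {       // sharding item parameters
        // The simple class name is used as the job name
        JobCoreConfiguration.Builder jobCoreConfigurationBuilder = JobCoreConfiguration.newBuilder(jobClass.getSimpleName(), cron, shardingTotalCount);
        if (!StringUtils.isEmpty(shardingItemParameters)) {
            jobCoreConfigurationBuilder.shardingItemParameters(shardingItemParameters);
        }
        // Use the class passed in rather than a hard-coded job class
        SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(jobCoreConfigurationBuilder.build(), jobClass.getCanonicalName());
        LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();
        return simpleJobRootConfig;
    }

    @Bean(initMethod = "init")
    public SpringJobScheduler initSimpleElasticJob() {
        SpringJobScheduler springJobScheduler = new SpringJobScheduler(xiaoLinJob, registryCenter, createJobConfiguration(XiaoLinJob.class, "0/3 * * * * ?", 1, null));
        return springJobScheduler;
    }
}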
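Custom task class
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import org.springframework.stereotype.Component;

@Component
public class XiaoLinJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println("============");
    }
}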
3.3. Testing
4. A small case study
4.1. Standalone version
4.1.1. Description of requirements
A series of records in the database needs to be backed up. After a record is backed up, update its state to mark it as backed up.
4.1.2. Creating the database
SET FOREIGN_KEY_CHECKS=0;
-- ----------------------------
-- Table structure for t_file_custom
-- ----------------------------
DROP TABLE IF EXISTS `t_file_custom`;
CREATE TABLE `t_file_custom` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
`content` varchar(255) DEFAULT NULL,
`type` varchar(255) DEFAULT NULL,
`backedUp` tinyint(4) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=21 DEFAULT CHARSET=utf8;
-- ----------------------------
-- Records of t_file_custom
-- ----------------------------
INSERT INTO `t_file_custom` VALUES ('1', 'file 1', 'content 1', 'text', '1');
INSERT INTO `t_file_custom` VALUES ('2', 'file 2', 'content 2', 'text', '1');
INSERT INTO `t_file_custom` VALUES ('3', 'file 3', 'content 3', 'text', '1');
INSERT INTO `t_file_custom` VALUES ('4', 'file 4', 'content 4', 'image', '1');
INSERT INTO `t_file_custom` VALUES ('5', 'file 5', 'content 5', 'image', '1');
INSERT INTO `t_file_custom` VALUES ('6', 'file 6', 'content 6', 'text', '1');
INSERT INTO `t_file_custom` VALUES ('7', 'file 7', 'content 7', 'radio', '1');
INSERT INTO `t_file_custom` VALUES ('8', 'file 8', 'content 8', 'radio', '1');
INSERT INTO `t_file_custom` VALUES ('9', 'file 9', 'content 9', 'vedio', '1');
INSERT INTO `t_file_custom` VALUES ('10', 'file 10', 'content 10', 'vedio', '1');
INSERT INTO `t_file_custom` VALUES ('11', 'file 11', 'content 11', 'vedio', '1');
INSERT INTO `t_file_custom` VALUES ('12', 'file 12', 'content 12', 'vedio', '1');
INSERT INTO `t_file_custom` VALUES ('13', 'file 13', 'content 13', 'image', '1');
INSERT INTO `t_file_custom` VALUES ('14', 'file 14', 'content 14', 'text', '1');
INSERT INTO `t_file_custom` VALUES ('15', 'file 15', 'content 15', 'image', '1');
INSERT INTO `t_file_custom` VALUES ('16', 'file 16', 'content 16', 'text', '1');
INSERT INTO `t_file_custom` VALUES ('17', 'file 17', 'content 17', 'radio', '1');
INSERT INTO `t_file_custom` VALUES ('18', 'file 18', 'content 18', 'image', '1');
INSERT INTO `t_file_custom` VALUES ('19', 'file 19', 'content 19', 'radio', '1');
INSERT INTO `t_file_custom` VALUES ('20', 'file 20', 'content 20', 'vedio', '1');
4.1.3. Druid & MyBatis
4.1.3.1. Adding dependencies
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.10</version>
</dependency>
<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version>1.2.0</version>
</dependency>
<!-- MySQL driver -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
</dependency>
4.1.3.2. Configure the data source
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/elastic-job-demo?serverTimezone=GMT%2B8
    driverClassName: com.mysql.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource
    username: root
    password: admin
4.1.4. Add an entity class
import lombok.Data;

@Data
public class FileCustom {
    // Unique identifier
    private Long id;
    // File name
    private String name;
    // File type
    private String type;
    // File content
    private String content;
    // Whether the file has been backed up
    private Boolean backedUp = false;

    public FileCustom() {}

    public FileCustom(Long id, String name, String type, String content) {
        this.id = id;
        this.name = name;
        this.type = type;
        this.content = content;
    }
}
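4.1.5. Add a task class
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;

@Slf4j
@Component
public class FileCustomElasticJob implements SimpleJob {
    @Autowired
    FileCopyMapper fileCopyMapper;

    @Override
    public void execute(ShardingContext shardingContext) {
        doWork();
    }

    private void doWork() {
        // Load every file that has not been backed up yet
        List<FileCustom> fileCustoms = fileCopyMapper.selectAll();
        if (fileCustoms.isEmpty()) {
            log.info("Backup completed");
            return;
        }
        log.info("The number of files to be backed up is: " + fileCustoms.size());
        for (FileCustom fileCustom : fileCustoms) {
            backUpFile(fileCustom);
        }
    }

    private void backUpFile(FileCustom fileCustom) {
        try {
            // Simulate a time-consuming backup
            Thread.sleep(1000);
            log.info("Execute backup file: " + fileCustom);
            // Mark the file as backed up
            fileCopyMapper.backUpFile(fileCustom.getId());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can see the interruption
            Thread.currentThread().interrupt();
            log.error("Backup interrupted", e);
        }
    }
}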
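4.1.6. Add the mapper
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;

import java.util.List;

@Mapper
public interface FileCopyMapper {
    // All files that have not been backed up yet
    @Select("select * from t_file_custom where backedUp = 0")
    List<FileCustom> selectAll();

    // Mark one file as backed up
    @Update("update t_file_custom set backedUp = 1 where id = #{id}")
    void backUpFile(Long id);
}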
4.1.7. Add the task scheduling configuration
Add the following bean to the JobConfig class:
@Bean(initMethod = "init")
public SpringJobScheduler initFileCustomElasticJob(FileCustomElasticJob fileCustomElasticJob) {
    // Register the backup job: every 3 seconds, a single shard, no shard parameters
    SpringJobScheduler springJobScheduler = new SpringJobScheduler(fileCustomElasticJob, registryCenter, createJobConfiguration(FileCustomElasticJob.class, "0/3 * * * * ?", 1, null));
    return springJobScheduler;
}
4.1.8. Existing problems
For high availability we deploy the project as a cluster, so that if one instance dies another continues working. In a cluster, however, the scheduled task still runs on only one machine. If a single run is time-consuming and resource-intensive, that machine carries a heavy load.
Meanwhile, the other machines sit idle. How can we make good use of the other machines in the cluster and make tasks run faster? For this, Elastic-Job provides task sharding.
4.2. Cluster version
4.2.1. Sharding concept
Job sharding means executing a task in a distributed way: the task is split into multiple independent shard items, and each distributed application instance executes one or several of them.
For example, suppose the backup job runs on two servers, each running one application instance. To speed up execution, the task can be split into four shards, two per instance. The job then traverses the data as follows: instance 1 looks for text and image files to back up, and instance 2 looks for radio and vedio files to back up.
If server expansion increases the number of application instances to four, each of the four instances processes one file type: text, image, radio, or vedio.
Reasonable sharding lets the task be processed in parallel. Its benefits are as follows:
- Decoupling of shard items from business logic: Elastic-Job does not process data itself. The framework only distributes shard items to the running job servers; developers must map shard items to the actual data themselves.
- Maximized resource utilization: set the number of shard items larger than the number of servers, preferably to a multiple of the number of servers, so that the job uses the distributed resources sensibly and shard items are allocated dynamically. For example, with three servers and 10 shards, the allocation is server A = 0, 1, 2; server B = 3, 4, 5; server C = 6, 7, 8, 9. If server C crashes, the allocation becomes server A = 0, 1, 2, 3, 4; server B = 5, 6, 7, 8, 9. Existing resources are used to the fullest to improve throughput, and no shard item is lost.
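The allocation numbers in the example above can be reproduced with a few lines of plain Java. The sketch below is only an illustration written to match those numbers: it gives each server a contiguous block of shard items and lets the last server take the remainder. The class and method names are hypothetical, and Elastic-Job's own sharding strategy may place the leftover items differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ShardAllocationSketch {
    // Splits shard items 0..shardingTotalCount-1 into contiguous blocks, one per
    // server; the last server also takes whatever does not divide evenly.
    static Map<String, List<Integer>> allocate(List<String> servers, int shardingTotalCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (servers.isEmpty()) {
            return result;
        }
        int perServer = shardingTotalCount / servers.size();
        int next = 0;
        for (int i = 0; i < servers.size(); i++) {
            boolean last = i == servers.size() - 1;
            int count = last ? shardingTotalCount - next : perServer;
            List<Integer> items = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                items.add(next++);
            }
            result.put(servers.get(i), items);
        }
        return result;
    }

    public static void main(String[] args) {
        // Three servers, ten shards
        System.out.println(allocate(Arrays.asList("A", "B", "C"), 10)); // {A=[0, 1, 2], B=[3, 4, 5], C=[6, 7, 8, 9]}
        // Server C has crashed: the same ten shards are spread over A and B
        System.out.println(allocate(Arrays.asList("A", "B"), 10)); // {A=[0, 1, 2, 3, 4], B=[5, 6, 7, 8, 9]}
    }
}
```

The point to notice is that re-running the allocation with the surviving servers covers all ten items again, which is why no shard is lost when a server drops out.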
4.2.2. Configuration class modification
To change the standalone version into the cluster version, we first need to increase the number of shards and add shard parameters in the task configuration class.
@Bean(initMethod = "init")
public SpringJobScheduler initFileCustomElasticJob(FileCustomElasticJob fileCustomElasticJob) {
    // createJobConfiguration takes the job class, the cron expression, the number of shards,
    // and the shard parameters: shard 0 queries files of type text, shard 1 image, and so on
    SpringJobScheduler springJobScheduler = new SpringJobScheduler(fileCustomElasticJob, registryCenter, createJobConfiguration(FileCustomElasticJob.class, "0/3 * * * * ?", 4, "0=text,1=image,2=radio,3=vedio"));
    return springJobScheduler;
}
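The shard parameter string used in the configuration above, 0=text,1=image,2=radio,3=vedio, is a comma-separated list of item=parameter pairs: each shard item number is paired with the value that the job receives when it runs that shard. The sketch below shows one way to read such a string in plain Java. It is an illustration with hypothetical names, not the framework's own parser.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ShardingParametersSketch {
    // Turns "0=text,1=image" into {0=text, 1=image}, keeping the original order.
    static Map<Integer, String> parse(String shardingItemParameters) {
        Map<Integer, String> result = new LinkedHashMap<>();
        if (shardingItemParameters == null || shardingItemParameters.trim().isEmpty()) {
            return result;
        }
        for (String pair : shardingItemParameters.split(",")) {
            String[] keyValue = pair.trim().split("=", 2);
            if (keyValue.length != 2) {
                throw new IllegalArgumentException("Expected item=parameter but got: " + pair);
            }
            result.put(Integer.parseInt(keyValue[0].trim()), keyValue[1].trim());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, String> parameters = parse("0=text,1=image,2=radio,3=vedio");
        // The instance that is assigned shard item 2 would back up this file type:
        System.out.println(parameters.get(2)); // radio
    }
}
```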
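4.2.3. Add job sharding logic
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;

@Slf4j
@Component
public class FileCustomElasticJob implements SimpleJob {
    @Autowired
    FileCopyMapper fileCopyMapper;

    @Override
    public void execute(ShardingContext shardingContext) {
        // The shard parameter of the current shard is the file type to process
        doWork(shardingContext.getShardingParameter());
    }

    private void doWork(String fileType) {
        // Load only the files of this shard's type that have not been backed up yet
        List<FileCustom> fileCustoms = fileCopyMapper.selectByType(fileType);
        if (fileCustoms.isEmpty()) {
            log.info("Backup completed");
            return;
        }
        log.info("The type of files to be backed up is: " + fileType + ", number of files: " + fileCustoms.size());
        for (FileCustom fileCustom : fileCustoms) {
            backUpFile(fileCustom);
        }
    }

    private void backUpFile(FileCustom fileCustom) {
        try {
            // Simulate a time-consuming backup
            Thread.sleep(2000);
            log.info("Execute backup file: " + fileCustom);
            // Mark the file as backed up
            fileCopyMapper.backUpFile(fileCustom.getId());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can see the interruption
            Thread.currentThread().interrupt();
            log.error("Backup interrupted", e);
        }
    }
}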
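4.2.4. Mapper class modification
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;

import java.util.List;

@Mapper
public interface FileCopyMapper {
    // All files that have not been backed up yet
    @Select("select * from t_file_custom where backedUp = 0")
    List<FileCustom> selectAll();

    // Mark one file as backed up
    @Update("update t_file_custom set backedUp = 1 where id = #{id}")
    void backUpFile(Long id);

    // Files of one type that have not been backed up yet
    @Select("select * from t_file_custom where backedUp = 0 and type = #{fileType}")
    List<FileCustom> selectByType(String fileType);
}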
4.2.5. Testing
4.2.5.1. One machine
With a single machine running, it starts four threads and processes all four shards itself.
4.2.5.2. Four machines
When four machines are running, each machine is assigned one shard and uses one thread to query and back up one type of data.
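The single-machine observation above (one instance, four shards, four threads) can be imitated in plain Java: an instance that holds several shard items hands each one to its own worker thread and waits for all of them to finish. This is a hedged sketch of that behavior with hypothetical names and a plain thread pool; it does not reproduce Elastic-Job's internal executor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ShardThreadsSketch {
    // Runs one task per assigned shard item (item -> shard parameter) and
    // returns, per item, which parameter was handled on which thread.
    static Map<Integer, String> runAssignedShards(Map<Integer, String> assigned) {
        Map<Integer, String> processed = new ConcurrentHashMap<>();
        if (assigned.isEmpty()) {
            return processed;
        }
        // One worker thread per shard item held by this instance
        ExecutorService pool = Executors.newFixedThreadPool(assigned.size());
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (Map.Entry<Integer, String> shard : assigned.entrySet()) {
                pending.add(pool.submit(() -> {
                    // A real job would query and back up files of this type here
                    processed.put(shard.getKey(), shard.getValue() + " on " + Thread.currentThread().getName());
                }));
            }
            for (Future<?> task : pending) {
                try {
                    task.get(); // wait until every shard has finished
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                } catch (ExecutionException e) {
                    throw new IllegalStateException(e.getCause());
                }
            }
        } finally {
            pool.shutdown();
        }
        return processed;
    }

    public static void main(String[] args) {
        Map<Integer, String> assigned = new TreeMap<>();
        assigned.put(0, "text");
        assigned.put(1, "image");
        assigned.put(2, "radio");
        assigned.put(3, "vedio");
        // One instance holding all four shards: four entries, one per shard
        System.out.println(runAssignedShards(assigned));
    }
}
```

With four machines, each instance would call the same method with a single entry, so each uses only one thread.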