Description: ElasticJob
ElasticJob is a distributed task-scheduling framework. It provides job sharding strategies, catch-up of missed executions, job status monitoring, job listeners, self-diagnosis and repair, and more. Note: this introduction is based on ElasticJob used with Spring.
Preface:
How can we implement a distributed scheduled task? If we had to write one ourselves, the core difficulty is simple to state. Java memory is a piece of memory space opened by the current process, and no other process can access it. A scheduler that keeps its state in JVM memory therefore cannot coordinate with copies of itself running on other machines. To implement distributed scheduled tasks we need to solve this problem first, and ZooKeeper is a good choice.
Solution 1: Let's look at what ZooKeeper is. It is a service that stores data in a tree of nodes and lets clients watch those nodes, so it can detect when a node (and the instance behind it) comes online or goes offline. Once connected to ZK, we can create nodes, write data to them, and watch nodes, node data, child nodes, and child-node data. From this brief introduction we can see that ZK covers both online/offline monitoring and leader election.
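The election idea can be illustrated without a running ZooKeeper. A common ZooKeeper recipe has every instance create an ephemeral sequential node; the instance holding the lowest sequence number acts as leader, and when its node disappears the next lowest takes over. The sketch below simulates only that selection rule in plain Java. The class name, the node names, and the helper methods are illustrative assumptions, and this is not ElasticJob's internal election code.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

final class LeaderElectionSketch {

    // Extracts the numeric suffix from a node name such as "instance-0000000007".
    static long sequenceOf(String nodeName) {
        return Long.parseLong(nodeName.substring(nodeName.lastIndexOf('-') + 1));
    }

    // The live node with the lowest sequence number is treated as the leader.
    static Optional<String> electLeader(List<String> liveNodes) {
        return liveNodes.stream()
                .min(Comparator.comparingLong(LeaderElectionSketch::sequenceOf));
    }

    public static void main(String[] args) {
        // Hypothetical node names; ZooKeeper appends the zero-padded counter itself.
        List<String> nodes = Arrays.asList(
                "instance-0000000012", "instance-0000000007", "instance-0000000009");
        System.out.println(electLeader(nodes).orElse("none"));

        // When the leader's session ends, its ephemeral node vanishes
        // and the same rule is applied to the remaining nodes.
        List<String> afterFailure = Arrays.asList(
                "instance-0000000012", "instance-0000000009");
        System.out.println(electLeader(afterFailure).orElse("none"));
    }
}
```

Choosing by lowest sequence number means no instance has to negotiate with the others: each one only needs the current list of child nodes.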
ZK therefore takes care of monitoring nodes dynamically. Next, a scheduled task needs a cron expression, and ElasticJob also has to shard the work. How does it solve that?
Solution 2: How is sharding implemented? When we define a scheduled task's configuration, we set the sharding total count, that is, the number of pieces the job is split into. There are three kinds of sharding rule: fair (average) allocation, hash-based allocation over the server IPs, and a custom sharding strategy. A custom strategy implements the JobShardingStrategy interface and its sharding method. The method's parameters are the list of job server IPs and the sharding options. The options include the job name, the total number of shards, and a map from shard item number to its personalized parameter, which can be customized as required.
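To make the fair (average) rule concrete, here is a minimal sketch in plain Java. It assumes the usual description of average allocation: every server first receives an equal contiguous block of shard items, and any leftover items are handed out one at a time starting from the first server. The class and method names are hypothetical, and the signature is simplified compared with the real JobShardingStrategy interface.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class AverageShardingSketch {

    // Splits shard items 0..shardingTotalCount-1 across the servers as evenly as possible.
    static Map<String, List<Integer>> shard(List<String> servers, int shardingTotalCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (servers.isEmpty()) {
            return result;
        }
        int itemsPerServer = shardingTotalCount / servers.size();
        int nextItem = 0;
        // First pass: each server gets a contiguous block of equal size.
        for (String server : servers) {
            List<Integer> items = new ArrayList<>();
            for (int i = 0; i < itemsPerServer; i++) {
                items.add(nextItem++);
            }
            result.put(server, items);
        }
        // Second pass: leftover items go to the first servers, one each.
        for (String server : servers) {
            if (nextItem >= shardingTotalCount) {
                break;
            }
            result.get(server).add(nextItem++);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("a", "b", "c"); // stand-ins for server IPs
        System.out.println(shard(servers, 9)); // {a=[0, 1, 2], b=[3, 4, 5], c=[6, 7, 8]}
        System.out.println(shard(servers, 8)); // {a=[0, 1, 6], b=[2, 3, 7], c=[4, 5]}
    }
}
```

Because every instance can compute the same assignment from the same server list, the instances agree on who owns which shard item without exchanging the data itself.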
How to write the configuration:
server:
  port: 8088
elasticjob: # our own (non-framework) settings, read at initialization time
  zookeeper-url: localhost:2181 # ZooKeeper connection address and port
  group-name: elastic-job-group # namespace in ZooKeeper; shows up as the project name in ZooKeeper's visual tools
spring:
  datasource: # data source configuration
    url: jdbc:mysql://localhost:3306/elastic-job-demo?serverTimezone=GMT%2B8
    driverClassName: com.mysql.cj.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource
    username: root
    password: admin
First, how to use a simple scheduled task
Step 1: Maven dependencies (Spring)
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
    <version>2.1.5</version>
</dependency>
Step 2: Define a ZK connector
This step connects to ZK, which serves as ElasticJob's registry center. ElasticJob uses the registry to coordinate job instances, and it is the core piece that makes sharding possible.
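// Simple registry center configuration
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CoordinatorRegistryConfig {
    // init() opens the ZooKeeper connection once the bean has been created
    @Bean(initMethod = "init")
    public CoordinatorRegistryCenter createRegisterCenter(
            @Value("${elasticjob.zookeeper-url}") String zookeeperUrl,
            @Value("${elasticjob.group-name}") String group) {
        // Set the ZK address and the task group name (used as the namespace)
        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(zookeeperUrl, group);
        // Session timeout in milliseconds; 100 ms is very short and the server may raise it to its minimum
        zookeeperConfiguration.setSessionTimeoutMilliseconds(100);
        return new ZookeeperRegistryCenter(zookeeperConfiguration);
    }
}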
Step 3: Define a simple job handler
The execute method receives a ShardingContext. Its fields are:
ShardingParameter: the parameter of the shard item assigned to the current execution. It corresponds to the sharding configuration. If the sharding total count is 3 and the default strategy is used with three deployed instances, the shard items are distributed evenly, one per server. The sharding item parameters must then describe all three items, for example 0="China",1="city",2="people", and each executing instance receives only the one parameter that belongs to its shard item.
JobName: the job name, for example FileCustomElasticJob, which here is the name of the class that executes the job.
TaskId: the task ID, for example FileCustomElasticJob@-@0,1,2,3@-@READY@[email protected]@-@20348. It shows the shard items to execute and the IP address of the executing instance.
ShardingTotalCount: the total number of shards, for example 4 (items 0, 1, 2, 3).
JobParam: an ordinary job-level parameter that can be read on every execution.
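The sharding item parameters are written as comma-separated item=value pairs. The following sketch shows one way such a string could be turned into a lookup table, and how an instance that owns a single shard item ends up seeing only its own value. The class name and the parse method are hypothetical; ElasticJob does this parsing internally and hands the result to the job through ShardingContext.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ShardingParameterSketch {

    // Parses "0=text,1=image" into {0=text, 1=image}.
    static Map<Integer, String> parse(String shardingItemParameters) {
        Map<Integer, String> result = new LinkedHashMap<>();
        if (shardingItemParameters == null || shardingItemParameters.trim().isEmpty()) {
            return result;
        }
        for (String pair : shardingItemParameters.split(",")) {
            // Split on the first '=' only, so values may themselves contain '='.
            String[] keyValue = pair.trim().split("=", 2);
            if (keyValue.length != 2) {
                throw new IllegalArgumentException("Expected item=value but got: " + pair);
            }
            result.put(Integer.parseInt(keyValue[0].trim()), keyValue[1].trim());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, String> parameters = parse("0=text,1=image,2=radio,3=vedio");
        // The instance that was assigned shard item 1 only works with its own parameter.
        System.out.println(parameters.get(1)); // prints "image"
    }
}
```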
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;

@Component
public class MySimpleJob implements SimpleJob {
    // The task scheduling method is executed at regular intervals
    @Override
    public void execute(ShardingContext shardingContext) {
        // Scheduled task logic
        System.out.println("Execution time: " + new Date());
    }
}

// A job that backs up the files of the type named by its sharding parameter
@Component
public class FileCustomElasticJob implements SimpleJob {
    @Autowired
    private FileCustomMapper fileCustomMapper;

    @Override
    public void execute(ShardingContext shardingContext) {
        // Each shard receives only its own parameter
        doWork(shardingContext.getShardingParameter());
    }

    private void doWork(String fileType) {
        List<FileCustom> fileList = fileCustomMapper.selecByType(fileType);
        System.out.println("Type: " + fileType + ", number of files to back up: " + fileList.size());
        for (FileCustom fileCustom : fileList) {
            backUpFile(fileCustom);
        }
    }

    private void backUpFile(FileCustom fileCustom) {
        try {
            // Simulate the backup action
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can see the interruption
            Thread.currentThread().interrupt();
        }
        System.out.println("Execute file backup ====>" + fileCustom);
        fileCustomMapper.changeState(fileCustom.getId(), 1);
    }
}
Step 4: Define how this job should be executed
Explanation: createJobConfiguration builds the job configuration. Its parameters are the job class, cron (how often the job runs), shardingTotalCount (how many shards the job is split into), shardingItemParameters (the parameter for each shard item), and dataflowType (whether the job is a dataflow scheduled task). If you need to set more options, add another parameter. For example, I set the job parameter to "123".
iniDataflowElasticJob: this method initializes the execution policy of the FileDataflowJob scheduled task: which registry center to connect to, when to run, how to shard, what the shard parameters are, whether it is a dataflow job, and so on. The resulting scheduler is then handed to the Spring container for management.
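import com.dangdang.ddframe.job.api.ElasticJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.JobTypeConfiguration;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils; // assumption: the original does not show which StringUtils it uses

/** Created by wolfcode-lanxw */
@Configuration
public class ElasticJobConfig {
    @Autowired
    private CoordinatorRegistryCenter registryCenter;

    @Bean(initMethod = "init")
    public SpringJobScheduler iniDataflowElasticJob(FileDataflowJob fileDataflowJob) {
        // Note: with a sharding total count of 2, only shard items 0 and 1 are scheduled
        return new SpringJobScheduler(
                fileDataflowJob, // business (job) class
                registryCenter,  // registry center
                createJobConfiguration( // job configuration
                        FileDataflowJob.class, "0/5 * * * * ?", 2,
                        "0=text,1=image,2=radio,3=vedio", true));
    }

    private static LiteJobConfiguration createJobConfiguration(final Class<? extends ElasticJob> jobClass,
                                                               final String cron,
                                                               final int shardingTotalCount,
                                                               final String shardingItemParameters,
                                                               boolean dataflowType) {
        // Define the job core configuration
        JobCoreConfiguration.Builder jobCoreConfigurationBuilder =
                JobCoreConfiguration.newBuilder(jobClass.getSimpleName(), cron, shardingTotalCount);
        // Only set the parameters when sharding item parameters were supplied
        if (!StringUtils.isEmpty(shardingItemParameters)) {
            jobCoreConfigurationBuilder.jobParameter("123").shardingItemParameters(shardingItemParameters);
        }
        JobTypeConfiguration jobConfig;
        if (dataflowType) {
            // Define the DATAFLOW configuration; the last argument enables streaming processing
            jobConfig = new DataflowJobConfiguration(jobCoreConfigurationBuilder.build(), jobClass.getCanonicalName(), true);
        } else {
            // Define the SIMPLE configuration
            jobConfig = new SimpleJobConfiguration(jobCoreConfigurationBuilder.build(), jobClass.getCanonicalName());
        }
        // Define the Lite job root configuration.
        // overwrite(true) replaces the configuration stored in the registry, so the current one takes effect
        return LiteJobConfiguration.newBuilder(jobConfig).overwrite(true).build();
    }
}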
Second, what is the difference between a dataflow scheduled task and a simple scheduled task?
A dataflow job implements the DataflowJob interface shown here. The difference from a simple job is that the work is split into two methods: for each shard, fetchData returns a List, and processData then handles the data that fetchData returned. The application scenarios I can think of are as follows: a periodic task is executed in shards, where 0=Redis cache data processing, 1=periodically persisting data to MySQL, and 2=report statistics, and the three shards are executed by three different services.
// Dataflow distributed job interface.
public interface DataflowJob<T> extends ElasticJob {
    /**
     * Fetch the data to be processed.
     * @param shardingContext sharding context
     * @return the data to be processed
     */
    List<T> fetchData(ShardingContext shardingContext);
    /**
     * Process data.
     * @param shardingContext sharding context
     * @param data the data to be processed
     */
    void processData(ShardingContext shardingContext, List<T> data);
}
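The interplay of fetchData and processData can be sketched without the framework. The example below assumes streaming-style processing, where the runner keeps fetching and processing until a fetch returns nothing. The FetchAndProcess interface is a hypothetical stand-in for DataflowJob<T> (the real methods take a ShardingContext rather than a shard item number), and runStreaming and drain are illustrative helpers, not ElasticJob APIs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

final class DataflowLoopSketch {

    // Hypothetical stand-in for DataflowJob<T>.
    interface FetchAndProcess<T> {
        List<T> fetchData(int shardingItem);
        void processData(int shardingItem, List<T> data);
    }

    // Keeps fetching and processing until a fetch returns null or an empty list.
    // Returns the number of fetch/process rounds that were executed.
    static <T> int runStreaming(FetchAndProcess<T> job, int shardingItem) {
        int rounds = 0;
        List<T> data = job.fetchData(shardingItem);
        while (data != null && !data.isEmpty()) {
            job.processData(shardingItem, data);
            rounds++;
            data = job.fetchData(shardingItem);
        }
        return rounds;
    }

    // Drains an in-memory backlog, fetching at most batchSize records per round.
    static List<String> drain(List<String> backlog, final int batchSize) {
        final Deque<String> pending = new ArrayDeque<>(backlog);
        final List<String> processed = new ArrayList<>();
        runStreaming(new FetchAndProcess<String>() {
            @Override
            public List<String> fetchData(int shardingItem) {
                List<String> batch = new ArrayList<>();
                while (batch.size() < batchSize && !pending.isEmpty()) {
                    batch.add(pending.poll());
                }
                return batch;
            }

            @Override
            public void processData(int shardingItem, List<String> data) {
                processed.addAll(data);
            }
        }, 0);
        return processed;
    }

    public static void main(String[] args) {
        // Three records with a batch size of two are handled in two rounds.
        System.out.println(drain(Arrays.asList("a.txt", "b.txt", "c.txt"), 2));
    }
}
```

Separating the two methods lets the fetch step stay small and repeatable, while the processing step only ever sees one batch at a time.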
To understand the sharding: the job runs once every five seconds, and on each run the shard items are distributed across the servers, so different servers process different shard parameters. In this example there are four shard parameters: one instance receives only image, another receives only radio, and so on. The nice thing about this is that you do not have to worry about repeated execution in a distributed deployment.
@Component
public class FileCustomElasticJob implements SimpleJob {
    @Autowired
    private FileCustomMapper fileCustomMapper;

    @Override
    public void execute(ShardingContext shardingContext) {
        doWork(shardingContext.getShardingParameter());
    }

    private void doWork(String fileType) {
        List<FileCustom> fileList = fileCustomMapper.selecByType(fileType);
        System.out.println("Type :" + fileType + ", file, number of backups needed :" + fileList.size());
        for (FileCustom fileCustom : fileList) {
            backUpFile(fileCustom);
        }
    }

    private void backUpFile(FileCustom fileCustom) {
        try {
            // Simulate the backup action
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Execute file backup ====>" + fileCustom);
        fileCustomMapper.changeState(fileCustom.getId(), 1);
    }
}