Description: ElasticJob

ElasticJob is a framework for scheduling tasks. It provides a distributed job scheduling framework with features such as job sharding strategies, re-triggering of missed executions, job status monitoring, job listeners, and self-diagnosis and repair. Note: this section is an introduction based on Spring ElasticJob (elastic-job-lite-spring).

Preface:

How can we implement a distributed scheduled task? Writing an ordinary scheduled task is simple, but it runs entirely inside the memory of its own task or process, and other processes cannot access that memory space. To implement a distributed scheduled task we first need to solve this coordination problem, and ZooKeeper is a good choice for it.

Solution 1: ZooKeeper. ZooKeeper is a framework that stores nodes and can listen to them, sensing nodes going online and offline. After connecting to ZK we can write nodes, listen to a node, listen to its data, and listen to its child nodes and their data. From this brief introduction we can see that ZK covers the node online/offline monitoring and election operations we need.
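To make this concrete, here is a minimal illustrative sketch (not ElasticJob code) using the plain ZooKeeper Java client: each task instance registers an ephemeral node, and every instance watches the parent node, so when a process goes offline its ephemeral node disappears and the others are notified. The path /my-job and the instance data are hypothetical.

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class InstanceRegistrySketch {
    public static void main(String[] args) throws Exception {
        // Connect to ZooKeeper (same address as in the configuration below) and wait until connected
        CountDownLatch connected = new CountDownLatch(1);
        ZooKeeper zk = new ZooKeeper("localhost:2181", 5000, event -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connected.countDown();
            }
        });
        connected.await();

        // Make sure the (hypothetical) parent node exists
        if (zk.exists("/my-job", false) == null) {
            zk.create("/my-job", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }

        // Each instance registers itself as an EPHEMERAL node;
        // it disappears automatically when the process goes offline.
        zk.create("/my-job/instance-", "192.168.1.10".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        // Watch the children: the callback fires when instances come online or go offline.
        List<String> instances = zk.getChildren("/my-job",
                event -> System.out.println("Instance list changed: " + event));
        System.out.println("Current instances: " + instances);
    }
}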

ZK takes care of the dynamic monitoring of nodes. Next we have to analyse the scheduled task itself: it needs a cron expression, and ElasticJob also performs a sharding operation, so how does it solve that?

Solution 2: how to implement sharding. When we define a scheduled task configuration we can add a sharding parameter describing how many pieces the job will be split into for execution. There are three kinds of strategy: average allocation, hashing, and a custom sharding strategy. A custom strategy implements the JobShardingStrategy interface and its sharding method; the method parameters are the list of job servers and the sharding options, which include the job name, the total number of shards, and the mapping of shard items to personalized parameters, so the allocation can be customised as needed. The allocation idea is sketched below.
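The idea behind the default average-allocation strategy can be sketched in plain Java (illustrative logic only, not ElasticJob's actual strategy class): given the list of live servers and the total shard count, shard items are dealt out as evenly as possible, and each server then executes only the items assigned to it.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AverageShardingSketch {
    // Assign shard items 0..shardingTotalCount-1 to servers as evenly as possible.
    static Map<String, List<Integer>> sharding(List<String> servers, int shardingTotalCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String server : servers) {
            result.put(server, new ArrayList<>());
        }
        for (int item = 0; item < shardingTotalCount; item++) {
            result.get(servers.get(item % servers.size())).add(item);
        }
        return result;
    }

    public static void main(String[] args) {
        // 3 servers, 3 shards: each server gets exactly one shard item.
        System.out.println(sharding(Arrays.asList("192.168.1.10", "192.168.1.11", "192.168.1.12"), 3));
        // -> {192.168.1.10=[0], 192.168.1.11=[1], 192.168.1.12=[2]}
    }
}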

How to write configuration:

server:
  port: 8088
elasticjob:   # our own custom section (not a framework property), read at initialization
  zookeeper-url: localhost:2181   # ZooKeeper address and port
  group-name: elastic-job-group   # namespace name, as it appears in a ZooKeeper visual interface
spring:
  datasource: # data source configuration
    url: jdbc:mysql://localhost:3306/elastic-job-demo?serverTimezone=GMT%2B8
    driverClassName: com.mysql.cj.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource
    username: root
    password: admin

First: how to use a simple scheduled task

Step 1: Maven dependency (Spring)

<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
    <version>2.1.5</version>
</dependency>

Step 2: Define a ZK connector (registry center)

The point of this step is to connect to ZK. This gives ElasticJob a registry center that it knows about and can use freely, and it is the core of how the sharding demonstrated later is coordinated.

// Registry center configuration
@Configuration
public class CoordinatorRegistryConfig {

    @Bean(initMethod = "init")
    public static CoordinatorRegistryCenter createRegisterCenter(@Value("${elasticjob.zookeeper-url}") String zookeeperUrl, @Value("${elasticjob.group-name}") String group){
        // Set the ZK address and the task group (namespace) name
        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(zookeeperUrl, group);
        zookeeperConfiguration.setSessionTimeoutMilliseconds(100);
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
        return regCenter;
    }
}

Step 3: Define a simple job handler

ShardingContext: the parameter passed to execute(), carrying the job and sharding information. Its main fields are described below.

Sharding: if you write 3 and follow the default strategy, then by the basic sharding principle, assuming three deployed instances, the shards are evenly distributed across the three servers. ShardingItemParameters corresponds to this: if the shard count is 3, shardingItemParameters must configure three pieces of shard information, for example 0="China",1="city",2="people"; when the three shards are assigned to different executing instances, each instance only receives its own one of these parameters.

JobName: the job name. FileCustomElasticJob is actually the name of the class that executes the job.

TaskId: the task ID, e.g. FileCustomElasticJob@-@0,1,2,3@-@READY@[email protected]@-@20348, which encodes the shard items to execute, the state, the IP address of the instance, and its process ID.

ShardingTotalCount: the total number of shards, 4 here (items 0,1,2,3).

JobParam: obviously, an ordinary job parameter that can be obtained at execution time.
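As a quick check, a job can simply log these fields. A minimal sketch (the class name ContextLoggingJob is our own; the getters are those of ShardingContext in the com.dangdang 2.x API):

@Component
public class ContextLoggingJob implements SimpleJob {
    @Override
    public void execute(ShardingContext ctx) {
        // Print the sharding information delivered to this instance
        System.out.println("jobName=" + ctx.getJobName()
                + ", taskId=" + ctx.getTaskId()
                + ", shardingTotalCount=" + ctx.getShardingTotalCount()
                + ", shardingItem=" + ctx.getShardingItem()
                + ", shardingParameter=" + ctx.getShardingParameter()
                + ", jobParameter=" + ctx.getJobParameter());
    }
}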

@Component
public class MySimpleJob implements SimpleJob {
    // The task scheduling method, executed on every trigger
    @Override
    public void execute(ShardingContext shardingContext) {
        // Scheduled task logic
        System.out.println("Execution time :" + new Date());
    }
}

// A job that processes data according to its shard parameter
@Component
public class FileCustomElasticJob implements SimpleJob {
    @Autowired
    private FileCustomMapper fileCustomMapper;

    @Override
    public void execute(ShardingContext shardingContext) {
        doWork(shardingContext.getShardingParameter());
    }

    private void doWork(String fileType){
        List<FileCustom> fileList = fileCustomMapper.selecByType(fileType);
        System.out.println("Type :" + fileType + ", number of files to back up :" + fileList.size());
        for (FileCustom fileCustom : fileList) {
            backUpFile(fileCustom);
        }
    }

    private void backUpFile(FileCustom fileCustom){
        try {
            // Simulate the backup action
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Execute file backup ====>" + fileCustom);
        fileCustomMapper.changeState(fileCustom.getId(), 1);
    }
}
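The FileCustomMapper used above is not shown in the original article; a minimal MyBatis-style sketch of what it might look like (table and column names are hypothetical, method names match the job code):

@Mapper
public interface FileCustomMapper {
    // Files of the given type that still need to be backed up (assumed schema)
    @Select("SELECT * FROM t_file_custom WHERE type = #{fileType} AND backed_up = 0")
    List<FileCustom> selecByType(@Param("fileType") String fileType);

    // Mark a file as backed up
    @Update("UPDATE t_file_custom SET backed_up = #{state} WHERE id = #{id}")
    int changeState(@Param("id") Long id, @Param("state") int state);
}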

Step 4: Define how this simple program should be executed

Explanation: createJobConfiguration creates the configuration for a job: the job class (bytecode), the cron expression controlling how often it runs, shardingTotalCount for how many shards to split into, and dataflowType for whether it is a dataflow scheduled task. If you need to set more things you can add further fields; for example the job parameter is set to "123" here.

iniDataflowElasticJob: this bean initialises the execution policy of the fileDataflowJob scheduled task: which registry center to connect to, what to run, how to shard, what the shard parameters are, whether it is a dataflow job, and so on. The resulting scheduler is then put back into the Spring container for management.

/** * Created by wolfcode-lanxw */
@Configuration
public class ElasticJobConfig {

    @Autowired
    private CoordinatorRegistryCenter registryCenter;

    @Bean(initMethod = "init")
    public SpringJobScheduler iniDataflowElasticJob(FileDataflowJob fileDataflowJob){
        SpringJobScheduler springJobScheduler = new SpringJobScheduler(
                fileDataflowJob,        // business (job) class
                registryCenter,         // registry center
                createJobConfiguration( // job configuration
                        FileDataflowJob.class, "0/5 * * * * ?", 2, "0=text,1=image,2=radio,3=video", true));
        return springJobScheduler;
    }

    private static LiteJobConfiguration createJobConfiguration(final Class<? extends ElasticJob> jobClass,
                                                               final String cron,
                                                               final int shardingTotalCount,
                                                               final String shardingItemParameters,
                                                               boolean dataflowType) {
        // Define the job core configuration
        JobCoreConfiguration.Builder jobCoreConfigurationBuilder = JobCoreConfiguration.newBuilder(jobClass.getSimpleName(), cron, shardingTotalCount);
        if (!StringUtils.isEmpty(shardingItemParameters)) {
            // Optional: job parameter and shard item parameters
            jobCoreConfigurationBuilder.jobParameter("123").shardingItemParameters(shardingItemParameters);
        }
        JobTypeConfiguration jobConfig = null;
        if (dataflowType) {
            // Define the DATAFLOW configuration (streamingProcess = true)
            jobConfig = new DataflowJobConfiguration(jobCoreConfigurationBuilder.build(), jobClass.getCanonicalName(), true);
        } else {
            // Define the SIMPLE configuration
            jobConfig = new SimpleJobConfiguration(jobCoreConfigurationBuilder.build(), jobClass.getCanonicalName());
        }
        // Define the Lite job root configuration; overwrite(true) overrides the previous configuration in ZK so the current configuration takes effect
        LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(jobConfig).overwrite(true).build();
        return simpleJobRootConfig;
    }
}
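The same helper can also register the simple job from step 3 as a SIMPLE (non-dataflow) job. A hedged sketch, added to the same ElasticJobConfig (the bean name, cron expression, and shard count here are our own choices):

    @Bean(initMethod = "init")
    public SpringJobScheduler initMySimpleJob(MySimpleJob mySimpleJob){
        // SIMPLE job: dataflowType = false, 3 shards, no shard item parameters
        return new SpringJobScheduler(
                mySimpleJob,
                registryCenter,
                createJobConfiguration(MySimpleJob.class, "0/10 * * * * ?", 3, null, false));
    }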

Second: what is the difference between a dataflow scheduled task and a simple scheduled task?

This is the dataflow type, and the interface to implement is DataflowJob. The difference is that for a dataflow task the sharded work is split across two methods: fetchData, which returns a List of data to be processed, and processData, which handles the data that fetchData returned. Application scenarios I can think of: a periodic task executed in shards, e.g. 0 = Redis cache processing, 1 = MySQL periodic flush to disk, 2 = report statistics, with the three tasks executed on three different services.


// Dataflow distributed job interface.
public interface DataflowJob<T> extends ElasticJob {

    /** Fetch the data to be processed.
     * @param shardingContext sharding context
     * @return collection of data to be processed */
    List<T> fetchData(ShardingContext shardingContext);

    /** Process the data.
     * @param shardingContext sharding context
     * @param data collection of data to be processed */
    void processData(ShardingContext shardingContext, List<T> data);
}
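For comparison with the SimpleJob above, here is a hedged sketch of what the FileDataflowJob referenced in ElasticJobConfig could look like; the body is our own guess, reusing the mapper from step 3:

@Component
public class FileDataflowJob implements DataflowJob<FileCustom> {
    @Autowired
    private FileCustomMapper fileCustomMapper;

    @Override
    public List<FileCustom> fetchData(ShardingContext shardingContext) {
        // Fetch only the data belonging to this shard, e.g. "text" or "image"
        return fileCustomMapper.selecByType(shardingContext.getShardingParameter());
    }

    @Override
    public void processData(ShardingContext shardingContext, List<FileCustom> data) {
        // With streamingProcess = true, fetchData is called again until it returns
        // an empty list, so each processed item must be marked as done here.
        for (FileCustom fileCustom : data) {
            System.out.println("Backing up ====>" + fileCustom);
            fileCustomMapper.changeState(fileCustom.getId(), 1);
        }
    }
}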

To understand this sharding: the job runs once every five seconds, and the shard parameters are distributed across the servers of the shard, so different servers process different shard parameters. With the four shard parameters configured above, one instance only receives image, another only receives radio, and so on. The nice thing about this is that you do not have to worry about repeated execution in a distributed deployment.

@Component
public class FileCustomElasticJob implements SimpleJob {
    @Autowired
    private FileCustomMapper fileCustomMapper;

    @Override
    public void execute(ShardingContext shardingContext) {
        doWork(shardingContext.getShardingParameter());
    }

    private void doWork(String fileType){
        List<FileCustom> fileList = fileCustomMapper.selecByType(fileType);
        System.out.println("Type :" + fileType + ", number of files to back up :" + fileList.size());
        for (FileCustom fileCustom : fileList) {
            backUpFile(fileCustom);
        }
    }

    private void backUpFile(FileCustom fileCustom){
        try {
            // Simulate the backup action
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Execute file backup ====>" + fileCustom);
        fileCustomMapper.changeState(fileCustom.getId(), 1);
    }
}