This article documents a case study of ElasticJob, Dangdang's open-source distributed scheduling framework

Task scheduling framework Quartz

ElasticJob is built on top of Quartz, so let's first take a look at Quartz

Introduction to cron expressions

A cron expression defines the time trigger for a job (similar to a bus departure schedule)

A cron expression consists of seven fields separated by spaces:

  • 1. Seconds: 0~59
  • 2. Minutes: 0~59
  • 3. Hours: 0~23
  • 4. Day of Month: 1~31. Note that some months have fewer than 31 days
  • 5. Month: 1~12, or JAN, FEB, MAR, APR, MAY, JUN, JUL, AUG, SEP, OCT, NOV, DEC
  • 6. Day of Week: 1~7, where 1 = SUN; or SUN, MON, TUE, WED, THU, FRI, SAT
  • 7. Year (optional): 1970~2099

Example:

  • 0 0 11 * * ? — the trigger fires at 11:00 every day
  • 0 30 10 1 * ? — the trigger fires at 10:30 a.m. on the first day of each month
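A few more Quartz cron expressions for reference (the `?` character means "no specific value" and must appear in either the day-of-month or the day-of-week field):

```
0 0/5 * * * ?          fires every 5 minutes
0 15 10 ? * MON-FRI    fires at 10:15 a.m. on weekdays
0 0 12 1 * ?           fires at noon on the 1st of every month
0 0 0 * * ? 2099       fires at midnight every day, but only in 2099
```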

Configure the POM file

<dependencies>
    <dependency>
        <groupId>org.quartz-scheduler</groupId>
        <artifactId>quartz</artifactId>
        <version>2.3.2</version>
    </dependency>
</dependencies>

The main code

public class QuartzMain {
    // Create a scheduler
    public static Scheduler createScheduler() throws SchedulerException {
        SchedulerFactory schedulerFactory = new StdSchedulerFactory();
        Scheduler scheduler = schedulerFactory.getScheduler();
        return scheduler;
    }

    // Create the job detail
    public static JobDetail createJob() {
        JobBuilder jobBuilder = JobBuilder.newJob(DemoJob.class);
        jobBuilder.withIdentity("jobName", "myJob");
        JobDetail jobDetail = jobBuilder.build();
        return jobDetail;
    }

    // Create a trigger that fires every 2 seconds
    public static Trigger createTrigger() {
        CronTrigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("triggerName", "myTrigger")
                .startNow()
                .withSchedule(CronScheduleBuilder.cronSchedule("*/2 * * * * ?"))
                .build();
        return trigger;
    }

    // Wire everything together and start the scheduler
    public static void main(String[] args) throws SchedulerException {
        Scheduler scheduler = QuartzMain.createScheduler();
        JobDetail job = QuartzMain.createJob();
        Trigger trigger = QuartzMain.createTrigger();
        scheduler.scheduleJob(job, trigger);
        scheduler.start();
    }
}

// The DemoJob referenced above simply prints a message each time it fires
class DemoJob implements Job {
    public void execute(JobExecutionContext context) {
        System.out.println("I'm a timed task");
    }
}

When the demo runs, it prints the task message every two seconds:

I'm a timed task
I'm a timed task
I'm a timed task

Distributed scheduling framework Elastic-Job

Introduction

Elastic-Job is an open-source distributed scheduling solution developed by Dangdang on top of Quartz. It consists of two independent subprojects, Elastic-Job-Lite and Elastic-Job-Cloud

  • Elastic-Job-Lite: a lightweight, decentralized solution that provides distributed task coordination services as a plain JAR
  • Elastic-Job-Cloud: intended for cloud environments, used together with Mesos and Docker

Main features

  • [Distributed scheduling coordination] In a distributed environment, tasks are executed according to the specified scheduling policy, and the same task is prevented from running on multiple instances at once

  • [Rich scheduling policies] Scheduled tasks are executed based on the mature Quartz cron expression syntax

  • [Elastic scaling] When an instance joins the cluster, it can be elected and assigned tasks; when an instance leaves the cluster, the tasks it was executing are transferred to other instances

  • [Failover] If a task fails on one instance, it is transferred to another instance for execution

  • [Misfire retriggering] If a job execution is missed for some reason, the missed run is automatically recorded and triggered after the previous run completes

  • [Parallel scheduling] Tasks can be sharded: a task is divided into multiple small tasks that execute simultaneously on multiple instances

  • [Job sharding consistency] When a task is sharded, each shard is guaranteed to have only one execution instance in the distributed environment

Start using Elastic-Job

Elastic-Job relies on ZooKeeper for distributed coordination, so ZooKeeper must be installed first

Install ZooKeeper

  • Download a release from a mirror site: mirrors.tuna.tsinghua.edu.cn/apache/zook…

  • Decompress the package: tar -zxvf zookeeper-3.4.14.tar.gz

  • Go to the conf directory and copy the sample config: cp zoo_sample.cfg zoo.cfg

  • Go to the bin directory and manage the ZK server:

    ./zkServer.sh start   -- starts the server
    ./zkServer.sh stop    -- stops the server
    ./zkServer.sh status  -- checks the server status

    Start the service:

    [root@localhost bin]# ./zkServer.sh start
    ZooKeeper JMX enabled by default
    Using config: /root/zookeeper-3.4.14/bin/../conf/zoo.cfg
    Starting zookeeper ... STARTED

Check the status:

[root@localhost bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /root/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: standalone

ZooKeeper visualization tool

  • Download address

issues.apache.org/jira/secure…

  • Decompress and go to the build directory

  • Start the visualization tool with java -jar:

java -jar .\zookeeper-dev-ZooInspector.jar

  • Enter the IP address and port number. The default port is 2181

Database table creation statements

The idea is to build a synchronization service that archives users from the resume table into resume_bak

-- ----------------------------
-- Table structure for resume
-- ----------------------------
DROP TABLE IF EXISTS `resume`;
CREATE TABLE `resume` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `sex` varchar(255) DEFAULT NULL,
  `phone` varchar(255) DEFAULT NULL,
  `address` varchar(255) DEFAULT NULL,
  `education` varchar(255) DEFAULT NULL,
  `state` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
SET FOREIGN_KEY_CHECKS = 1;


-- ----------------------------
-- Table structure for resume_bak
-- ----------------------------
DROP TABLE IF EXISTS `resume_bak`;
CREATE TABLE `resume_bak` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `sex` varchar(255) DEFAULT NULL,
  `phone` varchar(255) DEFAULT NULL,
  `address` varchar(255) DEFAULT NULL,
  `education` varchar(255) DEFAULT NULL,
  `state` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
SET FOREIGN_KEY_CHECKS = 1;
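The archive SQL itself is not shown in the article; assuming an archived user is marked by a particular `state` value (a hypothetical convention, used here only for illustration), the per-run statements could look like:

```sql
-- Hypothetical archive step: copy matching rows into the backup table
-- (letting resume_bak assign its own ids), then remove them from the source.
INSERT INTO resume_bak (`name`, `sex`, `phone`, `address`, `education`, `state`)
SELECT `name`, `sex`, `phone`, `address`, `education`, `state`
FROM resume WHERE `state` = 'archived';

DELETE FROM resume WHERE `state` = 'archived';
```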

The main code

The main responsibilities of this code are: creating the configuration, the coordination registry center, scheduling, sharding, and so on. Here we configure only one shard

public class ElasticJobMain {
    public static void main(String[] args) {

        // 1. Create the ZooKeeper configuration
        //    Set the server address; the default ZooKeeper port is 2181
        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("192.168.56.101", "myjob-name");

        // 2. Configure the distributed coordination registry center
        CoordinatorRegistryCenter coordinatorRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
        coordinatorRegistryCenter.init();

        // 3. Specify the job name, schedule and shard count
        //    The difference from plain Quartz is that Elastic-Job supports sharding.
        //    Sharding is a very common concept: Redis has sharding, Elasticsearch has sharding too
        JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration
                .newBuilder("archive-job", "*/1 * * * * ?", 1)
                .build();

        // 4. Bind the schedule configuration to the job class
        SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, ArchivieJob.class.getName());

        // 5. Wrap the simple job configuration as a LiteJob configuration
        LiteJobConfiguration jobConfiguration = LiteJobConfiguration
                .newBuilder(simpleJobConfiguration)
                .overwrite(true)
                .build();

        /**
         * Note the design: instead of one configuration object holding all the
         * information, it is split into separate configurations (the schedule
         * configuration, the job configuration, the lite configuration), which are
         * wrapped layer by layer through composition, each class remaining independent.
         */

        // Create the task scheduler; it needs
        // - the coordination registry center
        // - the lite job configuration
        JobScheduler jobScheduler = new JobScheduler(coordinatorRegistryCenter, jobConfiguration);
        jobScheduler.init();
    }
}

Configuring Specific Tasks

/**
 * The specific task: each shard prints its shard item, job parameter,
 * process id and machine name
 */
public class ArchivieJob implements SimpleJob {

    public void execute(ShardingContext shardingContext) {
        int shardingItem = shardingContext.getShardingItem();
        String jobParameter = shardingContext.getJobParameter();

        // The runtime MX bean name has the form "pid@machineName"
        String name = ManagementFactory.getRuntimeMXBean().getName();
        String pid = name.split("@")[0];
        String machineName = name.split("@")[1];

        System.out.println("shardingItem: " + shardingItem + ", jobParameter: " + jobParameter
                + ", pid: " + pid + ", machineName: " + machineName);
    }
}

IDEA supports multiple running instances of Main at the same time

By default, IDEA does not allow multiple instances of the same main class to run at the same time; this option must be enabled in the run configuration

View node values in ZooKeeper

In the visualization tool, you can see several nodes worth paying attention to:

  1. instances: this node shows that currently only one machine is connected

You can see from the console output that the process id of this instance is 27204.

  2. If you run the program again, another machine is added under instances. Looking at sharding, we can see that the task currently has a single shard, 0, held by instance 27204.

  3. Now let's try adding shards: set the shard count to 3 and run only one instance

 JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration
                .newBuilder("archive-job", "*/1 * * * * ?", 3)
                .build();

As you can see, the shard count increases to three, and every shard is held by instance 9424.

Leader Node election mechanism

What underlies adding shards, launching multiple instances, and what we observed in the client tool is the election mechanism, which works as follows:

  1. Each Elastic-Job instance acts as a ZooKeeper client and operates on ZooKeeper znodes
  2. Multiple instances simultaneously try to create the /leader node
  3. Only one /leader node can be created; later attempts fail. The instance that creates it successfully is elected as the leader node
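The "first create wins" behavior can be simulated in plain Java (a sketch only; the real framework does this against a ZooKeeper znode). Here `ConcurrentHashMap.putIfAbsent` stands in for ZooKeeper's guarantee that only one client can create a given node:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Simulates the /leader election. ZooKeeper guarantees that creating the
// same znode twice fails; here putIfAbsent plays that role, so exactly one
// of the racing instances becomes leader.
public class LeaderElectionSim {

    // stands in for the ZooKeeper namespace
    static final ConcurrentHashMap<String, String> znodes = new ConcurrentHashMap<>();

    static boolean tryBecomeLeader(String instanceId) {
        // "create /leader" succeeds only for the first caller
        return znodes.putIfAbsent("/leader", instanceId) == null;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        List<Future<Boolean>> results = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            String id = "instance-" + i;
            results.add(pool.submit(() -> tryBecomeLeader(id)));
        }
        int leaders = 0;
        for (Future<Boolean> f : results) {
            if (f.get()) leaders++;
        }
        System.out.println("leaders elected: " + leaders); // always exactly 1
        pool.shutdown();
    }
}
```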

Lightweight decentralization

Decentralized

  • Execution nodes are peers; every instance runs the same program
  • Scheduling is triggered automatically; no central dispatch center is needed
  • Services discover each other through the registry center
  • The leader node is not fixed

Lightweight

  • Everything is packaged in a single JAR file
  • The only external dependency is the ZooKeeper service

The architecture diagram

The JobFacade interface contains the main functionality:

/**
 * @author zhangliang
 */
public interface JobFacade {

    /**
     * Read the job configuration.
     *
     * @param fromCache whether to read from cache
     * @return job configuration
     */
    JobRootConfiguration loadJobRootConfiguration(boolean fromCache);

    /**
     * Check the job execution environment.
     *
     * @throws JobExecutionEnvironmentException if the job execution environment is abnormal
     */
    void checkJobExecutionEnvironment() throws JobExecutionEnvironmentException;

    /**
     * Fail over if necessary.
     */
    void failoverIfNecessary();

    /**
     * Register that job execution has begun.
     *
     * @param shardingContexts sharding contexts
     */
    void registerJobBegin(ShardingContexts shardingContexts);

    /**
     * Register that job execution has completed.
     *
     * @param shardingContexts sharding contexts
     */
    void registerJobCompleted(ShardingContexts shardingContexts);

    /**
     * Get the sharding contexts.
     *
     * @return sharding contexts
     */
    ShardingContexts getShardingContexts();

    /**
     * Set the misfire flag for running sharding items.
     *
     * @param shardingItems sharding items
     * @return whether the misfire condition is met
     */
    boolean misfireIfRunning(Collection<Integer> shardingItems);

    /**
     * Clear the misfire flag.
     *
     * @param shardingItems sharding items to clear
     */
    void clearMisfire(Collection<Integer> shardingItems);

    /**
     * Whether the job should execute misfired tasks.
     *
     * @param shardingItems sharding items
     * @return whether misfired tasks should be executed
     */
    boolean isExecuteMisfired(Collection<Integer> shardingItems);

    /**
     * Whether the job is eligible to continue running.
     *
     * @return whether the job is eligible to continue running
     */
    boolean isEligibleForJobRunning();

    /**
     * Whether resharding is needed.
     *
     * @return whether resharding is needed
     */
    boolean isNeedSharding();

    /**
     * Called before the job is executed.
     *
     * @param shardingContexts sharding contexts
     */
    void beforeJobExecuted(ShardingContexts shardingContexts);

    /**
     * Called after the job is executed.
     *
     * @param shardingContexts sharding contexts
     */
    void afterJobExecuted(ShardingContexts shardingContexts);

    /**
     * Post a job execution event.
     *
     * @param jobExecutionEvent job execution event
     */
    void postJobExecutionEvent(JobExecutionEvent jobExecutionEvent);

    /**
     * Post a job status trace event.
     *
     * @param taskId  task id
     * @param state   job execution state
     * @param message job execution message
     */
    void postJobStatusTraceEvent(String taskId, JobStatusTraceEvent.State state, String message);
}

Through ZooKeeper, Elastic-Job supports elastic capacity expansion, centralized job management and monitoring, and failover

Task sharding

Consider a large, time-consuming job: for example, one that must process 100 million rows stored in a database. Processing all of them on a single job node takes far too long, which is unacceptable in the Internet field, where we expect to scale processing capacity horizontally by adding machines. ElasticJob therefore allows a task to be divided into multiple shards (each shard is a small task), with each shard assigned to a specific machine instance (one instance can handle multiple shards); what logic each shard executes is up to us to implement.

  • Configuring sharding

  • Get shard information
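To make "getting shard information" concrete, here is a hedged plain-Java sketch (not Elastic-Job API code) of the common idiom: each instance reads its shard item from the sharding context and selects only the rows belonging to that shard, e.g. by `id % shardingTotalCount`:

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the usual sharding idiom: each shard processes only
// the ids where id % shardingTotalCount == shardingItem, so together the
// shards cover all rows exactly once.
public class ShardFilterDemo {

    static List<Long> rowsForShard(List<Long> allIds, int shardingTotalCount, int shardingItem) {
        List<Long> mine = new ArrayList<>();
        for (Long id : allIds) {
            if (id % shardingTotalCount == shardingItem) {
                mine.add(id);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        List<Long> ids = List.of(1L, 2L, 3L, 4L, 5L, 6L);
        // With 3 shards, shard 0 takes the ids divisible by 3
        System.out.println(rowsForShard(ids, 3, 0)); // [3, 6]
    }
}
```

In a real job, the same filter would typically be pushed into the SQL WHERE clause rather than applied in memory.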

Shard strategy

The framework defines a sharding strategy interface, JobShardingStrategy, with three implementations:

public interface JobShardingStrategy {

    /**
     * Shard the job.
     *
     * @param jobInstances       all job instances participating in sharding
     * @param jobName            job name
     * @param shardingTotalCount total number of shards
     * @return sharding result
     */
    Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, String jobName, int shardingTotalCount);
}
  • AverageAllocationJobShardingStrategy: sharding strategy based on the average allocation algorithm.

    If the shards do not divide evenly, the leftover shards are appended to the servers with lower sequence numbers:

    1. With 3 servers and 9 shards, each server is assigned: 1=[0,1,2], 2=[3,4,5], 3=[6,7,8]
    2. With 3 servers and 8 shards, each server is assigned: 1=[0,1,6], 2=[2,3,7], 3=[4,5]
    3. With 3 servers and 10 shards, each server is assigned: 1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8]
  • RotateServerByNameJobShardingStrategy: rotates the server list according to the hash value of the job name before sharding.

  • OdevitySortByNameJobShardingStrategy: sorts IPs in ascending or descending order depending on whether the hash value of the job name is odd or even.

    If the hash value of the job name is odd, IPs are sorted in ascending order; if even, in descending order. This distributes the load of different jobs evenly across servers:

    1. With 3 servers, 2 shards, and an odd job-name hash, each server is assigned: 1=[0], 2=[1], 3=[]
    2. With 3 servers, 2 shards, and an even job-name hash, each server is assigned: 3=[0], 2=[1], 1=[]
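The average allocation behavior described above can be sketched in plain Java (a hedged re-implementation for illustration, not the framework's actual class):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of the average allocation algorithm: give each server an equal
// consecutive block of shard items, then append the leftover items one by
// one to the servers with the lowest sequence numbers.
public class AverageAllocationDemo {

    static Map<String, List<Integer>> sharding(List<String> servers, int shardingTotalCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int itemCountPerServer = shardingTotalCount / servers.size();
        for (int i = 0; i < servers.size(); i++) {
            List<Integer> items = new ArrayList<>();
            for (int j = i * itemCountPerServer; j < (i + 1) * itemCountPerServer; j++) {
                items.add(j);
            }
            result.put(servers.get(i), items);
        }
        // leftover shards go to the servers with lower sequence numbers
        int leftover = shardingTotalCount % servers.size();
        int next = servers.size() * itemCountPerServer;
        for (int i = 0; i < leftover; i++) {
            result.get(servers.get(i)).add(next++);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> servers = List.of("1", "2", "3");
        System.out.println(sharding(servers, 9));  // {1=[0, 1, 2], 2=[3, 4, 5], 3=[6, 7, 8]}
        System.out.println(sharding(servers, 8));  // {1=[0, 1, 6], 2=[2, 3, 7], 3=[4, 5]}
        System.out.println(sharding(servers, 10)); // {1=[0, 1, 2, 9], 2=[3, 4, 5], 3=[6, 7, 8]}
    }
}
```

The three printed cases reproduce the three examples from the strategy description.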

Leader node election

Elastic scaling

Consider this scenario: a new running instance, app3, is added. How does it take effect?

  1. The instance automatically registers itself with the registry center
  2. The registry center detects that a new service has come online
  3. The registry center notifies ElasticJob to re-shard

So however many shards there are, that is how many instance machines can participate: for example, with 1000 shards, up to 1000 machines can execute the job together

Note:

  1. The shard count is also part of the job configuration. If it is modified, the sharding algorithm runs again before the next scheduled execution.

The result of the sharding algorithm is which machine runs which shard. The leader node computes the sharding and writes it to the registry center; each executor node then reads its assigned shards from the registry center when the scheduled task starts.

  2. If all nodes fail except one, all shards point to the remaining node. This is where ElasticJob's high availability comes from.
