1. Quartz

Quartz is a powerful open-source task scheduling framework from OpenSymphony for executing scheduled tasks. For example, if we need to export data from a database at 3 a.m. every day, we need a task scheduling framework to automate the execution of that procedure. So how does Quartz work?

1) First, we need a Job, the interface that carries the business logic. Our class implements this interface to provide the logic, such as reading the database and exporting data at 3 a.m.

2) Once we have a Job, we need to execute it on time, which requires a Trigger: it fires the Job we defined according to our requirements, for example at 3 a.m. every day.

3) Once you have a Job and a Trigger, you need to combine them so that the Trigger invokes the Job at the specified time, and for that you need a Scheduler. So, Quartz has three main components:

Scheduler: the scheduler
JobDetail: the task definition
Trigger: the trigger, including SimpleTrigger and CronTrigger

The process for creating a Quartz task is as follows:

```java
public class HelloJob implements Job {
    @Override
    public void execute(JobExecutionContext context) {
        // business logic ...
    }
}

// Define the job from the job class
JobDetail jobDetail = JobBuilder.newJob(HelloJob.class).build();
// Define a trigger that schedules the job at the prescribed time (here, once every minute)
Trigger trigger = TriggerBuilder.newTrigger()
        .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInMinutes(1).repeatForever())
        .build();
// Create the scheduler from the job and the trigger, then start it
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
scheduler.scheduleJob(jobDetail, trigger);
scheduler.start();
```

2. Basic principles of Elastic-Job

2.1 Sharding

To improve the concurrency of tasks, Elastic-Job introduces the concept of sharding: a task is divided into multiple shard items, which are then distributed across multiple machines for execution. For example, suppose a database has 100 million records that need to be read, computed, and written back. You can divide those 100 million records into 10 shards, each of which reads 10 million records, computes them, and writes the results to the database. The 10 shards are numbered 0, 1, 2, ..., 9. If three machines execute the task, machine A might get shards (0, 1, 2, 9), machine B shards (3, 4, 5), and machine C shards (6, 7, 8).
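
As a rough illustration, here is a simplified sketch (not Elastic-Job's actual code) of how such an average allocation can be computed; it reproduces the assignment in the example above:

```java
import java.util.*;

// Simplified sketch of average allocation: divide shard items evenly,
// then hand the leftover items to the first servers, one each.
public class AverageShardingSketch {

    public static Map<String, List<Integer>> sharding(List<String> servers, int shardingTotalCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int itemsPerServer = shardingTotalCount / servers.size();
        int item = 0;
        for (String server : servers) {
            List<Integer> items = new ArrayList<>();
            for (int i = 0; i < itemsPerServer; i++) {
                items.add(item++);
            }
            result.put(server, items);
        }
        // Assign the remaining items to the first servers, one each
        int index = 0;
        while (item < shardingTotalCount) {
            result.get(servers.get(index++)).add(item++);
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints {A=[0, 1, 2, 9], B=[3, 4, 5], C=[6, 7, 8]}
        System.out.println(sharding(Arrays.asList("A", "B", "C"), 10));
    }
}
```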

2.2 Job scheduling and execution

Elastic-Job is a decentralized task scheduling framework. When multiple nodes run the same job, a leader node is elected first. When the trigger time arrives, each instance starts executing the task: every node obtains its assigned shard items from ZooKeeper and passes the shard information as a parameter to the local task function, which executes the task.

2.3 Job types

Elastic-Job supports three types of job processing:

Simple jobs: used for common tasks; you only need to implement the SimpleJob interface. The interface provides a single method to override, which is executed periodically, similar to the native Quartz interface.

Dataflow jobs: used for processing data flows; you need to implement the DataflowJob interface. The interface provides two methods to override: one for fetching data (fetchData) and one for processing data (processData), as sketched below.
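
For illustration, a minimal DataflowJob implementation might look like the following sketch, based on the Elastic-Job-Lite 2.x API; the class name and data-access logic are hypothetical placeholders:

```java
import java.util.Collections;
import java.util.List;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;

// The framework alternately calls fetchData and processData;
// each call receives the sharding context of the current instance.
public class MyDataflowJob implements DataflowJob<String> {

    @Override
    public List<String> fetchData(ShardingContext shardingContext) {
        // e.g., query the rows belonging to this shard item (placeholder logic)
        return Collections.emptyList();
    }

    @Override
    public void processData(ShardingContext shardingContext, List<String> data) {
        // process the fetched batch and write the results back (placeholder logic)
    }
}
```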

Script jobs: used for script tasks and support any script type, such as shell, Python, or Perl. You only need to configure scriptCommandLine via the console or code; no coding is required. The execution script path can contain parameters, and after the parameters are passed, the framework automatically appends the job runtime information as the last parameter.
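
Similarly, a script job could be registered with a sketch like the following, using the Elastic-Job-Lite 2.x configuration API; the job name, cron expression, and script path are placeholders:

```java
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.script.ScriptJobConfiguration;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;

public class ScriptJobRegistration {

    public static LiteJobConfiguration createScriptJobConfiguration() {
        // Core configuration: job name, cron expression, and shard count
        JobCoreConfiguration coreConfig =
                JobCoreConfiguration.newBuilder("demoScriptJob", "0/30 * * * * ?", 1).build();
        // No job class is needed; only the command line to execute.
        // The framework appends the job runtime information as the last argument.
        ScriptJobConfiguration scriptJobConfig =
                new ScriptJobConfiguration(coreConfig, "/path/to/export.sh");
        return LiteJobConfiguration.newBuilder(scriptJobConfig).build();
    }
}
```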

3. The execution principle of Elastic-Job

3.1 Elastic-Job Startup Process

The following uses a SimpleJob-type task to illustrate the Elastic-Job startup process:

```java
public class MyElasticJob implements SimpleJob {

    @Override
    public void execute(ShardingContext context) {
        // Implement business logic ...
    }

    // Configure ZooKeeper as the distributed task registry center
    private static CoordinatorRegistryCenter createRegistryCenter() {
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("xxxx"));
        regCenter.init();
        return regCenter;
    }

    private static LiteJobConfiguration createJobConfiguration() {
        // Define the core job configuration: name, cron expression, and total shard count
        JobCoreConfiguration simpleCoreConfig =
                JobCoreConfiguration.newBuilder("demoSimpleJob", "0/15 * * * * ?", 10).build();
        // Define the SIMPLE-type job configuration
        SimpleJobConfiguration simpleJobConfig =
                new SimpleJobConfiguration(simpleCoreConfig, MyElasticJob.class.getCanonicalName());
        // Define the Lite job root configuration
        LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
        return simpleJobRootConfig;
    }

    public static void main(String[] args) {
        new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
    }
}
```

Creating and executing an Elastic-Job job involves the following steps:

1) Set the basic ZooKeeper information. Elastic-Job uses ZooKeeper for distributed management, such as leader election, metadata storage and reads, and distributed listening.

2) Create the Job class that executes the task. Taking a Simple job as an example, create a class that implements SimpleJob and implement its execute function.

3) Set the basic job information. In JobCoreConfiguration, set the job name (jobName), the cron expression for the execution schedule (cron), and the total number of shards (shardingTotalCount). Then set the Job class in SimpleJobConfiguration, and finally define the Lite job root configuration.

4) Create a JobScheduler instance and initialize the job in JobScheduler's init() method, after which the job starts running.

Job scheduling is driven by JobScheduler, whose methods are described below. JobScheduler is defined as follows:

```java
public class JobScheduler {

    public static final String ELASTIC_JOB_DATA_MAP_KEY = "elasticJob";
    private static final String JOB_FACADE_DATA_MAP_KEY = "jobFacade";

    // Job configuration
    private final LiteJobConfiguration liteJobConfig;

    // Registry center
    private final CoordinatorRegistryCenter regCenter;

    // Scheduler facade
    private final SchedulerFacade schedulerFacade;

    // Job facade
    private final JobFacade jobFacade;

    private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig,
                         final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {
        JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance());
        this.liteJobConfig = liteJobConfig;
        this.regCenter = regCenter;
        List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);
        setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);
        schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);
        jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
    }
}
```

In the JobScheduler constructor above, the job configuration (liteJobConfig), the registry center (regCenter), the list of listeners (elasticJobListenerList), the scheduler facade (schedulerFacade), and the job facade (jobFacade) are set.

After the JobScheduler instance is created, the job is initialized as follows:

```java
/**
 * Initialize the job.
 */
public void init() {
    JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfig.getJobName(),
            liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount());
    JobScheduleController jobScheduleController = new JobScheduleController(createScheduler(),
            createJobDetail(liteJobConfig.getTypeConfig().getJobClass()), liteJobConfig.getJobName());
    JobRegistry.getInstance().registerJob(liteJobConfig.getJobName(), jobScheduleController, regCenter);
    schedulerFacade.registerStartUpInfo(liteJobConfig);
    jobScheduleController.scheduleJob(liteJobConfig.getTypeConfig().getCoreConfig().getCron());
}
```

As shown above:

1) JobRegistry is a job registry that stores job metadata as a singleton; the total number of shards and other information are set in it.

2) jobScheduleController is the job scheduling controller; it can schedule, reschedule, pause, resume, and immediately trigger jobs. Jobs are started, paused, and resumed through jobScheduleController.

3) Register the job name, job scheduling controller, and registry center in the JobRegistry.

4) Execute the registerStartUpInfo method of schedulerFacade to register the job startup information, as follows:

```java
/**
 * Register the job startup information.
 *
 * @param liteJobConfig job configuration
 */
public void registerStartUpInfo(final LiteJobConfiguration liteJobConfig) {
    regCenter.addCacheData("/" + liteJobConfig.getJobName());
    // Start all listeners
    listenerManager.startAllListeners();
    // Elect the leader node
    leaderService.electLeader();
    // Persist the job configuration
    configService.persist(liteJobConfig);
    LiteJobConfiguration liteJobConfigFromZk = configService.load(false);
    // Persist the online information of the job server
    serverService.persistOnline(!liteJobConfigFromZk.isDisabled());
    // Persist the online information of the running job instance (register this instance to ZooKeeper)
    instanceService.persistOnline();
    // Set the resharding flag
    shardingService.setReshardingFlag();
    // Initialize the job monitor service
    monitorService.listen();
    // Start the reconcile service that fixes inconsistent job state
    if (!reconcileService.isRunning()) {
        reconcileService.startAsync();
    }
}
```

As shown above:

1) Start all listeners, which use ZooKeeper's watch mechanism to monitor changes to the system's metadata and perform the corresponding operations.

2) Elect the leader node, using a ZooKeeper distributed lock; the leader node is mainly responsible for dividing the shards.

3) Persist various metadata to ZooKeeper, such as the job configuration and the information of each service instance.

4) Set the resharding flag; resharding is required when a task executes for the first time or when service instances are added to or removed from the system.

After the job startup information is registered, the scheduleJob method of jobScheduleController is called to schedule the job, and the job starts to execute. The scheduleJob method is as follows:

```java
public void scheduleJob(final String cron) {
    try {
        if (!scheduler.checkExists(jobDetail.getKey())) {
            scheduler.scheduleJob(jobDetail, createTrigger(cron));
        }
        scheduler.start();
    } catch (final SchedulerException ex) {
        throw new JobSystemException(ex);
    }
}
```

The scheduler binds the jobDetail to the trigger and then calls scheduler.start() to begin scheduling.

This completes the code analysis of the job startup process.

3.2 Elastic-Job Execution Process

According to the earlier explanation of Quartz, executing a task means running the business logic defined in JobDetail, so we only need to look at the contents of the JobDetail to understand the job execution process:

```java
private JobDetail createJobDetail(final String jobClass) {
    JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
    // Other code omitted
    return result;
}
```

As the code above shows, the task that gets executed is the LiteJob class:

```java
public final class LiteJob implements Job {

    @Setter
    private ElasticJob elasticJob;

    @Setter
    private JobFacade jobFacade;

    @Override
    public void execute(final JobExecutionContext context) throws JobExecutionException {
        JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
    }
}
```

LiteJob obtains the job executor (an AbstractElasticJobExecutor) from JobExecutorFactory and executes it:

```java
public final class JobExecutorFactory {

    /**
     * Get the job executor.
     *
     * @param elasticJob distributed elastic job
     * @param jobFacade  job facade
     * @return the job executor
     */
    @SuppressWarnings("unchecked")
    public static AbstractElasticJobExecutor getJobExecutor(final ElasticJob elasticJob, final JobFacade jobFacade) {
        // ScriptJob
        if (null == elasticJob) {
            return new ScriptJobExecutor(jobFacade);
        }
        // SimpleJob
        if (elasticJob instanceof SimpleJob) {
            return new SimpleJobExecutor((SimpleJob) elasticJob, jobFacade);
        }
        // DataflowJob
        if (elasticJob instanceof DataflowJob) {
            return new DataflowJobExecutor((DataflowJob) elasticJob, jobFacade);
        }
        throw new JobConfigurationException("Cannot support job type '%s'", elasticJob.getClass().getCanonicalName());
    }
}
```

JobExecutorFactory returns the executor corresponding to the job type, and the executor's execute() function is then called. Let's look at the execute function:

```java
// AbstractElasticJobExecutor.java
public final void execute() {
    // Check the job execution environment
    try {
        jobFacade.checkJobExecutionEnvironment();
    } catch (final JobExecutionEnvironmentException cause) {
        jobExceptionHandler.handleException(jobName, cause);
    }
    // Get the sharding context of the current job server
    ShardingContexts shardingContexts = jobFacade.getShardingContexts();
    // Publish the job status trace event (State.TASK_STAGING)
    if (shardingContexts.isAllowSendJobEvent()) {
        jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING,
                String.format("Job '%s' execute begin.", jobName));
    }
    // Skip the misfired job if it is still running
    if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
        // Publish the job status trace event (State.TASK_FINISHED)
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
                    "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.",
                    jobName, shardingContexts.getShardingItemParameters().keySet()));
        }
        return;
    }
    // Invoke the pre-execution method of the job
    try {
        jobFacade.beforeJobExecuted(shardingContexts);
        //CHECKSTYLE:OFF
    } catch (final Throwable cause) {
        //CHECKSTYLE:ON
        jobExceptionHandler.handleException(jobName, cause);
    }
    // Execute the normally triggered job
    execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
    // Execute misfired jobs that were skipped
    while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
        jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
        execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
    }
    // Perform failover if necessary
    jobFacade.failoverIfNecessary();
    // Invoke the post-execution method of the job
    try {
        jobFacade.afterJobExecuted(shardingContexts);
        //CHECKSTYLE:OFF
    } catch (final Throwable cause) {
        //CHECKSTYLE:ON
        jobExceptionHandler.handleException(jobName, cause);
    }
}
```

The main flow of the execute function is as follows:

  1. Check the job execution environment

  2. Get the sharding context of the current job server. The jobFacade.getShardingContexts() function obtains the current sharding information: the leader node divides the shard items according to the configured sharding strategy and writes the result to ZooKeeper, from which the other nodes then read their assignments.

  3. Publish job status tracking events

  4. Skip misfired jobs that are still running

  5. Invoke the pre-execution method of the job

  6. Execute the normally triggered job, which ultimately calls the execute method of MyElasticJob to run the user's business logic; a simplified sketch of this final dispatch step follows the list.
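
For reference, the sketch below is a simplified approximation (not the framework's exact code) of how a SimpleJob executor dispatches each shard item assigned to the current instance to the user's execute method:

```java
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.executor.ShardingContexts;

// Simplified sketch: the executor iterates over the shard items assigned to
// this instance and invokes the user's business logic once per item.
public final class SimpleJobDispatchSketch {

    private final SimpleJob simpleJob;

    public SimpleJobDispatchSketch(final SimpleJob simpleJob) {
        this.simpleJob = simpleJob;
    }

    public void process(final ShardingContexts shardingContexts) {
        for (final int item : shardingContexts.getShardingItemParameters().keySet()) {
            // Build a per-item context and call the user's execute method
            simpleJob.execute(new ShardingContext(shardingContexts, item));
        }
    }
}
```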

4. Optimization practices of Elastic-Job

4.1 Idling problem

Depending on whether an implementation class is required, jobs fall into two types. Simple and Dataflow jobs require a user-defined implementation class that implements SimpleJob or DataflowJob. The other type, such as Script and Http jobs, needs no implementation class: users only fill in the corresponding configuration on the configuration platform, and a background process periodically pulls the latest registered tasks from the platform, so newly registered Script or Http jobs can then be executed.

In a production environment, the job execution cluster has many machines, but most user-registered jobs have very few shards (most have only one). According to the previous analysis, for a task with only one shard, every machine in the cluster participates in scheduling, yet only the machine that obtains the shard actually runs it; the rest idle because they hold no shard, which wastes computing resources.

4.2 Solutions

In order to solve the idle problem caused by a small number of fragments and a large number of execution servers, our solution is that users specify corresponding execution servers when configuring platform registration tasks, and the number of execution servers M= number of fragments +1 (the extra machines are used as redundant backup). For example, if the user’s job fragment is 2, the background selects three machines with the lightest load as the execution server based on the current load of the machines every day. In this way, when these machines periodically pull tasks from the configuration platform, if they find that they do not belong to the execution server of the task, they do not run the job. Only the execution server belonging to the current task runs the job. This not only ensures the reliability, but also avoids the idle running of too many machines and improves the efficiency.
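
A minimal sketch of this selection and membership check, with all class and method names hypothetical:

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: pick the (shardCount + 1) least-loaded machines as
// execution servers, and have each machine skip jobs it is not assigned to.
public class ExecutionServerSelector {

    public static List<String> selectServers(List<MachineLoad> machines, int shardCount) {
        return machines.stream()
                .sorted(Comparator.comparingDouble(MachineLoad::getLoad))
                .limit(shardCount + 1)          // shards + 1 redundant backup
                .map(MachineLoad::getHost)
                .collect(Collectors.toList());
    }

    public static boolean shouldRun(String localHost, List<String> executionServers) {
        // A machine runs the job only if it is one of the assigned execution servers
        return executionServers.contains(localHost);
    }

    public static class MachineLoad {
        private final String host;
        private final double load;

        public MachineLoad(String host, double load) {
            this.host = host;
            this.load = load;
        }

        public String getHost() { return host; }
        public double getLoad() { return load; }
    }
}
```

Keeping one server beyond the shard count trades a small amount of extra capacity for failover headroom if an execution server goes down.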

5. OPPO's massive job scheduling scheme

Elastic-Job uses ZooKeeper to implement its elastic, distributed features, which meets user requirements when the number of tasks is small. However, it also has the following disadvantages:

  1. The elastic, distributed features of Elastic-Job rely heavily on ZooKeeper, which is prone to becoming a performance bottleneck.

  2. The number of task fragments may be smaller than the number of task instances, resulting in some machines idling.

Given these shortcomings of Elastic-Job, the OPPO middleware team adopted a centralized scheduling scheme to handle massive task scheduling. User jobs are no longer triggered periodically by Quartz; instead, local tasks are triggered by messages received from the server. Users first register tasks on the registry platform; the server then periodically scans the registry platform's database for tasks that need to execute in the near term (within 30 seconds), generates delayed messages according to each task's actual execution time, and writes them to a message queue that supports delayed delivery. The task servers pull messages from the queue and trigger job execution. In this centralized scheduling mode, the central server triggers execution via messages, which both overcomes the ZooKeeper performance bottleneck and avoids idle task servers, meeting the execution requirements of massive tasks. A sketch of the scan-and-publish loop follows.
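
A rough sketch of the described scan-and-publish loop, with all types and methods hypothetical:

```java
// Hypothetical sketch of the centralized scheduling loop described above:
// every cycle, scan tasks due within the next 30 seconds and publish a
// delayed message for each; task servers execute the job on message arrival.
public class CentralScheduler {

    private final TaskStore taskStore;            // registry platform database (hypothetical)
    private final DelayMessageQueue messageQueue; // delay-capable message queue (hypothetical)

    public CentralScheduler(TaskStore taskStore, DelayMessageQueue messageQueue) {
        this.taskStore = taskStore;
        this.messageQueue = messageQueue;
    }

    public void scanAndPublish() {
        long now = System.currentTimeMillis();
        for (Task task : taskStore.findTasksDueWithin(now, 30_000)) {
            long delayMillis = task.getNextFireTime() - now;
            // The message is delivered at the task's actual execution time
            messageQueue.sendDelayed(task.getId(), Math.max(delayMillis, 0));
        }
    }

    // Hypothetical collaborator interfaces
    interface TaskStore {
        java.util.List<Task> findTasksDueWithin(long fromMillis, long windowMillis);
    }

    interface DelayMessageQueue {
        void sendDelayed(String taskId, long delayMillis);
    }

    interface Task {
        String getId();
        long getNextFireTime();
    }
}
```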

Conclusion

Elastic-Job uses Quartz for job scheduling and ZooKeeper for distributed management. On top of a high-availability solution, it adds elastic scaling and data sharding to maximize the use of distributed server resources and thereby implement distributed task scheduling. At the same time, the sharding design means that servers that obtain no shard sit idle, which can be avoided in actual production as described above.

Author’s brief introduction

Xinchun, OPPO Senior Backend Engineer

Currently responsible for the research and development of distributed job scheduling, focusing on middleware technologies such as message queues, Redis, and ElasticSearch.
