What’s Elastic-Job?

Elastic-Job is a distributed scheduling solution consisting of two independent subprojects, Elastic-Job-Lite and Elastic-Job-Cloud.

Elastic-Job-Lite is positioned as a lightweight, decentralized solution that coordinates distributed tasks in the form of a JAR package. Elastic-Job-Cloud uses a self-developed Mesos Framework to additionally provide resource management, application distribution, and process isolation.

Official website: elasticjob.io/

GitHub: github.com/elasticjob/…

Why use Elastic-Job

At present, our company runs scheduled tasks with an executor based on Linux crontab.

The following problems exist:

  • Tasks cannot be managed centrally
  • No horizontal scaling
  • No visual interface for operations
  • A single point of failure exists

Besides Linux crontab, in the Java world there is Quartz, but Quartz lacks distributed parallel scheduling.

The problems are also obvious:

  • When my project is a single application, I can happily run scheduled tasks based on Quartz
  • When the project is load-balanced across 3 nodes, the task executes on all 3 nodes at the same time, resulting in data chaos
  • To keep the data correct, a distributed lock then has to be introduced into the scheduling, which adds complexity

How to solve it?

1. Develop your own framework

In this case, you need to develop a scheduling framework that meets your company's business requirements. The cost is high, so this is not recommended.

I’ve been thinking about writing one myself, but I haven’t started yet. A scheduling framework like Elastic-Job does a really good job with this problem: it lets you define sharding rules, schedules based on your shards, and lets you control which data each node handles.

If you go this way but don’t want to write the distribution logic yourself, I think the easiest approach is to use message queues.

Zookeeper is used for scheduling and storing task data, and a general interface is defined, divided into two parts as follows:

public interface Job {
    void read();
    void process(Object data);
}

The consumer then implements this interface: read fetches the data that needs to be processed, and process handles the distributed data.

As for distribution, each task can use its own queue, or a shared one can be used, so that multiple consumers can consume at the same time. Even if one of them fails, the whole task is not affected, and failover does not need to be considered.

The only thing that must be controlled is the read method: it must be executed by exactly one node, or the data will be distributed repeatedly.
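To make the idea concrete, here is a minimal sketch using an in-process BlockingQueue as a stand-in for a real message queue; apart from the Job interface above, every name in it is a hypothetical illustration:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueBackedJob implements Job {

    // Stand-in for a real MQ topic; one queue per task, or a shared one.
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();

    // Must run on exactly one node (e.g. elected via a Zookeeper lock),
    // otherwise the same data gets distributed repeatedly.
    @Override
    public void read() {
        List<String> pending = Arrays.asList("record-1", "record-2"); // e.g. load from DB
        queue.addAll(pending);
    }

    // Safe to run on every node: each record is consumed by only one consumer.
    @Override
    public void process(Object data) {
        System.out.println("processing " + data);
    }

    // Consumer loop each node would run against the queue.
    public void consume() throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
            process(queue.take());
        }
    }
}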

The above is just a rough idea; a real solution would of course also need web-based task management, manual task triggering, and so on.

2. Select an open source solution

TBSchedule: Alibaba’s early open source distributed task scheduling system. The code is slightly older and uses timers instead of thread pools to perform task scheduling. Timers are known to be defective in handling exceptions. In addition, the TBSchedule job type is relatively simple and can only be a mode of data acquisition/processing. There is also a serious lack of documentation.

Spring Batch: a lightweight, Spring-based batch framework that can be applied to a large number of enterprise-level data processing systems. Spring Batch builds on POJOs and the well-known Spring framework, making it easier for developers to access and leverage enterprise-level services. It provides a number of reusable data processing capabilities, including logging/tracing, transaction management, job processing statistics, job restart, skip, and resource management.

Elastic-Job: a domestic open source product with Chinese documentation, quick to get started with, easy to use, feature-rich, and with an active community. It is led by Dangdang architect Liang Zhang, who has spent a lot of his time on open source.

Why Elastic-Job?

  • Distributed scheduling coordination
  • Elastic capacity expansion and reduction
  • Failover
  • Re-trigger of missed jobs
  • Job sharding consistency, ensuring that the same shard has only one execution instance in a distributed environment
  • Self-diagnosis and repair of problems caused by distributed instability
  • Support parallel scheduling
  • Support job life cycle operations
  • Rich job types
  • Spring integration and namespace support
  • Operational platform

Job Type Introduction

Simple: a simple job, the most commonly used type, meaning a straightforward implementation with no extra encapsulation. It requires implementing the SimpleJob interface, which provides a single method to override; that method is executed periodically. It is similar to the native Quartz interface, but adds elastic scaling and sharding.

public class MyElasticJob implements SimpleJob {
    
    @Override
    public void execute(ShardingContext context) {
        switch (context.getShardingItem()) {
            case 0:
                // do something by sharding item 0
                break;
            case 1:
                // do something by sharding item 1
                break;
            case 2:
                // do something by sharding item 2
                break;
            // case n: ...
        }
    }
}

DataFlow: The DataFlow type is used to process data streams. The DataflowJob interface needs to be implemented. The interface provides two methods to override, one for fetching (fetchData) and the other for processing (processData) data.

public class MyElasticJob implements DataflowJob<Foo> {
    
    @Override
    public List<Foo> fetchData(ShardingContext context) {
        switch (context.getShardingItem()) {
            case 0: {
                List<Foo> data = null; // get data from database by sharding item 0
                return data;
            }
            case 1: {
                List<Foo> data = null; // get data from database by sharding item 1
                return data;
            }
            case 2: {
                List<Foo> data = null; // get data from database by sharding item 2
                return data;
            }
            // case n: ...
            default:
                return null;
        }
    }
    
    @Override
    public void processData(ShardingContext shardingContext, List<Foo> data) {
        // process data
        // ...
    }
}

Script: a Script job runs scripts, and supports all script types such as shell, Python, and Perl. Simply configure scriptCommandLine via the console or in code; no coding is required. The script path can be followed by parameters, and after those parameters the job framework automatically appends the job runtime information as the last argument.
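As a sketch only, assuming a job:script element analogous to the job:simple configuration shown later in this article, a script job could be declared like this; demo.sh would then receive the job runtime information as its last argument:

<job:script id="myScriptJob" registry-center-ref="regCenter"
     cron="0 0/10 * * * ?" sharding-total-count="3"
     script-command-line="/path/to/demo.sh" overwrite="true" />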

In fact, I would like to add a new job type, the flow task, for which I specifically opened an issue:

Github.com/elasticjob/…

Under specific business requirements, task B needs to execute after task A completes, and so on: a dependent flow of tasks. At the moment, you can combine these tasks and chain them with code calls to achieve this effect. But I would like a feature like job-after="com.xxx.job.XXXJob": after this task finishes, another task BB is automatically triggered. Task BB only needs its job information configured, minus the cron, because BB is triggered by other tasks. Of course, all of these tasks have to be in the same ZK namespace; it would be even better to support cross-namespace triggering, so that you could build a flow of tasks and give each task its own sharding key.
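Until something like that exists, the code-call workaround mentioned above can be sketched as follows; taskA and taskB are hypothetical steps, and step B simply runs after step A returns:

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;

public class FlowJob implements SimpleJob {

    @Override
    public void execute(ShardingContext context) {
        // Step B only starts after step A has completed for this shard.
        taskA(context);
        taskB(context);
    }

    private void taskA(ShardingContext context) {
        // do the work of task A
    }

    private void taskB(ShardingContext context) {
        // do the work of task B
    }
}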

Getting started

1. I won’t explain how to set up and configure the framework; the official documentation is certainly better than anything I could write, and open source frameworks generally come with a demo that you can download and run in your IDE.

Demo address: github.com/elasticjob/…

2. Some experience from actual use

  • It is suggested to divide tasks by product, with one product corresponding to one task project. When the team is large, one team may be responsible for one product, so its tasks won’t get mixed up with others’
  • The description of a task must state clearly what it is for; the task configuration has a description field, so fill it in properly, as in the example below:
/**
 * User dimension statistics task
 * @author yinjihuan
 */
public class UserStatJob implements SimpleJob {

	private Logger logger = LoggerFactory.getLogger(UserStatJob.class);

	@Autowired
	private EnterpriseProductUserService enterpriseProductUserService;

	@Autowired
	private UserStatService userStatService;

	@Autowired
	private HouseInfoService houseInfoService;

	@Autowired
	private HouseSubstitutionService houseSubstitutionService;

	@Autowired
	private LoanApplyService loanApplyService;

	@Override
	public void execute(ShardingContext shardingContext) {
		logger.info("Start executing UserStatJob");
		long total = enterpriseProductUserService.queryCount();
		int pages = PageBean.calcPages(total, 1000);
		for (int i = 1; i <= pages; i++) {
			List<EnterpriseProductUser> users = enterpriseProductUserService.queryByPage(i, 1000);
			for (EnterpriseProductUser user : users) {
				try {
					processStat(user);
				} catch (Exception e) {
					logger.error("User dimension statistics Task is abnormal", e);
					DingDingMessageUtil.sendTextMessage("Abnormal user dimension statistics task :" + e.getMessage());
				}
			}
		}
		logger.info("UserStatJob execution completed");
	}
	
	private void processStat(EnterpriseProductUser user) {
		UserStat stat = userStatService.getByUid(user.getEid(), user.getUid());
		Long eid = user.getEid();
		String uid = user.getUid();
		if (stat == null) {
			stat = new UserStat();
			stat.setEid(eid);
			stat.setUid(uid);
			stat.setUserAddTime(user.getAddTime());
			stat.setCity(user.getCity());
			stat.setRegion(user.getRegion());
		}
		stat.setHouseCount(houseInfoService.queryCountByEidAndUid(eid, uid));
		stat.setHousePrice(houseInfoService.querySumMoneyByEidAndUid(eid, uid));
		stat.setSubstitutionCount(houseSubstitutionService.queryCount(eid, uid));
		stat.setSubstitutionMaxPrice(houseSubstitutionService.queryMaxBudget(eid, uid));
		stat.setLoanEvalCount(loanApplyService.queryUserCountByType(eid, uid, 2));
		stat.setLoanEvalMaxPrice(loanApplyService.queryMaxEvalMoney(eid, uid));
		stat.setLoanCount(loanApplyService.queryUserCountByType(eid, uid, 1));
		stat.setModifyDate(new Date());
		userStatService.save(stat);
	}
}
<!-- The user statistics task is executed at 1:10 every day -->
<job:simple id="userStatJob" class="com.fangjia.job.fsh.job.UserStatJob" registry-center-ref="regCenter"
    	 sharding-total-count="1" cron="0 10 1 * * ?" sharding-item-parameters=""
    	 failover="true" description="[Housing Life] User dimension statistics task: collects the user's real estate, replacement, loan and other information. UserStatJob"
    	 overwrite="true" event-trace-rdb-data-source="elasticJobLog" job-exception-handler="com.fangjia.job.fsh.handler.CustomJobExceptionHandler">
    	 
    	  <job:listener class="com.fangjia.job.fsh.listener.MessageElasticJobListener"></job:listener>
    	  
 </job:simple>
  • A unified listener is configured for every task to send notifications when the task starts and finishes; these can be SMS, email, or any other channel. I use a DingTalk robot to send messages to a DingTalk group, as shown below:
/**
 * Job listener that sends DingTalk notifications before and after execution
 * @author yinjihuan
 */
public class MessageElasticJobListener implements ElasticJobListener {

    @Override
    public void beforeJobExecuted(ShardingContexts shardingContexts) {
    	String date = DateUtils.date2Str(new Date());
    	String msg = date + " 【FSH-" + shardingContexts.getJobName() + "】 Task started ====" + JsonUtils.toJson(shardingContexts);
    	DingDingMessageUtil.sendTextMessage(msg);
    }
    
    @Override
    public void afterJobExecuted(ShardingContexts shardingContexts) {
    	String date = DateUtils.date2Str(new Date());
    	String msg = date + " 【FSH-" + shardingContexts.getJobName() + "】 Task finished ====" + JsonUtils.toJson(shardingContexts);
    	DingDingMessageUtil.sendTextMessage(msg);
    }
}
  • You can define an annotation on each task class to identify who developed it, and send the corresponding DingTalk message to that person. Personally, though, I suggest creating a group with everyone in it: if the message goes to a single developer, it is useless unless that person is highly self-motivated, whereas in a group the leader will see it and say: your task failed, go find out why. I send the messages to the group uniformly and didn’t define annotations.

  • Task exception handling: you can catch exceptions inside the task. Besides logging them, you can use a unified wrapper to send DingTalk notifications, so you know in real time whether a task has gone wrong. See my code above.

  • There is another kind of exception that is never caught inside the task. To notify the group about those, implement a custom exception-handling class and configure it via job-exception-handler="com.fangjia.job.fsh.handler.CustomJobExceptionHandler", as follows:

/**
 * Custom exception handler that sends a DingTalk notification when a task throws
 * @author yinjihuan
 */
public class CustomJobExceptionHandler implements JobExceptionHandler {

	private Logger logger = LoggerFactory.getLogger(CustomJobExceptionHandler.class);

	@Override
	public void handleException(String jobName, Throwable cause) {
		logger.error(String.format("Job '%s' exception occur in job processing", jobName), cause);
		DingDingMessageUtil.sendTextMessage("【" + jobName + "】 Task exception: " + cause.getMessage());
	}
}
  • You can check whether a job node is down by listening for the existence of the job_name/instances/job_instance_id node in Zookeeper. This is an ephemeral node and is deleted when the job server goes offline. Of course, other tools can also be used for monitoring; a minimal sketch follows.
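A minimal sketch of such a monitor with Apache Curator; the connection string and the /your-namespace/userStatJob/instances path are placeholders for your own registry settings:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class JobInstanceMonitor {

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // Watch the job's ephemeral instance nodes; a CHILD_REMOVED event
        // means a job server went offline.
        PathChildrenCache cache = new PathChildrenCache(
                client, "/your-namespace/userStatJob/instances", true);
        cache.getListenable().addListener((cli, event) -> {
            if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
                DingDingMessageUtil.sendTextMessage(
                        "Job instance offline: " + event.getData().getPath());
            }
        });
        cache.start();
    }
}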

  • Horizontal scalability should be considered when writing tasks. The example I posted above actually doesn’t consider it; it is just a simple task, because I didn’t use shardingParameter to process the data for the corresponding shard. The data volume I handle is small, so I can write it this way. If you can foresee a lot of data that will take a long time to process, it is best to configure sharding rules and write the code to process data shard by shard, so that later you only need to modify the configuration and add a node. A sketch follows:
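A minimal sketch of a shard-aware version, assuming sharding-item-parameters="0=beijing,1=shanghai,2=guangzhou" in the job configuration and a hypothetical queryByCity method on the service:

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;

public class ShardedUserStatJob implements SimpleJob {

    @Override
    public void execute(ShardingContext context) {
        // Each running instance only receives its own shard items, so with
        // 3 shards and 3 nodes every node processes a disjoint slice of data.
        String city = context.getShardingParameter(); // e.g. "beijing"
        // Hypothetical shard-aware query instead of loading everything:
        // enterpriseProductUserService.queryByCity(city).forEach(this::processStat);
        System.out.println("processing users of " + city);
    }
}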

For more technology sharing, follow the WeChat public account: Ape World