Abstract: originally published at http://www.iocoder.cn/Elastic-Job/job-failover/ by "taro source code". Reprints are welcome as long as the source is retained, thanks!

This article is based on Elastic-Job V2.1.5.
- 1. Overview
- 2. Job node crashes
- 3. Job failover
- 4. Obtaining the job shard context collection
- 5. Listening for the failover feature being disabled
- 666. Easter egg

1. Overview

This article focuses on Elastic-Job-Lite job failover.
When a job node crashes while executing a job, its assigned job shard items are not re-executed until the next resharding. With failover enabled, these job shard items are captured by other job nodes and "executed". Why is "executed" in quotes? 😈 We'll explain below.

Failover took me a while to understand, so here is the official explanation to give you a better initial idea:

> Source: https://my.oschina.net/u/719192/blog/506062
> Failover: a crash of a running job server does not trigger resharding; resharding happens only when the next job starts. With the failover feature enabled, other job servers that become idle during the current job execution can detect and grab the orphan shard items left unfinished and execute them.

> Source: http://dangdangdotcom.github.io/elastic-job/elastic-job-lite/03-design/lite-design/
> Failover means that when a server fails during execution, an idle server actively grabs the unassigned shard items, and when a server goes offline, available servers are actively sought to execute its tasks.
If the concepts alone are hard to grasp, let's go to the code!

The class diagram of the main classes involved is as follows (click to enlarge):

- The pink classes are in the com.dangdang.ddframe.job.lite.internal.failover package and implement Elastic-Job-Lite job failover:
  - FailoverService: job failover service.
  - FailoverNode: job failover data storage paths.
  - FailoverListenerManager: job failover listener manager.
Just as being appreciated motivates you to keep doing good work, a Star motivates open source contributors. If you like Elastic-Job, give it a Star!
2. Job node crashes
When a job node crashes, JobCrashedJobListener detects it and performs job failover.
```java
// JobCrashedJobListener.java
class JobCrashedJobListener extends AbstractJobListener {

    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {
            // /${JOB_NAME}/instances/${INSTANCE_ID}
            String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
            if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
                return;
            }
            List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId); // /${JOB_NAME}/sharding/${ITEM_ID}/failover
            if (!failoverItems.isEmpty()) {
                for (int each : failoverItems) {
                    failoverService.setCrashedFailoverFlag(each);
                    failoverService.failoverIfNecessary();
                }
            } else {
                for (int each : shardingService.getShardingItems(jobInstanceId)) { // /${JOB_NAME}/sharding/${ITEM_ID}/instance
                    failoverService.setCrashedFailoverFlag(each);
                    failoverService.failoverIfNecessary();
                }
            }
        }
    }
}
```
- It checks that /${JOB_NAME}/instances/${INSTANCE_ID} was removed before running the failover logic. ❓ But does removal necessarily mean the job node crashed? After verification, there is currently a BUG here: it does not determine whether the node actually crashed. So in the current version, the failover logic applies to every job node shutdown, not only to crashes.
- It first calls FailoverService#getFailoverItems(...) to obtain the shard items that the closed job node (${JOB_INSTANCE_ID}) had captured for failover, i.e. ${JOB_NAME}/sharding/${ITEM_ID}/failover.
- If that list is empty, it calls ShardingService#getShardingItems(...) to obtain the shard items assigned to the closed job node, i.e. ${JOB_NAME}/sharding/${ITEM_ID}/instance.
- Why this order? We'll explain it together with FailoverService#failoverIfNecessary() below. The FailoverService#getFailoverItems(...) implementation:
```java
// FailoverService.java
public List<Integer> getFailoverItems(final String jobInstanceId) {
    List<String> items = jobNodeStorage.getJobNodeChildrenKeys(ShardingNode.ROOT);
    List<Integer> result = new ArrayList<>(items.size());
    for (String each : items) {
        int item = Integer.parseInt(each);
        String node = FailoverNode.getExecutionFailoverNode(item); // `${JOB_NAME}/sharding/${ITEM_ID}/failover`
        if (jobNodeStorage.isJobNodeExisted(node) && jobInstanceId.equals(jobNodeStorage.getJobNodeDataDirectly(node))) {
            result.add(item);
        }
    }
    Collections.sort(result);
    return result;
}
```
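As an aside, the filtering above can be simulated without ZooKeeper. Below is a minimal sketch in which a plain Map stands in for the registry; the paths, instance ids, and method shape are illustrative, not Elastic-Job's actual API:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FailoverItemsSketch {

    // Stand-in for getFailoverItems: collect the shard items whose
    // `sharding/${ITEM_ID}/failover` "node" stores the given instance id.
    static List<Integer> getFailoverItems(Map<String, String> znodes, String jobInstanceId, int totalItems) {
        List<Integer> result = new ArrayList<>();
        for (int item = 0; item < totalItems; item++) {
            if (jobInstanceId.equals(znodes.get("sharding/" + item + "/failover"))) {
                result.add(item);
            }
        }
        Collections.sort(result);
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> znodes = new HashMap<>();
        znodes.put("sharding/0/failover", "crashed-instance");
        znodes.put("sharding/2/failover", "crashed-instance");
        znodes.put("sharding/1/failover", "other-instance");
        System.out.println(getFailoverItems(znodes, "crashed-instance", 3)); // [0, 2]
    }
}
```

Only the items whose failover node stores the crashed node's instance id are returned, which is exactly the "already captured for failover" set used by JobCrashedJobListener.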
- It then calls FailoverService#setCrashedFailoverFlag(...) to set the crashed-shard flag /${JOB_NAME}/leader/failover/items/${ITEM_ID}. This data node is a persistent node that stores an empty string ("").
```java
// FailoverService.java
public void setCrashedFailoverFlag(final int item) {
    if (!isFailoverAssigned(item)) {
        jobNodeStorage.createJobNodeIfNeeded(FailoverNode.getItemsNode(item)); // /${JOB_NAME}/leader/failover/items/${ITEM_ID}
    }
}

private boolean isFailoverAssigned(final Integer item) {
    return jobNodeStorage.isJobNodeExisted(FailoverNode.getExecutionFailoverNode(item));
}
```
- Finally it calls FailoverService#failoverIfNecessary() to perform job failover if it is required.
3. Job failover

The FailoverService#failoverIfNecessary() method performs job failover if it is required.
```java
// FailoverService.java
public void failoverIfNecessary() {
    if (needFailover()) {
        jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
    }
}
```
- It calls the #needFailover() method to determine whether the failover conditions are met.
```java
// FailoverService.java
private boolean needFailover() {
    // `${JOB_NAME}/leader/failover/items/${ITEM_ID}` has job shard items waiting for failover
    return jobNodeStorage.isJobNodeExisted(FailoverNode.ITEMS_ROOT)
            && !jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).isEmpty()
            // the current job is not running
            && !JobRegistry.getInstance().isJobRunning(jobName);
}
```
- Condition 1: /${JOB_NAME}/leader/failover/items/${ITEM_ID} contains job shard items waiting for failover.
- Condition 2: the current job is not running on this node. This condition matches the job server being [idle] in the official definition quoted above:

> Failover: a crash of a running job server does not trigger resharding; resharding happens only when the next job starts. With the failover feature enabled, other job servers that become [idle] during the current job execution can detect and grab the orphan shard items left unfinished and execute them.
- If failover is needed, it calls JobNodeStorage#executeInLeader(...), using FailoverNode.LATCH (/${JOB_NAME}/leader/failover/latch) as the path of a distributed lock, to guarantee that FailoverLeaderExecutionCallback is executed by one and only one job node at a time, even when multiple job nodes invoke it concurrently. Note that although JobNodeStorage#executeInLeader(...) carries the "Leader" keyword, the operation does not actually have to run on the primary node: any job node that acquires the distributed lock can execute it. Elastic-Job-Lite currently stores the data of all JobNodeStorage#executeInLeader(...) calls under the /leader/ node directory. Details of the distributed lock are shared in "3.1 Performing operations on the master node" of "Elastic-Job-Lite Source Code Analysis — Registry".
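The "one and only one node executes" guarantee can be illustrated locally. In the sketch below, an in-process monitor stands in for the /leader/failover/latch distributed lock, and re-checking the crashed-item flag under the lock plays the role of the needFailover() re-check; all names are illustrative, not Elastic-Job's API:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ExecuteInLeaderSketch {

    // Simulates nodeCount job nodes racing to fail over one crashed shard item;
    // returns how many of them actually performed the failover.
    static int raceForFailover(int nodeCount) {
        Queue<Integer> crashedItems = new ConcurrentLinkedQueue<>();
        crashedItems.add(0);                 // one `leader/failover/items/0` flag
        Object latch = new Object();         // stands in for `leader/failover/latch`
        AtomicInteger executions = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(nodeCount);
        for (int i = 0; i < nodeCount; i++) {
            pool.submit(() -> {
                synchronized (latch) {               // distributed-lock stand-in
                    if (crashedItems.poll() != null) { // re-check the flag under the lock
                        executions.incrementAndGet();  // only the first holder grabs the item
                    }
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executions.get();
    }

    public static void main(String[] args) {
        System.out.println(raceForFailover(3)); // 1
    }
}
```

Only the first thread to acquire the lock finds the crashed item; the others see an empty queue and return. This is exactly why the real callback must re-check the failover condition after acquiring the lock.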
The FailoverLeaderExecutionCallback logic is as follows:
```java
// FailoverService.java
class FailoverLeaderExecutionCallback implements LeaderExecutionCallback {

    @Override
    public void execute() {
        // Check whether failover is still needed
        if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) {
            return;
        }
        // Take one `${JOB_NAME}/leader/failover/items/${ITEM_ID}` job shard item
        int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
        log.debug("Failover job '{}' begin, crashed item '{}'", jobName, crashedItem);
        // Set `${JOB_NAME}/sharding/${ITEM_ID}/failover` to the current job node
        jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
        // Remove the `${JOB_NAME}/leader/failover/items/${ITEM_ID}` job shard item
        jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
        // TODO should not use triggerJob, but rather unified scheduling through the Executor
        // Trigger job execution
        JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
        if (null != jobScheduleController) {
            jobScheduleController.triggerJob();
        }
    }
}
```
- It calls #needFailover() again to ensure failover is still required after acquiring the distributed lock: multiple job nodes may have raced for the callback, and if the first node already completed the failover, the second one no longer needs to perform it.
- It calls jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0) to take one ${JOB_NAME}/leader/failover/items/${ITEM_ID} job shard item.
- It calls JobNodeStorage#fillEphemeralJobNode(...) to set ${JOB_NAME}/sharding/${ITEM_ID}/failover to the current job node (${JOB_INSTANCE_ID}).
- It calls JobNodeStorage#removeJobNodeIfExisted(...) to remove the ${JOB_NAME}/leader/failover/items/${ITEM_ID} job shard item.
- It calls JobScheduleController#triggerJob() to trigger the job immediately. Calling this method does not execute the job right away; it merely fires a trigger. If there are multiple crashed job shard items, do multiple JobScheduleController#triggerJob() calls cause the job to execute in parallel? No, because the number of Quartz threads for a job is set to 1:
```java
// JobScheduler.java
private Properties getBaseQuartzProperties() {
    Properties result = new Properties();
    // ... irrelevant code omitted
    result.put("org.quartz.threadPool.threadCount", "1"); // Quartz thread count: 1
    // ... irrelevant code omitted
    return result;
}
```
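The serializing effect of a single-thread pool can be sketched with a plain ExecutorService standing in for Quartz's worker pool (an analogy only, not Quartz itself): even if the job is triggered several times in a row, executions never overlap.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SingleThreadTriggerSketch {

    // Submits `triggers` jobs to a pool of `threads` workers and reports the
    // maximum number of jobs observed running at the same time.
    static int maxConcurrency(int threads, int triggers) {
        ExecutorService pool = Executors.newFixedThreadPool(threads); // threadCount
        AtomicInteger running = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        for (int i = 0; i < triggers; i++) {          // several triggerJob() calls
            pool.submit(() -> {
                int now = running.incrementAndGet();
                max.accumulateAndGet(now, Math::max); // record peak concurrency
                try {
                    Thread.sleep(10);                 // pretend the job does work
                } catch (InterruptedException ignored) {
                }
                running.decrementAndGet();
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println(maxConcurrency(1, 5)); // 1 — with one worker thread, triggers queue up
    }
}
```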
What if, at the moment the shard items are flagged for failover, none of the job nodes is idle — will FailoverLeaderExecutionCallback never be called back? Of course not. After a job node finishes executing a job, it calls LiteJobFacade#failoverIfNecessary() to grab failover job shard items:
```java
// AbstractElasticJobExecutor.java
public final void execute() {
    // ... irrelevant code omitted
    // Execute the normally triggered job
    execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
    // Execute misfired jobs
    while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
        jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
        execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
    }
    // Perform job failover
    jobFacade.failoverIfNecessary();
    // ... irrelevant code omitted
}

// LiteJobFacade.java
@Override
public void failoverIfNecessary() {
    if (configService.load(true).isFailover()) {
        failoverService.failoverIfNecessary();
    }
}

// FailoverService.java
public void failoverIfNecessary() {
    if (needFailover()) {
        jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
    }
}
```
Now back to the code in JobCrashedJobListener: why do failover shard items take priority when collecting the crashed node's shard items? If a job node holds ${JOB_NAME}/sharding/${ITEM_ID}/failover data for a shard item, that shard item was already captured for failover and assigned to it — otherwise FailoverLeaderExecutionCallback would never have been called back for it. So when that node in turn shuts down, its captured failover items must be flagged for failover first, ahead of its normally assigned shard items.
The JobFacade#failoverIfNecessary() call grabs only one crashed job shard item at a time. The benefit is that multiple job nodes can share the set of shard items waiting for failover. For example, a job cluster has three nodes A, B, and C and six job shard items; if node C crashes, nodes A and B each take one of node C's two shard items. However, shard items waiting for failover can also be missed. For example, with three nodes A, B, and C and nine job shard items, if node C crashes, nodes A and B each take one of node C's three shard items, and the remaining one is missed: it can only be executed at the next scheduled run. This algorithm may be optimized in the future.
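The arithmetic of the missed shard item can be sketched as follows, assuming each surviving node's failoverIfNecessary() grabs exactly one item per run (a simplification of the real callback, which takes get(0) of the items list):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class MissedFailoverSketch {

    // Each surviving node grabs at most ONE failover item per execution round;
    // whatever is left over waits until the next scheduled run.
    static List<Integer> remainingAfterOneRound(List<Integer> crashedItems, int survivingNodes) {
        Deque<Integer> queue = new ArrayDeque<>(crashedItems);
        for (int i = 0; i < survivingNodes; i++) {
            queue.poll(); // one failoverIfNecessary() grab per surviving node
        }
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        // Node C crashed holding shard items [6, 7, 8]; nodes A and B each grab one.
        System.out.println(remainingAfterOneRound(Arrays.asList(6, 7, 8), 2)); // [8]
    }
}
```

With six items (two per node), the two survivors cover both crashed items; with nine items (three per node), one crashed item is left over, matching the missed-item example above.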
4. Obtaining the job shard context collection

In "4.2 Getting the shard context of the current job server" of "Elastic-Job-Lite Source Code Analysis — Job Execution", we saw that the job executor (AbstractElasticJobExecutor) obtains the shard context of the current job server before executing the job. The overall flow is shown in the following sequence diagram (click to enlarge):

- The parts marked with a red cross are shared in detail in "Elastic-Job-Lite Source Code Analysis — Job Sharding".

The implementation code is as follows:
```java
// LiteJobFacade.java
@Override
public ShardingContexts getShardingContexts() {
    // Get the shard items captured for failover
    boolean isFailover = configService.load(true).isFailover();
    if (isFailover) {
        List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
        if (!failoverShardingItems.isEmpty()) {
            // [skipped; explained in "Job Sharding"] Get the shard context of the current job server
            return executionContextService.getJobShardingContext(failoverShardingItems);
        }
    }
    // [skipped; explained in "Job Sharding"] Shard if necessary, preferring the current node
    shardingService.shardingIfNecessary();
    // [skipped; explained in "Job Sharding"] Get the shard items assigned to this node
    List<Integer> shardingItems = shardingService.getLocalShardingItems();
    // Remove this node's shard items that were transferred away by failover
    if (isFailover) {
        shardingItems.removeAll(failoverService.getLocalTakeOffItems());
    }
    // Remove disabled job shard items
    shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
    // [skipped; explained in "Job Sharding"] Get the shard context of the current job server
    return executionContextService.getJobShardingContext(shardingItems);
}
```
- It calls FailoverService#getLocalFailoverItems() to get the failover shard items captured by this job node.
```java
// FailoverService.java
public List<Integer> getLocalFailoverItems() {
    if (JobRegistry.getInstance().isShutdown(jobName)) {
        return Collections.emptyList();
    }
    return getFailoverItems(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId()); // `${JOB_NAME}/sharding/${ITEM_ID}/failover`
}
```
- It calls ExecutionContextService#getJobShardingContext() to get the shard context of the current job server. This is explained in detail in "4. Getting the job shard context collection" of "Elastic-Job-Lite Source Code Analysis — Job Sharding".
- If this job node holds no captured failover shard items, it obtains the shard items assigned to this node. Here you will see a slightly strange call: shardingItems.removeAll(failoverService.getLocalTakeOffItems()). Why? For example, job node A holds shard items [0, 1]; a network partition occurs, and [0, 1] are captured via failover by job node B. When node A recovers, shard items [0, 1] still belong to node A in the sharding assignment, but they may already be executing on node B, so they must be removed to avoid the same shard item running on multiple nodes at once. The FailoverService#getLocalTakeOffItems() implementation:
```java
// FailoverService.java
/**
 * Get the shard items of this job server that were transferred away by failover.
 *
 * @return shard items of this job server taken over by other servers
 */
public List<Integer> getLocalTakeOffItems() {
    List<Integer> shardingItems = shardingService.getLocalShardingItems();
    List<Integer> result = new ArrayList<>(shardingItems.size());
    for (int each : shardingItems) {
        if (jobNodeStorage.isJobNodeExisted(FailoverNode.getExecutionFailoverNode(each))) {
            result.add(each);
        }
    }
    return result;
}
```
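A minimal sketch of the removal described above, with illustrative data structures rather than Elastic-Job's API: node A is assigned items [0, 1], but item 1 has a failover node because node B captured it during A's partition, so A ends up executing only item 0:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

public class TakeOffItemsSketch {

    // Drop any assigned item whose `sharding/${ITEM_ID}/failover` node exists,
    // i.e. an item already captured by another node via failover.
    static List<Integer> effectiveItems(List<Integer> localItems, Set<Integer> failedOverItems) {
        List<Integer> result = new ArrayList<>(localItems);
        result.removeAll(failedOverItems);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(effectiveItems(Arrays.asList(0, 1), Collections.singleton(1))); // [0]
    }
}
```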
5. Listening for the failover feature being disabled

When the failover feature is disabled in the job configuration, FailoverSettingsChangedJobListener detects the change and removes the failover data:
```java
// FailoverListenerManager.java
class FailoverSettingsChangedJobListener extends AbstractJobListener {

    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        if (configNode.isConfigPath(path) && Type.NODE_UPDATED == eventType
                && !LiteJobConfigurationGsonFactory.fromJson(data).isFailover()) {
            // The failover feature was turned off: remove failover info
            failoverService.removeFailoverInfo();
        }
    }
}
```
666. Easter egg

Narrator: ahhh, this one is a bit convoluted. Be patient, patient, patient.

Fellow readers, hop aboard and share this with your circle!