Abstract: originally published at http://www.iocoder.cn/Elastic-Job/job-execute/ by "taro source". Reprinting is welcome; please retain the attribution. Thank you!

This article is based on Elastic-Job v2.1.5.
- 1. Overview
- 2. LiteJob
- 3. Creating the Executor
  - 3.1 Loading the Job Configuration
  - 3.2 Obtaining the Job Execution Thread Pool
  - 3.3 Obtaining the Job Exception Handler
- 4. Running the Executor
  - 4.1 Checking the Job Execution Environment
  - 4.2 Obtaining the Sharding Contexts of the Current Job Server
  - 4.3 Publishing Job Status Tracking Events
  - 4.4 Skipping Misfired Jobs That Are Still Running
  - 4.5 Pre-execution Method
  - 4.6 Executing a Normally Triggered Job
    - 4.6.1 Simple Job Executor
    - 4.6.2 Dataflow Job Executor
    - 4.6.3 Script Job Executor
  - 4.7 Executing Misfired Jobs
  - 4.8 Executing Job Failover
  - 4.9 Post-execution Method
- 666. Easter Egg
1. Overview

This article focuses on job execution in Elastic-Job-Lite.
The class diagram of the main classes involved is as follows (click to open the larger image):

- The classes in yellow belong to the elastic-job-common-core project; they are the job execution classes shared by Elastic-Job-Lite and Elastic-Job-Cloud.
Just as being appreciated motivates you when you do something well, a Star motivates open source contributors. If you like Elastic-Job, give it a Star! portal
2. LiteJob

LiteJob is the job class scheduled by Quartz. When the trigger fires, its #execute() method is invoked to run the job.

Why is LiteJob the entry point?
In section "3.2.3 Creating the job schedule controller" of Elastic-Job-Lite Source Analysis — Job Initialization, we saw that Quartz's JobDetail is created as follows:
```java
JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
```
The parameter passed to #newJob() is LiteJob.class, so each time Quartz reaches the trigger time, it creates a LiteJob instance to execute the job.
```java
public final class LiteJob implements Job {

    @Setter
    private ElasticJob elasticJob;

    @Setter
    private JobFacade jobFacade;

    @Override
    public void execute(final JobExecutionContext context) throws JobExecutionException {
        JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
    }
}
```

LiteJob obtains the job executor (AbstractElasticJobExecutor) through JobExecutorFactory and runs it:
```java
public final class JobExecutorFactory {

    /**
     * Get the job executor.
     *
     * @param elasticJob distributed elastic job
     * @param jobFacade job internal service facade
     * @return the job executor
     */
    @SuppressWarnings("unchecked")
    public static AbstractElasticJobExecutor getJobExecutor(final ElasticJob elasticJob, final JobFacade jobFacade) {
        // ScriptJob
        if (null == elasticJob) {
            return new ScriptJobExecutor(jobFacade);
        }
        // SimpleJob
        if (elasticJob instanceof SimpleJob) {
            return new SimpleJobExecutor((SimpleJob) elasticJob, jobFacade);
        }
        // DataflowJob
        if (elasticJob instanceof DataflowJob) {
            return new DataflowJobExecutor((DataflowJob) elasticJob, jobFacade);
        }
        throw new JobConfigurationException("Cannot support job type '%s'", elasticJob.getClass().getCanonicalName());
    }
}
```
- JobExecutorFactory returns the corresponding job executor for each job type:
| Job type | Job interface | Executor |
|---|---|---|
| Simple job | SimpleJob | SimpleJobExecutor |
| Dataflow job | DataflowJob | DataflowJobExecutor |
| Script job | ScriptJob | ScriptJobExecutor |
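As a reference point for the table above, here is a minimal, self-contained sketch of what a user-defined simple job and the factory's type dispatch look like. The interfaces below are simplified stand-ins for Elastic-Job's real ones (the real executors also take a JobFacade), and all names here are illustrative:

```java
// Simplified stand-ins for Elastic-Job's interfaces; the real ones live in
// elastic-job-common-core and carry more context.
public class JobDispatchSketch {

    interface ElasticJob {}

    interface SimpleJob extends ElasticJob { void execute(int shardingItem); }

    interface DataflowJob extends ElasticJob {}

    /** A user-defined simple job: just process the assigned shard. */
    public static class MySimpleJob implements SimpleJob {
        @Override
        public void execute(int shardingItem) {
            System.out.println("processing shard " + shardingItem);
        }
    }

    /** Mirrors JobExecutorFactory's dispatch: choose the executor by job type. */
    static String executorFor(ElasticJob job) {
        if (null == job) {
            return "ScriptJobExecutor"; // script jobs have no ElasticJob instance
        }
        if (job instanceof SimpleJob) {
            return "SimpleJobExecutor";
        }
        if (job instanceof DataflowJob) {
            return "DataflowJobExecutor";
        }
        throw new IllegalArgumentException("Cannot support job type " + job.getClass());
    }
}
```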
3. Creating the Executor

AbstractElasticJobExecutor is the abstract job executor class. Each concrete job executor inherits from it, and the creation process is the same for all of them.
```java
// AbstractElasticJobExecutor.java
public abstract class AbstractElasticJobExecutor {

    /** Job facade */
    @Getter(AccessLevel.PROTECTED)
    private final JobFacade jobFacade;

    /** Job root configuration */
    @Getter(AccessLevel.PROTECTED)
    private final JobRootConfiguration jobRootConfig;

    /** Job name */
    private final String jobName;

    /** Job execution thread pool */
    private final ExecutorService executorService;

    /** Job exception handler */
    private final JobExceptionHandler jobExceptionHandler;

    /**
     * Sharding error messages.
     * key: sharding item
     */
    private final Map<Integer, String> itemErrorMessages;

    protected AbstractElasticJobExecutor(final JobFacade jobFacade) {
        this.jobFacade = jobFacade;
        // Load the job configuration
        jobRootConfig = jobFacade.loadJobRootConfiguration(true);
        jobName = jobRootConfig.getTypeConfig().getCoreConfig().getJobName();
        // Obtain the job execution thread pool
        executorService = ExecutorServiceHandlerRegistry.getExecutorServiceHandler(jobName,
                (ExecutorServiceHandler) getHandler(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER));
        // Obtain the job exception handler
        jobExceptionHandler = (JobExceptionHandler) getHandler(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER);
        // Initialize the sharding error messages map
        itemErrorMessages = new ConcurrentHashMap<>(jobRootConfig.getTypeConfig().getCoreConfig().getShardingTotalCount(), 1);
    }
}

// SimpleJobExecutor.java
public final class SimpleJobExecutor extends AbstractElasticJobExecutor {

    /** Simple job */
    private final SimpleJob simpleJob;

    public SimpleJobExecutor(final SimpleJob simpleJob, final JobFacade jobFacade) {
        super(jobFacade);
        this.simpleJob = simpleJob;
    }
}

// DataflowJobExecutor.java
public final class DataflowJobExecutor extends AbstractElasticJobExecutor {

    /** Dataflow job */
    private final DataflowJob<Object> dataflowJob;

    public DataflowJobExecutor(final DataflowJob<Object> dataflowJob, final JobFacade jobFacade) {
        super(jobFacade);
        this.dataflowJob = dataflowJob;
    }
}

// ScriptJobExecutor.java
public final class ScriptJobExecutor extends AbstractElasticJobExecutor {

    public ScriptJobExecutor(final JobFacade jobFacade) {
        super(jobFacade);
    }
}
```
3.1 Loading the Job Configuration

The job configuration is read from the cache. Reading job configurations is analyzed in section "3.1" of Elastic-Job-Lite Source Analysis — Job Configuration.
3.2 Obtaining the job execution thread pool
A job may be assigned multiple sharding items per execution, and these need to run in parallel on a thread pool. To isolate different jobs from one another, each job has its own thread pool. The thread pool service handler registry (ExecutorServiceHandlerRegistry) retrieves a job's thread pool via #getExecutorServiceHandler(...):
```java
public final class ExecutorServiceHandlerRegistry {

    /**
     * Thread pool registry.
     * key: job name
     */
    private static final Map<String, ExecutorService> REGISTRY = new HashMap<>();

    /**
     * Get the thread pool for a job.
     *
     * @param jobName job name
     * @param executorServiceHandler thread pool service handler
     * @return thread pool
     */
    public static synchronized ExecutorService getExecutorServiceHandler(final String jobName, final ExecutorServiceHandler executorServiceHandler) {
        if (!REGISTRY.containsKey(jobName)) {
            REGISTRY.put(jobName, executorServiceHandler.createExecutorService(jobName));
        }
        return REGISTRY.get(jobName);
    }
}
```
The registry delegates thread pool creation to an ExecutorServiceHandler. ExecutorServiceHandler is an interface; the DefaultExecutorServiceHandler implementation is used by default:
```java
// ExecutorServiceHandler.java
public interface ExecutorServiceHandler {

    /**
     * Create a thread pool service object.
     *
     * @param jobName job name
     * @return thread pool service object
     */
    ExecutorService createExecutorService(final String jobName);
}

// DefaultExecutorServiceHandler.java
public final class DefaultExecutorServiceHandler implements ExecutorServiceHandler {

    @Override
    public ExecutorService createExecutorService(final String jobName) {
        return new ExecutorServiceObject("inner-job-" + jobName, Runtime.getRuntime().availableProcessors() * 2).createExecutorService();
    }
}
```
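The registry above uses a synchronized check-then-put. The same "one pool per job name, created lazily" idea can be sketched with ConcurrentHashMap#computeIfAbsent; the names and daemon-thread choice below are illustrative, not Elastic-Job's API:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class PoolRegistrySketch {

    private static final Map<String, ExecutorService> REGISTRY = new ConcurrentHashMap<>();

    // Daemon threads so a forgotten pool never blocks JVM exit (illustrative choice)
    private static final ThreadFactory DAEMON = runnable -> {
        Thread thread = new Thread(runnable);
        thread.setDaemon(true);
        return thread;
    };

    /** One pool per job name, created lazily and atomically on first access. */
    static ExecutorService get(String jobName) {
        return REGISTRY.computeIfAbsent(jobName,
                name -> Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2, DAEMON));
    }
}
```

Repeated calls with the same job name reuse one pool; different job names stay isolated.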
ExecutorServiceObject#createExecutorService() then creates the thread pool:
```java
public final class ExecutorServiceObject {

    private final ThreadPoolExecutor threadPoolExecutor;

    private final BlockingQueue<Runnable> workQueue;

    public ExecutorServiceObject(final String namingPattern, final int threadSize) {
        workQueue = new LinkedBlockingQueue<>();
        threadPoolExecutor = new ThreadPoolExecutor(threadSize, threadSize, 5L, TimeUnit.MINUTES, workQueue,
                new BasicThreadFactory.Builder().namingPattern(Joiner.on("-").join(namingPattern, "%s")).build());
        threadPoolExecutor.allowCoreThreadTimeOut(true);
    }

    /**
     * Create the thread pool service object.
     *
     * @return thread pool service object
     */
    public ExecutorService createExecutorService() {
        return MoreExecutors.listeningDecorator(MoreExecutors.getExitingExecutorService(threadPoolExecutor));
    }
}
```
- MoreExecutors#listeningDecorator(...) is analyzed in Sharding-JDBC Source Analysis — SQL Execution.
- MoreExecutors#getExitingExecutorService(...) converts the ThreadPoolExecutor into an ExecutorService and registers a JVM shutdown hook that waits up to 120 seconds for running tasks to complete:
```java
service.shutdown();
service.awaitTermination(terminationTimeout, timeUnit);
```
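The hook registered by MoreExecutors#getExitingExecutorService(...) boils down to those two calls. As a runnable sketch (not Guava's actual code; names are illustrative), the shutdown-and-wait step looks like this:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownSketch {

    /**
     * Roughly what the registered shutdown hook does: stop accepting new
     * tasks, then wait up to the timeout for running tasks to finish.
     */
    static boolean shutdownAndWait(ExecutorService service, long timeout, TimeUnit unit) {
        service.shutdown();
        try {
            return service.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Demo: submit a short task, then shut down and wait for it. */
    static boolean demo() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.submit(() -> { /* short-lived task */ });
        return shutdownAndWait(pool, 5, TimeUnit.SECONDS);
    }
}
```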
How do you implement a custom ExecutorServiceHandler? First, let's look at how AbstractElasticJobExecutor obtains the ExecutorServiceHandler for each job:
```java
// AbstractElasticJobExecutor.java
/**
 * Get the (possibly custom) handler.
 *
 * @param jobPropertiesEnum job property enum
 * @return handler
 */
private Object getHandler(final JobProperties.JobPropertiesEnum jobPropertiesEnum) {
    String handlerClassName = jobRootConfig.getTypeConfig().getCoreConfig().getJobProperties().get(jobPropertiesEnum);
    try {
        Class<?> handlerClass = Class.forName(handlerClassName);
        if (jobPropertiesEnum.getClassType().isAssignableFrom(handlerClass)) {
            // Must implement the expected interface
            return handlerClass.newInstance();
        }
        return getDefaultHandler(jobPropertiesEnum, handlerClassName);
    } catch (final ReflectiveOperationException ex) {
        return getDefaultHandler(jobPropertiesEnum, handlerClassName);
    }
}

/**
 * Get the default handler.
 *
 * @param jobPropertiesEnum job property enum
 * @param handlerClassName handler class name
 * @return handler
 */
private Object getDefaultHandler(final JobProperties.JobPropertiesEnum jobPropertiesEnum, final String handlerClassName) {
    log.warn("Cannot instantiation class '{}', use default '{}' class.", handlerClassName, jobPropertiesEnum.getKey());
    try {
        return Class.forName(jobPropertiesEnum.getDefaultValue()).newInstance();
    } catch (final ClassNotFoundException | InstantiationException | IllegalAccessException e) {
        throw new JobSystemException(e);
    }
}
```
- Each handler corresponds to a JobPropertiesEnum entry and is looked up through that enum. The custom handler class name is read from JobProperties.map first; if it does not qualify (it does not implement the expected interface, or instantiation fails), the default handler implementation is used instead.
- Each job can be configured with its own handlers, as explained in section "2.2.2 Job core configuration" of Elastic-Job-Lite Source Analysis — Job Configuration.
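The "custom handler with default fallback" pattern can be condensed into a small runnable sketch (class and method names here are illustrative, not Elastic-Job's):

```java
public class HandlerLoaderSketch {

    interface Handler { String name(); }

    /** Default implementation used when the configured class is unusable. */
    public static class DefaultHandler implements Handler {
        @Override
        public String name() { return "default"; }
    }

    /**
     * Try to instantiate the configured class name; fall back to the default
     * when the class is missing, not a Handler, or cannot be instantiated.
     */
    static Handler load(String configuredClassName) {
        try {
            Class<?> clazz = Class.forName(configuredClassName);
            if (Handler.class.isAssignableFrom(clazz)) {
                return (Handler) clazz.getDeclaredConstructor().newInstance();
            }
        } catch (ReflectiveOperationException ignored) {
            // fall through to the default handler
        }
        return new DefaultHandler();
    }
}
```

A missing class and a class of the wrong type both land on the default handler.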
3.3 Obtaining the Job Exception Handler

The JobExceptionHandler is obtained in the same way as the ExecutorServiceHandler.
```java
// JobExceptionHandler.java
public interface JobExceptionHandler {

    /**
     * Handle a job exception.
     *
     * @param jobName job name
     * @param cause cause of the exception
     */
    void handleException(String jobName, Throwable cause);
}

// DefaultJobExceptionHandler.java
public final class DefaultJobExceptionHandler implements JobExceptionHandler {

    @Override
    public void handleException(final String jobName, final Throwable cause) {
        log.error(String.format("Job '%s' exception occur in job processing", jobName), cause);
    }
}
```
- The default implementation, DefaultJobExceptionHandler, only logs the exception and does not rethrow it.
4. Running the Executor

The main flow of the execution logic is shown below (click to open the larger picture):
```java
// AbstractElasticJobExecutor.java
public final void execute() {
    // Check the job execution environment
    try {
        jobFacade.checkJobExecutionEnvironment();
    } catch (final JobExecutionEnvironmentException cause) {
        jobExceptionHandler.handleException(jobName, cause);
    }
    // Get the sharding contexts of the current job server
    ShardingContexts shardingContexts = jobFacade.getShardingContexts();
    // Publish the job status tracking event (State.TASK_STAGING)
    if (shardingContexts.isAllowSendJobEvent()) {
        jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobName));
    }
    // Skip misfired jobs that are still running
    if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
        // Publish the job status tracking event (State.TASK_FINISHED)
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
                    "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName,
                    shardingContexts.getShardingItemParameters().keySet()));
        }
        return;
    }
    // Invoke the pre-execution method
    try {
        jobFacade.beforeJobExecuted(shardingContexts);
        //CHECKSTYLE:OFF
    } catch (final Throwable cause) {
        //CHECKSTYLE:ON
        jobExceptionHandler.handleException(jobName, cause);
    }
    // Execute the normally triggered job
    execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
    // Execute misfired jobs
    while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
        jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
        execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
    }
    // Execute job failover
    jobFacade.failoverIfNecessary();
    // Invoke the post-execution method
    try {
        jobFacade.afterJobExecuted(shardingContexts);
        //CHECKSTYLE:OFF
    } catch (final Throwable cause) {
        //CHECKSTYLE:ON
        jobExceptionHandler.handleException(jobName, cause);
    }
}
```
There are many steps in the code, so let’s go step by step.
4.1 Checking the Job Execution Environment
```java
// LiteJobFacade.java
@Override
public void checkJobExecutionEnvironment() throws JobExecutionEnvironmentException {
    configService.checkMaxTimeDiffSecondsTolerable();
}
```
- ConfigService#checkMaxTimeDiffSecondsTolerable() verifies that the local time is within the allowed deviation; it is analyzed in section "3.3" of Elastic-Job-Lite Source Analysis — Job Configuration.
- If the local time is out of tolerance, an exception is thrown. When DefaultJobExceptionHandler is used as the exception handler, the exception is only logged and job execution is not terminated. If your job is sensitive to time accuracy and should stop in this case, implement a custom JobExceptionHandler.
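The idea behind checkMaxTimeDiffSecondsTolerable() can be sketched as follows. The real implementation reads the registry center's time; here both timestamps are passed in explicitly, and all names are illustrative:

```java
public class TimeDiffSketch {

    /**
     * Fail when local time and registry-center time differ by more than the
     * tolerance; a negative tolerance disables the check.
     */
    static void check(long localMillis, long registryMillis, int maxDiffSeconds) {
        if (maxDiffSeconds < 0) {
            return;
        }
        long diffMillis = Math.abs(localMillis - registryMillis);
        if (diffMillis > maxDiffSeconds * 1000L) {
            throw new IllegalStateException("Time diff " + diffMillis + "ms exceeds " + maxDiffSeconds + "s");
        }
    }

    /** Convenience wrapper returning a boolean instead of throwing. */
    static boolean withinTolerance(long localMillis, long registryMillis, int maxDiffSeconds) {
        try {
            check(localMillis, registryMillis, maxDiffSeconds);
            return true;
        } catch (IllegalStateException ex) {
            return false;
        }
    }
}
```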
4.2 Obtaining the Sharding Contexts of the Current Job Server

LiteJobFacade#getShardingContexts() obtains the sharding contexts of the current job server. Through this call, the job learns which sharding items it has been assigned to execute. Job sharding is covered in detail in Elastic-Job-Lite Source Analysis — Job Sharding.
4.3 Publishing Job Status Tracking Events

Job status tracking events are published via LiteJobFacade#postJobStatusTraceEvent(), which is covered in detail in Elastic-Job-Lite Source Analysis — Job Event Tracing.
4.4 Skipping Misfired Jobs That Are Still Running

This logic is analyzed together with section 4.7 on misfired jobs, to give an overall picture of how Elastic-Job-Lite handles misfired jobs.
4.5 Pre-execution Method
```java
// LiteJobFacade.java
@Override
public void beforeJobExecuted(final ShardingContexts shardingContexts) {
    for (ElasticJobListener each : elasticJobListeners) {
        each.beforeJobExecuted(shardingContexts);
    }
}
```
- The job listeners' pre-execution callbacks are invoked before the job runs. Job listeners are covered in detail in Elastic-Job-Lite Source Analysis — Job Listeners.
4.6 Executing a Normally Triggered Job

The section title is not entirely precise: the other execution sources (misfire, failover) go through the same logic. Four methods are involved in executing the job; let's look at them one by one.
```java
// AbstractElasticJobExecutor.java
/**
 * Execute the shards of a job.
 *
 * @param shardingContexts sharding contexts
 * @param executionSource execution source
 */
private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) { /* ... */ }

/**
 * Process multiple sharding items of a job.
 *
 * @param shardingContexts sharding contexts
 * @param executionSource execution source
 */
private void process(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) { /* ... */ }

/**
 * Process a single sharding item of a job.
 *
 * @param shardingContexts sharding contexts
 * @param item sharding item
 * @param startEvent execution start event
 */
private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) { /* ... */ }

/**
 * Process a single sharding item of a job.
 *
 * @param shardingContext sharding context
 */
protected abstract void process(ShardingContext shardingContext);
```
P.S.: Job-event-related logic is skipped here; it is covered in detail in Elastic-Job-Lite Source Analysis — Job Event Tracing.
private void execute(shardingContexts, executionSource)
```java
// AbstractElasticJobExecutor.java
private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
    // No sharding items assigned: publish the job status tracking event (State.TASK_FINISHED) and return
    if (shardingContexts.getShardingItemParameters().isEmpty()) {
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobName));
        }
        return;
    }
    // Register job start information
    jobFacade.registerJobBegin(shardingContexts);
    // Publish the job status tracking event (State.TASK_RUNNING)
    String taskId = shardingContexts.getTaskId();
    if (shardingContexts.isAllowSendJobEvent()) {
        jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
    }
    try {
        process(shardingContexts, executionSource);
    } finally {
        // TODO: consider adding a job failure state, and how to handle an overall job failure
        // Register job completion information
        jobFacade.registerJobCompleted(shardingContexts);
        // Publish the job status tracking event (State.TASK_FINISHED or State.TASK_ERROR) depending on whether errors occurred
        if (itemErrorMessages.isEmpty()) {
            if (shardingContexts.isAllowSendJobEvent()) {
                jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
            }
        } else {
            if (shardingContexts.isAllowSendJobEvent()) {
                jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
            }
        }
    }
}
```
- The executionSource parameter indicates the execution source; there are three:

```java
public enum ExecutionSource {

    /** Normal trigger */
    NORMAL_TRIGGER,

    /** Misfired execution */
    MISFIRE,

    /** Failover execution */
    FAILOVER
}
```
- LiteJobFacade#registerJobBegin(...) registers job start information:

```java
// LiteJobFacade.java
@Override
public void registerJobBegin(final ShardingContexts shardingContexts) {
    executionService.registerJobBegin(shardingContexts);
}

// ExecutionService.java
public void registerJobBegin(final ShardingContexts shardingContexts) {
    JobRegistry.getInstance().setJobRunning(jobName, true);
    if (!configService.load(true).isMonitorExecution()) {
        return;
    }
    for (int each : shardingContexts.getShardingItemParameters().keySet()) {
        jobNodeStorage.fillEphemeralJobNode(ShardingNode.getRunningNode(each), "");
    }
}
```
  - The running state is recorded only when execution monitoring is enabled in the job configuration (LiteJobConfiguration.monitorExecution = true).
  - JobNodeStorage#fillEphemeralJobNode(...) records each assigned sharding item as running. How it is recorded is covered in detail in Elastic-Job-Lite Source Analysis — Job Data Storage.
- LiteJobFacade#registerJobCompleted(...) registers job completion information:

```java
// LiteJobFacade.java
@Override
public void registerJobCompleted(final ShardingContexts shardingContexts) {
    executionService.registerJobCompleted(shardingContexts);
    if (configService.load(true).isFailover()) {
        failoverService.updateFailoverComplete(shardingContexts.getShardingItemParameters().keySet());
    }
}

// ExecutionService.java
/**
 * Register job completion information.
 *
 * @param shardingContexts sharding contexts
 */
public void registerJobCompleted(final ShardingContexts shardingContexts) {
    JobRegistry.getInstance().setJobRunning(jobName, false);
    if (!configService.load(true).isMonitorExecution()) {
        return;
    }
    for (int each : shardingContexts.getShardingItemParameters().keySet()) {
        jobNodeStorage.removeJobNodeIfExisted(ShardingNode.getRunningNode(each));
    }
}
```
  - The running state is removed only when execution monitoring is enabled in the job configuration (LiteJobConfiguration.monitorExecution = true).
  - JobNodeStorage#removeJobNodeIfExisted(...) removes the running flag of each assigned sharding item, marking it as no longer running.
  - FailoverService#updateFailoverComplete(...) updates the state of failed-over sharding items; it is covered in detail in Elastic-Job-Lite Source Analysis — Job Failover.
private void process(shardingContexts, executionSource)
```java
// AbstractElasticJobExecutor.java
private void process(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
    Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
    // A single sharding item is executed directly
    if (1 == items.size()) {
        int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
        JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, item);
        // Process one sharding item
        process(shardingContexts, item, jobExecutionEvent);
        return;
    }
    final CountDownLatch latch = new CountDownLatch(items.size());
    for (final int each : items) {
        final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, each);
        if (executorService.isShutdown()) {
            return;
        }
        executorService.submit(new Runnable() {

            @Override
            public void run() {
                try {
                    // Process one sharding item
                    process(shardingContexts, each, jobExecutionEvent);
                } finally {
                    latch.countDown();
                }
            }
        });
    }
    // Wait for all sharding items to finish
    try {
        latch.await();
    } catch (final InterruptedException ex) {
        Thread.currentThread().interrupt();
    }
}
```
- When a single sharding item is assigned, it is executed directly without going through the thread pool, which performs better.
- When multiple sharding items are assigned, they are executed concurrently on the thread pool, and a CountDownLatch is used to wait until all sharding items have finished.
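The multi-item fan-out can be reduced to the following runnable sketch of the CountDownLatch pattern. The per-item processing is stubbed out, and all names are illustrative:

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FanOutSketch {

    /** Submit one task per sharding item and block until all of them finish. */
    static int processAll(List<Integer> items) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch latch = new CountDownLatch(items.size());
        ConcurrentLinkedQueue<Integer> done = new ConcurrentLinkedQueue<>();
        for (final int item : items) {
            pool.submit(() -> {
                try {
                    done.add(item); // stand-in for process(shardingContexts, item, event)
                } finally {
                    latch.countDown(); // always count down, even if processing failed
                }
            });
        }
        try {
            latch.await(); // wait for every sharding item
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return done.size();
    }
}
```

Counting down in a finally block is the key detail: a failed item must not leave the latch waiting forever.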
private void process(shardingContexts, item, startEvent)
protected abstract void process(shardingContext)
```java
// AbstractElasticJobExecutor.java
private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
    // Publish the execution event (start)
    if (shardingContexts.isAllowSendJobEvent()) {
        jobFacade.postJobExecutionEvent(startEvent);
    }
    log.trace("Job '{}' executing, item is: '{}'.", jobName, item);
    JobExecutionEvent completeEvent;
    try {
        // Process the single sharding item
        process(new ShardingContext(shardingContexts, item));
        // Publish the execution event (success)
        completeEvent = startEvent.executionSuccess();
        log.trace("Job '{}' executed, item is: '{}'.", jobName, item);
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobExecutionEvent(completeEvent);
        }
        // CHECKSTYLE:OFF
    } catch (final Throwable cause) {
        // CHECKSTYLE:ON
        // Publish the execution event (failure)
        completeEvent = startEvent.executionFailure(cause);
        jobFacade.postJobExecutionEvent(completeEvent);
        // Record the sharding error message
        itemErrorMessages.put(item, ExceptionUtil.transform(cause));
        // Handle the exception
        jobExceptionHandler.handleException(jobName, cause);
    }
}

protected abstract void process(ShardingContext shardingContext);
```
- Each concrete job executor implements the abstract #process(shardingContext) method to process a single sharding item.
4.6.1 Simple Job executor
SimpleJobExecutor, the simple job executor.

```java
public final class SimpleJobExecutor extends AbstractElasticJobExecutor {

    /** Simple job */
    private final SimpleJob simpleJob;

    @Override
    protected void process(final ShardingContext shardingContext) {
        simpleJob.execute(shardingContext);
    }
}
```
- SimpleJob#execute() is called to process a single sharding item.
4.6.2 Dataflow Job Executor

DataflowJobExecutor, the dataflow job executor.

```java
public final class DataflowJobExecutor extends AbstractElasticJobExecutor {

    /** Dataflow job */
    private final DataflowJob<Object> dataflowJob;

    @Override
    protected void process(final ShardingContext shardingContext) {
        DataflowJobConfiguration dataflowConfig = (DataflowJobConfiguration) getJobRootConfig().getTypeConfig();
        if (dataflowConfig.isStreamingProcess()) {
            // Streaming processing
            streamingExecute(shardingContext);
        } else {
            // One-off processing
            oneOffExecute(shardingContext);
        }
    }

    private void streamingExecute(final ShardingContext shardingContext) {
        List<Object> data = fetchData(shardingContext);
        while (null != data && !data.isEmpty()) {
            processData(shardingContext, data);
            if (!getJobFacade().isEligibleForJobRunning()) {
                break;
            }
            data = fetchData(shardingContext);
        }
    }

    private void oneOffExecute(final ShardingContext shardingContext) {
        List<Object> data = fetchData(shardingContext);
        if (null != data && !data.isEmpty()) {
            processData(shardingContext, data);
        }
    }
}
```
- When streaming is enabled in the job configuration (DataflowJobConfiguration.streamingProcess = true), #streamingExecute() keeps loading and processing data until the data is empty or the job is no longer eligible to continue running:

```java
// LiteJobFacade.java
@Override
public boolean isEligibleForJobRunning() {
    LiteJobConfiguration liteJobConfig = configService.load(true);
    if (liteJobConfig.getTypeConfig() instanceof DataflowJobConfiguration) {
        return !shardingService.isNeedSharding() // the job does not need resharding
                && ((DataflowJobConfiguration) liteJobConfig.getTypeConfig()).isStreamingProcess();
    }
    return !shardingService.isNeedSharding(); // the job does not need resharding
}
```
  - If the job needs resharding, it is not appropriate to continue streaming data processing.
  - For streaming jobs, processData should update the state of the data after handling it, so that fetchData does not fetch the same data again; otherwise the job would never stop. Streaming processing is modeled after TBSchedule and is suited to jobs that process data continuously.
- When streaming is not enabled (DataflowJobConfiguration.streamingProcess = false), #oneOffExecute() loads and processes the data once.
- #fetchData() loads the data and #processData(...) processes it:
```java
// DataflowJobExecutor.java
/**
 * Load data.
 *
 * @param shardingContext sharding context
 * @return data
 */
private List<Object> fetchData(final ShardingContext shardingContext) {
    return dataflowJob.fetchData(shardingContext);
}

/**
 * Process data.
 *
 * @param shardingContext sharding context
 * @param data data
 */
private void processData(final ShardingContext shardingContext, final List<Object> data) {
    dataflowJob.processData(shardingContext, data);
}
```
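The streaming loop and its stop condition can be sketched with an in-memory source. The isEligibleForJobRunning check is omitted, and consuming the queue stands in for processData marking rows as done; a real DataflowJob would read from a database or message queue:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class StreamingSketch {

    /** Stand-in data source. */
    private final Deque<Integer> source = new ArrayDeque<>(Arrays.asList(1, 2, 3, 4, 5));

    private final List<Integer> processed = new ArrayList<>();

    /** fetchData: return up to one batch; polling plays the role of
     *  "marking data as processed" so the same rows are not fetched again. */
    private List<Integer> fetchData() {
        List<Integer> batch = new ArrayList<>();
        for (int i = 0; i < 2 && !source.isEmpty(); i++) {
            batch.add(source.poll());
        }
        return batch;
    }

    private void processData(List<Integer> data) {
        processed.addAll(data);
    }

    /** The streaming loop: fetch and process until a fetch returns nothing. */
    private void streamingExecute() {
        List<Integer> data = fetchData();
        while (null != data && !data.isEmpty()) {
            processData(data);
            data = fetchData();
        }
    }

    static int run() {
        StreamingSketch sketch = new StreamingSketch();
        sketch.streamingExecute();
        return sketch.processed.size();
    }
}
```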
4.6.3 Script Job executor
ScriptJobExecutor, the script job executor.

```java
public final class ScriptJobExecutor extends AbstractElasticJobExecutor {

    @Override
    protected void process(final ShardingContext shardingContext) {
        final String scriptCommandLine = ((ScriptJobConfiguration) getJobRootConfig().getTypeConfig()).getScriptCommandLine();
        if (Strings.isNullOrEmpty(scriptCommandLine)) {
            throw new JobConfigurationException("Cannot find script command line for job '%s', job is not executed.", shardingContext.getJobName());
        }
        executeScript(shardingContext, scriptCommandLine);
    }

    /**
     * Execute the script.
     *
     * @param shardingContext sharding context
     * @param scriptCommandLine script command line
     */
    private void executeScript(final ShardingContext shardingContext, final String scriptCommandLine) {
        CommandLine commandLine = CommandLine.parse(scriptCommandLine);
        // Pass the sharding context as a JSON argument
        commandLine.addArgument(GsonFactory.getGson().toJson(shardingContext), false);
        try {
            new DefaultExecutor().execute(commandLine);
        } catch (final IOException ex) {
            throw new JobConfigurationException("Execute script failure.", ex);
        }
    }
}
```
- scriptCommandLine holds the script path. Script invocation is implemented with the Apache Commons Exec toolkit.

> Script jobs support any script type, such as shell, Python, or Perl. Just configure scriptCommandLine via the console or in code; no extra coding is required. The script path may carry parameters; after them, the job framework automatically appends the job runtime information as the last argument.
- The sharding context is passed to the script as a JSON-formatted argument.
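What the script finally receives can be illustrated by assembling the argv by hand. This is a sketch, not Commons Exec's behavior (CommandLine.parse handles quoting that a plain String#split does not), and the JSON is built by hand where Elastic-Job uses Gson:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ScriptArgSketch {

    /**
     * Assemble the argv for a script job: the configured command line,
     * followed by the sharding context serialized as a single JSON argument.
     */
    static List<String> buildArgv(String scriptCommandLine, String jobName, int shardingItem) {
        List<String> argv = new ArrayList<>(Arrays.asList(scriptCommandLine.split(" ")));
        argv.add("{\"jobName\":\"" + jobName + "\",\"shardingItem\":" + shardingItem + "}");
        return argv;
    }
}
```

The script can then parse its last argument as JSON to learn which shard it is handling.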
4.7 Executing Misfired Jobs

When a job execution runs so long that it overlaps the next trigger time, Elastic-Job-Lite marks the job's sharding items as misfired. At the next execution, the misfired sharding items are executed as a make-up run.
Marking a job as misfired
```java
// JobScheduler.java
private Scheduler createScheduler() {
    Scheduler result;
    // ... some code omitted
    result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
    return result;
}

private Properties getBaseQuartzProperties() {
    // ... some code omitted
    result.put("org.quartz.jobStore.misfireThreshold", "1");
    return result;
}

// JobScheduleController.java
private CronTrigger createTrigger(final String cron) {
    return TriggerBuilder.newTrigger()
            .withIdentity(triggerIdentity)
            .withSchedule(CronScheduleBuilder.cronSchedule(cron)
            .withMisfireHandlingInstructionDoNothing())
            .build();
}
```
- org.quartz.jobStore.misfireThreshold = 1 means a trigger that fires more than 1 millisecond late counts as misfired.
- #withMisfireHandlingInstructionDoNothing() tells Quartz not to fire the misfired trigger immediately, but to wait until the next scheduled time; re-executing the misfired job is left to Elastic-Job-Lite itself.
- A TriggerListener is used to mark the job sharding items that missed execution:
```java
// JobTriggerListener.java
public final class JobTriggerListener extends TriggerListenerSupport {

    @Override
    public void triggerMisfired(final Trigger trigger) {
        if (null != trigger.getPreviousFireTime()) {
            executionService.setMisfire(shardingService.getLocalShardingItems());
        }
    }
}

// ExecutionService.java
public void setMisfire(final Collection<Integer> items) {
    for (int each : items) {
        jobNodeStorage.createJobNodeIfNeeded(ShardingNode.getMisfireNode(each));
    }
}
```
- Call `#setMisfire(...)` to mark the job shard items assigned to the current job node as missed.
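A sketch of what `#setMisfire(...)` effectively does, assuming a `sharding/{item}/misfire` node layout for the misfire flags (an in-memory set stands in for `JobNodeStorage` and Zookeeper; the class name is invented):

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

public class MisfireNodeDemo {

    // Builds the misfire node path for one shard item, following the
    // sharding/{item}/misfire layout (an assumption for this sketch).
    static String getMisfireNode(int item) {
        return String.format("sharding/%d/misfire", item);
    }

    // Mirrors ExecutionService#setMisfire: create one misfire node per shard item.
    static Set<String> setMisfire(Collection<Integer> items) {
        Set<String> created = new TreeSet<>();
        for (int each : items) {
            // stands in for jobNodeStorage.createJobNodeIfNeeded(...)
            created.add(getMisfireNode(each));
        }
        return created;
    }

    public static void main(String[] args) {
        // Mark shard items 0 and 1 of this job node as missed
        System.out.println(setMisfire(Arrays.asList(0, 1)));
    }
}
```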
Skipping running job shards that were missed
```java
// LiteJobFacade.java
@Override
public boolean misfireIfRunning(final Collection<Integer> shardingItems) {
    return executionService.misfireIfHasRunningItems(shardingItems);
}

// ExecutionService.java
public boolean misfireIfHasRunningItems(final Collection<Integer> items) {
    if (!hasRunningItems(items)) {
        return false;
    }
    setMisfire(items);
    return true;
}

public boolean hasRunningItems(final Collection<Integer> items) {
    LiteJobConfiguration jobConfig = configService.load(true);
    if (null == jobConfig || !jobConfig.isMonitorExecution()) {
        return false;
    }
    for (int each : items) {
        if (jobNodeStorage.isJobNodeExisted(ShardingNode.getRunningNode(each))) {
            return true;
        }
    }
    return false;
}
```
- If **any** of the assigned job shard items is still running, **all** of the assigned shard items are marked as missed (`misfired`) and this round of execution is skipped. Without this skip, the **same** job shard item could run concurrently.
- This feature requires the job configuration to enable runtime-status monitoring (`LiteJobConfiguration.monitorExecution = true`).
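The any-running → mark-all-misfired rule can be sketched with an in-memory stand-in for the Zookeeper-backed `ExecutionService` (a simplified sketch; the class name is invented):

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

public class MisfireIfRunningDemo {

    private final Set<Integer> runningItems = new HashSet<>();   // stand-in for the running nodes
    private final Set<Integer> misfiredItems = new HashSet<>();  // stand-in for the misfire nodes
    private final boolean monitorExecution;

    MisfireIfRunningDemo(boolean monitorExecution) {
        this.monitorExecution = monitorExecution;
    }

    void markRunning(int item) {
        runningItems.add(item);
    }

    // Simplified misfireIfHasRunningItems: if ANY assigned item is still
    // running, mark ALL assigned items as misfired and skip this round.
    boolean misfireIfHasRunningItems(Collection<Integer> items) {
        if (!monitorExecution) {
            return false; // requires LiteJobConfiguration.monitorExecution = true
        }
        boolean anyRunning = false;
        for (int each : items) {
            if (runningItems.contains(each)) {
                anyRunning = true;
                break;
            }
        }
        if (!anyRunning) {
            return false;
        }
        misfiredItems.addAll(items);
        return true;
    }

    Set<Integer> getMisfiredItems() {
        return misfiredItems;
    }

    public static void main(String[] args) {
        MisfireIfRunningDemo demo = new MisfireIfRunningDemo(true);
        demo.markRunning(1); // only item 1 is still running
        System.out.println(demo.misfireIfHasRunningItems(java.util.Arrays.asList(0, 1, 2))); // true: skip
        System.out.println(demo.getMisfiredItems()); // all three items are marked misfired
    }
}
```

Note how a single running item causes every assigned item to be marked, and how `monitorExecution = false` disables the whole mechanism.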
Executing the missed job shard items
```java
// AbstractElasticJobExecutor.java
public final void execute() {
    // ... omit part of the code
    // execute the missed job shard items
    while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
        jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
        execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
    }
    // ... omit part of the code
}

// LiteJobFacade.java
@Override
public boolean isExecuteMisfired(final Collection<Integer> shardingItems) {
    return isEligibleForJobRunning() // eligible to continue running
            && configService.load(true).getTypeConfig().getCoreConfig().isMisfire() // misfire enabled in the job configuration
            && !executionService.getMisfiredJobItems(shardingItems).isEmpty(); // there are missed shard items
}

@Override
public void clearMisfire(final Collection<Integer> shardingItems) {
    executionService.clearMisfire(shardingItems);
}
```
- Clear the missed flag of the assigned job shard items, then execute them.
- Why use `while(...)` here? **Defensive programming.** `#isExecuteMisfired(...)` reads from a **memory cache** whose updates depend on **asynchronous** Zookeeper notifications; due to network delays and other conditions, the cached data may briefly be **inconsistent** with Zookeeper. The `while(...)` loop defends against this by re-checking until the cache reflects the cleared misfire flags.
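The drain-and-retry loop can be simulated with an in-memory misfire set (a sketch with an invented class name; in the real code the flags live in Zookeeper and are read through an asynchronously refreshed cache, which is exactly why the loop re-checks):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MisfireLoopDemo {

    private final Set<Integer> misfired = new HashSet<>(Arrays.asList(0, 1));
    private final List<String> executionLog = new ArrayList<>();

    boolean isExecuteMisfired(Collection<Integer> items) {
        for (int each : items) {
            if (misfired.contains(each)) {
                return true;
            }
        }
        return false;
    }

    void clearMisfire(Collection<Integer> items) {
        misfired.removeAll(items);
    }

    // Mirrors the while(...) loop in AbstractElasticJobExecutor#execute:
    // clear the misfire flags, run a MISFIRE round, and re-check until
    // no missed shard item remains.
    void executeMisfired(Collection<Integer> items) {
        while (isExecuteMisfired(items)) {
            clearMisfire(items);
            executionLog.add("MISFIRE round for items " + items);
        }
    }

    public static void main(String[] args) {
        MisfireLoopDemo demo = new MisfireLoopDemo();
        demo.executeMisfired(Arrays.asList(0, 1));
        System.out.println(demo.executionLog.size()); // one make-up round was run
        System.out.println(demo.isExecuteMisfired(Arrays.asList(0, 1))); // false: flags are cleared
    }
}
```

In this in-memory sketch one pass suffices; in production the re-check matters because the cache behind `isExecuteMisfired` may lag behind the `clearMisfire` write.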
4.8 Executing job failover
```java
// LiteJobFacade.java
@Override
public void failoverIfNecessary() {
    if (configService.load(true).isFailover()) {
        failoverService.failoverIfNecessary();
    }
}
```
- Invoke FailoverService to perform job failover (`#failoverIfNecessary()`). This is covered in detail in *Elastic-Job-Lite Source Code Parsing — Job Failover*.
4.9 Executing the Method after the Job is executed
```java
// LiteJobFacade.java
@Override
public void afterJobExecuted(final ShardingContexts shardingContexts) {
    for (ElasticJobListener each : elasticJobListeners) {
        each.afterJobExecuted(shardingContexts);
    }
}
```
- Invoke the job listeners to run the post-execution method. This is covered in detail in *Elastic-Job-Lite Source Code Parsing — Job Listeners*.
666. The eggs
Whew! A bit long, a bit long!
The next two articles will pave the way for upcoming posts on leader election, failover, job sharding strategies, and more:
- Elastic-Job-Lite Source Code Parsing — Registry
- Elastic-Job-Lite Source Code Parsing — Job Data Storage
Friends, hop on board and share this with a wave of friends!
Uh-huh. "I can't wait to read Elastic-Job-Cloud." I held it in just for you.
Narrator: the author has already read it on the sly. Taro: Narrator, you rascal!