The admin service of xxl-job is the scheduling center of XXL-Job. It is responsible for managing and scheduling registered jobs. For details about how to use XXL-Job, see the XXL-Job Distributed Scheduling Framework link in the further reading section.
In addition to the management interfaces exposed on the admin page, the admin service has several core functions:
1. Automatically schedules jobs according to their configuration;
2. Receives executor instance requests for registration and going offline;
3. Monitors failed jobs and retries them;
4. Terminates abnormal (lost) jobs;
5. Clears and aggregates logs.
These functions run automatically in the background after the admin service starts. Their implementation is described in detail below.
XxlJobAdminConfig
Admin configuration class
XxlJobAdminConfig is a configuration class for the Admin service. When the admin service is started, it will start all background threads of the admin service in addition to configuring some parameters of the admin service.
Attributes
The attributes of this class fall into five categories:
1. Parameters from the configuration file, such as accessToken;
2. The mappers of the DAO-layer data tables;
3. Some beans in the Spring container, such as JobAlarmer and DataSource;
4. The private XxlJobScheduler object:
private XxlJobScheduler xxlJobScheduler;
5. The private static variable adminConfig, which points to the instance itself:
private static XxlJobAdminConfig adminConfig = null;

public static XxlJobAdminConfig getAdminConfig() {
    return adminConfig;
}
Methods
This class has two important methods, implementing InitializingBean and DisposableBean respectively:
- afterPropertiesSet: called by the Spring container after the bean has been initialized;
- destroy: called when the container destroys the bean.
These two methods initialize and destroy the XxlJobScheduler scheduler:
@Override
public void afterPropertiesSet() throws Exception {
    adminConfig = this;
    xxlJobScheduler = new XxlJobScheduler();
    xxlJobScheduler.init();
}

@Override
public void destroy() throws Exception {
    xxlJobScheduler.destroy();
}
As the configuration class of the admin service, XxlJobAdminConfig calls the initialization method of XxlJobScheduler when the Spring container starts, which in turn initializes and starts the functions of the admin service.
XxlJobScheduler
The XxlJobScheduler scheduler calls the helper classes (xxxHelper) to start and end different threads and functions.
If XxlJobScheduler is viewed as an initiator, then the init method is the start button, and XxlJobAdminConfig is used to press the button.
public void init() throws Exception {
    // 0. Initialize the internationalization messages (not important, ignore)
    initI18n();
    // 1. Initialize the trigger thread pools
    JobTriggerPoolHelper.toStart();
    // 2. Start the registry monitor thread
    JobRegistryHelper.getInstance().start();
    // 3. Start the failed-job monitor thread: query failure logs and retry
    JobFailMonitorHelper.getInstance().start();
    // 4. Start the lost-job monitor thread: some jobs never respond after being dispatched and stay in the running state
    JobCompleteHelper.getInstance().start();
    // 5. Start the log report and cleanup thread
    JobLogReportHelper.getInstance().start();
    // 6. Start the scheduling thread that schedules jobs
    JobScheduleHelper.getInstance().start();
    logger.info(">>>>>>>>> init xxl-job admin success.");
}
We'll focus on the helper classes started in init and what they do, and then take a brief look at what destroy does.
JobTriggerPoolHelper
Trigger thread pool helper class: manages the trigger thread pools and submits trigger tasks to them
When the admin service sends a scheduling request to an executor instance to execute a job, the XxlJobTrigger.trigger() method is called. It packages parameters such as job_id, jobHandler, job_log_id, and the blocking strategy into a TriggerParam object and passes it to an ExecutorBiz object to perform one dispatch.
Xxl-job makes two optimizations for the scheduling process:
- Each time a scheduling request is issued, a task is submitted to a thread pool so that XxlJobTrigger.trigger() executes asynchronously;
- When submitting the task, a different thread pool is selected according to how long the XxlJobTrigger.trigger() call for that job usually takes.
Properties and start
Initialize the thread pool
JobTriggerPoolHelper initializes its two thread pool properties in the toStart method as follows:
/**
 * Fast and slow trigger pools, which execute dispatch tasks separately to isolate them and avoid blocking each other
 */
private ThreadPoolExecutor fastTriggerPool = null;
private ThreadPoolExecutor slowTriggerPool = null;

public void start() {
    fastTriggerPool = new ThreadPoolExecutor(
            10,                                                         // core threads; max is at least 200
            XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
            60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(1000),
            r -> new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode()));
    slowTriggerPool = new ThreadPoolExecutor(
            10,                                                         // core threads; max is at least 100
            XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
            60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(2000),
            r -> new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode()));
}
Every time there is a scheduling request, a task is submitted to one of these two thread pools; the selection logic is in the addTrigger method.
addTrigger
Submit a task to a thread pool and call XxlJobTrigger.trigger()
Different jobs have different execution durations. To prevent time-consuming jobs from blocking fast ones, xxl-job differentiates jobs by their dispatch response time, as follows:
- If the job is fast, the task is submitted to the fastTriggerPool thread pool;
- If the job is slow and invoked frequently, the task is submitted to the slowTriggerPool thread pool.
If fast jobs and frequently invoked slow jobs shared the same thread pool, the slow jobs would occupy most of the threads and the fast jobs could not run in time, reducing thread and thread pool utilization. xxl-job avoids this problem by isolating fast and slow jobs.
Q: If slow and fast jobs share the same thread pool, the fast jobs cannot run in time. The usual reaction is to increase the number of threads in the pool. Does that solve the problem?
No. Slow jobs would still occupy a large number of threads and take resources away from fast jobs; increasing the thread count only leaves many threads idle, so utilization drops rather than rises. The better approach is two thread pools, separating the two kinds of jobs so that each pool's resources are used sensibly.
To track slow jobs, the code uses a map (the jobTimeoutCountMap variable) to record each job's timeout count within one minute; the key is job_id and the value is the number of timeouts. Before calling XxlJobTrigger.trigger(), addTrigger checks whether the timeout count of that job_id in the map is greater than 10; if so, the slowTriggerPool is used, as follows:
// Attribute variable
private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();

// Select a thread pool; use slowTriggerPool if the job timed out more than 10 times within one minute
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
if (jobTimeoutCount != null && jobTimeoutCount.get() > 10) {
    triggerPool_ = slowTriggerPool;
}
After calling the XxlJobTrigger.trigger() method, jobTimeoutCountMap is updated based on two checks:
- whether the current minute is the same as the minute of the last call; if not, the map is cleared;
- whether the XxlJobTrigger.trigger() call took more than 500 milliseconds; if so, the timeout count of that job_id in the map is incremented.
Combined with the code above, a job whose calls take more than 500 milliseconds over 10 times within one minute is considered a frequently scheduled, time-consuming job.
The code is as follows:
// Attribute variable; its initial value is the minute at which JobTriggerPoolHelper was constructed.
// On every addTrigger call it is updated to the minute of the last call.
private volatile long minTim = System.currentTimeMillis() / 60000;

// The minute of the current time; if it differs from the minute of the previous call, clear jobTimeoutCountMap
long minTim_now = System.currentTimeMillis() / 60000;
if (minTim != minTim_now) {
    minTim = minTim_now;
    jobTimeoutCountMap.clear();
}

// Record the time before calling XxlJobTrigger.trigger()
long start = System.currentTimeMillis();
// ... call the XxlJobTrigger.trigger() method
XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);

// If the call took more than 500 ms, increment the job's slow-call count
long cost = System.currentTimeMillis() - start;
if (cost > 500) {
    AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
    if (timeoutCount != null) {
        timeoutCount.incrementAndGet();
    }
}
The XxlJobTrigger.trigger() method is described in detail in the XxlJobTrigger section below; for now, all you need to know is that it issues one execution request for a job.
In this class, the attribute variables minTim and jobTimeoutCountMap are both declared volatile, ensuring visibility of the data when addTrigger is invoked concurrently.
Question: Why clear the map every minute?
When the admin service issues a job scheduling request, it calls the static method trigger(), which delegates to the addTrigger method of a private static JobTriggerPoolHelper instance. minTim and jobTimeoutCountMap are not static, but they are globally unique (because the object that holds them is), so they maintain the admin service's global scheduling minute and timeout counts, and they need to be cleared every minute so that the counts only reflect the last minute.
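For reference, the static entry point looks roughly like the following sketch (simplified from the description above; only the delegation structure is asserted here, not the full source):

// Rough sketch of the static entry point; simplified, not the full source.
public class JobTriggerPoolHelper {

    // Globally unique helper instance; its instance fields (minTim, jobTimeoutCountMap, the two pools)
    // are therefore also globally unique within one admin instance.
    private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();

    public static void toStart() {
        helper.start();
    }

    // All scheduling requests in the admin service go through this static method,
    // which delegates to the instance-level addTrigger.
    public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount,
                               String executorShardingParam, String executorParam, String addressList) {
        helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
    }
}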
JobRegistryHelper
Helper classes for executor registration and logout
The admin service provides interfaces for executors to register and to go offline, and it automatically takes an executor offline if it has not sent a heartbeat for a long time (90 seconds). The former exposes an interface that receives requests, while the latter requires a thread that periodically updates the status of expired executors.
To improve the performance of the admin service, when the registration interface receives an executor request, xxl-job submits a task to a thread pool to execute the registration or offline request asynchronously.
The JobRegistryHelper class manages this thread pool and timed threads.
Register and log Off
The thread pool is defined and initialized as follows:
// Thread pool for registering or removing executors
private ThreadPoolExecutor registryOrRemoveThreadPool = null;

// Initialize the registry/remove thread pool
registryOrRemoveThreadPool = new ThreadPoolExecutor(
        2, 10, 30L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(2000),
        r -> new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode()),
        (r, executor) -> {
            r.run();
            logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match thread pool rejected handler(run now).");
        });
The core pool size is 2 and the maximum is 10, with a queue of 2000. If there are a very large number of executor instances, registration will be delayed, but you would not normally register 2,000 executors with the same admin service.
When an executor instance initiates a registration or logout request, it calls the corresponding methods of the AdminBizImpl class. Both methods delegate to JobRegistryHelper; the code of JobRegistryHelper.registry is shown below (registryRemove is similar):
public ReturnT<String> registry(RegistryParam registryParam) {
    // Check parameters
    if (!StringUtils.hasText(registryParam.getRegistryGroup())
            || !StringUtils.hasText(registryParam.getRegistryKey())
            || !StringUtils.hasText(registryParam.getRegistryValue())) {
        return new ReturnT<>(ReturnT.FAIL_CODE, "Illegal Argument.");
    }
    // Submit a task to the thread pool
    registryOrRemoveThreadPool.execute(() -> {
        // Try to update the registry record
        int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao()
                .registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(),
                        registryParam.getRegistryValue(), new Date());
        // Update failed, insert instead
        if (ret < 1) {
            XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao()
                    .registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(),
                            registryParam.getRegistryValue(), new Date());
            // Refresh; an empty method
            freshGroupRegistryInfo(registryParam);
        }
    });
    return ReturnT.SUCCESS;
}
Both methods submit a task to the registryOrRemoveThreadPool to execute the request asynchronously, which then updates or inserts the corresponding record in the xxl_job_registry table.
Update and manage Job_group
When an executor registers with the admin service (the data is stored in the xxl_job_registry table), it is not yet displayed on the page; you have to manually add job_group data to the xxl_job_group table. The admin service then automatically associates the user-added job_group data with the xxl_job_registry data. This requires admin to periodically read the xxl_job_group table and join it with the xxl_job_registry table.
This function is implemented in the same thread as the automatic executor-offline function. The main logic of this thread is:
- Query the group list whose address type is "automatic" from the xxl_job_group table; continue only if the list is not empty;
- Delete records that are no longer alive (not updated for 90 seconds) from the xxl_job_registry table, so that stale records do not affect later steps;
- Fetch the surviving records from the xxl_job_registry table and set the address_list value of each xxl_job_group record according to its appName, joining multiple addresses with commas;
- Sleep for 30 seconds; this thread executes once every 30 seconds.
The relevant code is as follows (the elided aggregation step is sketched after the block):
// Registry monitor thread
private Thread registryMonitorThread;
// Stop flag
private volatile boolean toStop = false;

// Job groups whose address list is maintained automatically
List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);

// Remove registry records that have gone offline (no heartbeat for 90 seconds)
List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
if (ids != null && ids.size() > 0) {
    XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
}

// Build the appName -> address list map from the surviving registry records
List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
HashMap<String, List<String>> appAddressMap = new HashMap<>(list.size());
// ... omitted ...

// Execute every 30 seconds
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
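The omitted part groups the surviving registry records by appName and writes the joined addresses back to each automatically managed group. A rough sketch of that step, assuming the group DAO exposes an update method (the DAO call and getter names are assumptions; only the overall logic follows the description above):

// Sketch of the omitted aggregation step: group surviving registry records by appName,
// then write the comma-joined address list back to each automatically managed job group.
// The update(group) DAO call is an assumption about the source.
for (XxlJobRegistry item : list) {
    if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
        String appName = item.getRegistryKey();
        List<String> registryList = appAddressMap.computeIfAbsent(appName, k -> new ArrayList<>());
        if (!registryList.contains(item.getRegistryValue())) {
            registryList.add(item.getRegistryValue());
        }
    }
}
for (XxlJobGroup group : groupList) {
    List<String> registryList = appAddressMap.get(group.getAppname());
    String addressListStr = null;
    if (registryList != null && !registryList.isEmpty()) {
        Collections.sort(registryList);
        addressListStr = String.join(",", registryList);   // concatenate addresses with commas
    }
    group.setAddressList(addressListStr);
    XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
}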
As you can see, functions that serve external interfaces (receiving requests, etc.) are implemented with a thread pool and asynchronous tasks, while automated tasks are executed periodically by a dedicated thread.
JobFailMonitorHelper
A thread helper class that monitors Job execution failures
If a job fails after being scheduled, it should be retried periodically. As an "automated task", this can obviously be done by a thread that runs periodically, just as JobRegistryHelper did above.
This class defines a monitor thread that runs every 10 seconds and retries failed jobs: if a failed job's remaining retry count is greater than 0, the job is triggered again, and an alarm is sent. The thread is defined as follows:
/**
 * Monitor thread
 */
private Thread monitorThread;
When the admin service is deployed as a cluster (sharing one database), how do we prevent a job from being retried by multiple instances? A "distributed lock" is needed.
lock
This thread takes advantage of the mutual exclusion of database UPDATE statements to implement a database-based distributed lock, as shown below:
// The UPDATE succeeds only if alarm_status is still 0, i.e. no other thread has modified the record yet.
// Setting the new value to -1 marks the record as locked.
int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao()
        .updateAlarmStatus(failLogId, 0, -1);
if (lockRet < 1) {
    continue;
}

// Unlock
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
This statement sets the jobLog's alarm_status to -1, an invalid status value. When other threads look up failed records by valid status values, the record is skipped, so it cannot be retried by another thread; this achieves the effect of a distributed lock (essentially a row-level lock). In other words, the -1 state is similar to the lock flag in a Java object header: it indicates that the record is locked and will be "ignored" by other threads.
Q: What’s wrong with the lock unlock code here?
Both the lock and the unlock are performed inside a try block; if an exception is thrown during the retry after locking, the record is never unlocked. The unlock should therefore be performed in a finally block, or the lock could be implemented with Redis and given an expiration time.
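A minimal sketch of the safer pattern the answer suggests, assuming the same updateAlarmStatus DAO method (illustrative only, not the project's actual code):

// Illustrative only: lock the log record, then guarantee the unlock in finally.
// updateAlarmStatus(logId, oldStatus, newStatus) is the DAO method shown above.
int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao()
        .updateAlarmStatus(failLogId, 0, -1);              // lock: 0 -> -1
if (lockRet > 0) {
    int newAlarmStatus = 0;                                // falls back to 0 so the record can be picked up again
    try {
        // ... retry the job and send the alarm, computing newAlarmStatus (1/2/3) ...
    } finally {
        // unlock even if the retry or alarm code throws: -1 -> newAlarmStatus
        XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao()
                .updateAlarmStatus(failLogId, -1, newAlarmStatus);
    }
}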
retry
The thread obtains the jobId from the failed log and loads the corresponding jobInfo. If the remaining retry count in the log is greater than 0, a retry is triggered. The code is as follows:
// Load the failed log and the corresponding job
XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());

// 1. Retry the failed job and update the trigger_msg field of the log
if (log.getExecutorFailRetryCount() > 0) {
    // Trigger the job again
    JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY,
            (log.getExecutorFailRetryCount() - 1),
            log.getExecutorShardingParam(), log.getExecutorParam(), null);
    String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>> "
            + I18nUtil.getString("jobconf_trigger_type_retry") + " <<<<<<<<<<< </span><br>";
    log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
    // Update the log
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
}

// 2. If the job info is not null, the failed job exists and an alarm is sent
// Alarm status: 0 - default, -1 - locked, 1 - no alarm needed, 2 - alarm succeeded, 3 - alarm failed
int newAlarmStatus = 0;
if (info != null) {
    boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
    newAlarmStatus = alarmResult ? 2 : 3;
} else {
    newAlarmStatus = 1;
}

// 3. Update the alarm_status value of the jobLog (this also releases the lock)
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
The retry is dispatched with the JobTriggerPoolHelper.trigger method mentioned earlier. The final update of the jobLog's alarm_status value serves two purposes:
- it releases the distributed lock;
- it sets the log's alarm_status to a value greater than 0, so the log is no longer returned as a failure log (one of the query conditions of the findFailJobLogIds method is alarm_status == 0), which avoids retrying it again on the next run of the thread.
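Putting the pieces together, the monitor thread's loop looks roughly like the following sketch (the findFailJobLogIds signature, its paging argument, and the log message are assumptions based on the description above):

// Rough sketch of the fail-monitor loop; the DAO signature and paging argument are assumptions.
while (!toStop) {
    try {
        // Query up to 1000 failed logs (one query condition is alarm_status == 0)
        List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao()
                .findFailJobLogIds(1000);
        if (failLogIds != null && !failLogIds.isEmpty()) {
            for (long failLogId : failLogIds) {
                // 1. Lock the record (0 -> -1); skip it if another admin instance got there first
                // 2. Retry the job if its remaining retry count > 0
                // 3. Send the alarm and write the final alarm_status (> 0), releasing the lock
            }
        }
    } catch (Exception e) {
        logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:", e);
    }
    // Run every 10 seconds
    try {
        TimeUnit.SECONDS.sleep(10);
    } catch (InterruptedException e) {
        // ignore while stopping
    }
}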
JobCompleteHelper
Job The helper class for the completion thread
Like JobRegistryHelper, this class has both a thread pool and a thread:
- Thread pools are used to create threads to handle incoming requests;
- Threads are used to execute certain “timed tasks”.
In effect, the thread pool and threads in this class are used to “complete” a job.
Receive the callback
When an executor receives a scheduling request from admin, it executes the job asynchronously and returns immediately; after the job finishes, the executor sends a callback with the execution result.
After admin receives the callback, it creates a thread in the thread pool to handle the callback, just like in the previous “register, log out” section.
/**
 * The thread pool that receives callback requests
 */
private ThreadPoolExecutor callbackThreadPool = null;

// Initialize the thread pool
callbackThreadPool = new ThreadPoolExecutor(
        2, 20, 30L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(3000),
        r -> new Thread(r, "xxl-job, admin JobLosedMonitorHelper-callbackThreadPool-" + r.hashCode()),
        (r, executor) -> {
            r.run();
            logger.warn(">>>>>>>>>>> xxl-job, callback too fast, match threadpool rejected handler(run now).");
        });
When a callback request arrives, the public callback method (invoked by AdminBizImpl) submits a task to the thread pool that iterates over the list of callback parameters and processes each one in turn, as follows:
// Submit a task to the thread pool to process the callback parameters
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
    callbackThreadPool.execute(new Runnable() {
        @Override
        public void run() {
            for (HandleCallbackParam handleCallbackParam : callbackParamList) {
                ReturnT<String> callbackResult = callback(handleCallbackParam);
                // ...
            }
        }
    });
    return ReturnT.SUCCESS;
}

private ReturnT<String> callback(HandleCallbackParam handleCallbackParam) {
    // Load and validate the log item
    XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(handleCallbackParam.getLogId());
    // ... update log data ...
    // Update the log and finish the job
    XxlJobCompleter.updateHandleInfoAndFinish(log);
    return ReturnT.SUCCESS;
}
The callback call chain is: JobApiController -> AdminBizImpl -> public callback -> private callback.
As the code shows, the callback logic is ultimately completed by calling XxlJobCompleter.updateHandleInfoAndFinish.
Update the Job
If a job was scheduled a long time ago, is still in the running state, and its executor has had no heartbeat for more than 90 seconds, the job is considered lost and needs to be terminated. This is the main function of the monitorThread.
Every 60 seconds, the monitorThread searches the xxl_job_log table for jobs that were scheduled more than 10 minutes ago, are still in the running state, and whose executors have gone offline. It then calls XxlJobCompleter.updateHandleInfoAndFinish to update the execution info and finish the job. The code is as follows:
/**
 * The thread that monitors lost jobs
 */
private Thread monitorThread;

// Lost-result handling: if a log record has stayed in the running state for more than 10 minutes
// and the corresponding executor has no heartbeat, mark the dispatch as failed locally
Date losedTime = DateUtil.addMinutes(new Date(), -10);
List<Long> losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);
if (losedJobIds != null && losedJobIds.size() > 0) {
    for (Long logId : losedJobIds) {
        XxlJobLog jobLog = new XxlJobLog();
        jobLog.setId(logId);
        jobLog.setHandleTime(new Date());
        // Set handle_code to failure
        jobLog.setHandleCode(ReturnT.FAIL_CODE);
        jobLog.setHandleMsg(I18nUtil.getString("joblog_lost_fail"));
        // Update execution info and finish the job
        XxlJobCompleter.updateHandleInfoAndFinish(jobLog);
    }
}
As the code shows, both functions ultimately call the XxlJobCompleter.updateHandleInfoAndFinish method. This method is covered in the XxlJobCompleter section later, so it is not detailed here.
JobLogReportHelper
Job Log statistics support class
If you look at the XxlJobTrigger.trigger method, you will see that a jobLog record is inserted every time a job is scheduled; this is why the thread in JobFailMonitorHelper looks up jobLog first when it retries.
As the record of job scheduling, jobLog can be used to count the number of dispatches and successes within a period of time. In addition, logs older than the retention period (logretentiondays) are cleared to keep the log table from growing too large. Obviously, this is another "automated task" that a thread can perform periodically.
This class holds a thread variable that performs two operations once per minute:
- Collect job statistics over a period of time; the main indicators are the dispatch count, running count, success count, and failure count;
- Clear expired logs.
In the first half of the thread's run method, the thread counts, for each of the last 3 days, the dispatch count, running count, success count, and failure count, and then updates or inserts the corresponding row in the xxl_job_log_report table. See the sketch below.
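A rough sketch of that first half, assuming DAO methods such as findLogReport and a report-table mapper (the method and map-key names follow the description above and may differ slightly from the exact source):

// Rough sketch of the statistics half of the run loop; DAO and key names are assumptions.
for (int i = 0; i < 3; i++) {
    // [todayFrom, todayTo) covers one of the last 3 days
    Calendar itemDay = Calendar.getInstance();
    itemDay.add(Calendar.DAY_OF_MONTH, -i);
    itemDay.set(Calendar.HOUR_OF_DAY, 0);
    itemDay.set(Calendar.MINUTE, 0);
    itemDay.set(Calendar.SECOND, 0);
    itemDay.set(Calendar.MILLISECOND, 0);
    Date todayFrom = itemDay.getTime();
    itemDay.add(Calendar.DAY_OF_MONTH, 1);
    Date todayTo = itemDay.getTime();

    // Count running / success / failure dispatches for that day
    Map<String, Object> triggerCountMap = XxlJobAdminConfig.getAdminConfig()
            .getXxlJobLogDao().findLogReport(todayFrom, todayTo);

    XxlJobLogReport report = new XxlJobLogReport();
    report.setTriggerDay(todayFrom);
    report.setRunningCount(((Number) triggerCountMap.getOrDefault("triggerDayCountRunning", 0)).intValue());
    report.setSucCount(((Number) triggerCountMap.getOrDefault("triggerDayCountSuc", 0)).intValue());
    report.setFailCount(((Number) triggerCountMap.getOrDefault("triggerDayCountFail", 0)).intValue());

    // Update the row for that day; insert it if it does not exist yet
    int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(report);
    if (ret < 1) {
        XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(report);
    }
}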
Clean up the log
In the second half of the thread's run method, the thread cleans up logs once a day: if more than a day has passed since the last cleanup, expired log records are cleared, as follows:
// Decide whether to clean up based on the last execution time and the retention setting:
// logs are only cleaned when the last cleanup was more than 1 day ago
long lastCleanLogTime = 0;
if (XxlJobAdminConfig.getAdminConfig().getLogretentiondays() > 0
        && System.currentTimeMillis() - lastCleanLogTime > 24 * 60 * 60 * 1000) {
    // ... compute clearBeforeTime (omitted) ...

    // Clear the logs in batches
    List<Long> logIds = null;
    do {
        logIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findClearLogIds(
                0, 0, clearBeforeTime, 0, 1000);
        if (logIds != null && logIds.size() > 0) {
            XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().clearLog(logIds);
        }
    } while (logIds != null && logIds.size() > 0);

    // Record when the cleanup was performed
    lastCleanLogTime = System.currentTimeMillis();
}
Question: Why use lastCleanLogTime to record the last cleanup time? Can’t you just clean up the data created the day before each execution?
If you did not use lastCleanLogTime to record the time of the last cleanup and simply cleaned up records created more than a day earlier, then, with the thread executing once a minute, records from the previous day would keep being deleted, leaving the previous day's data incomplete.
Recording the time of the last cleanup with lastCleanLogTime, and only cleaning when more than a day has passed since then, ensures that the previous day's logs stay complete.
Question: Why are only 1000 logs deleted at a time?
I do not understand why only 1000 expired logs are deleted at a time instead of deleting all expired logs at once. The trigger_time field has an ordinary index, so the DELETE operation would be written to the change buffer first and merged later. Querying the ids first and then deleting them is equivalent to one more round of IO without using the change buffer.
JobScheduleHelper
Job scheduling assistance class
The admin service is used to manage and schedule jobs. You can create a job in the admin console, configure its CRON expression and JobHandler, and the admin service will then schedule the job according to those parameters. Obviously, this "automated work" is also performed periodically by a thread.
1. If jobs are scheduled by a thread, the first problem is: if one dispatch is time-consuming, subsequent jobs may be blocked and their execution delayed. How is this solved?
The actual dispatch is handed off to the JobTriggerPoolHelper thread pools introduced earlier, so the scheduling thread only submits the trigger and is not blocked by slow jobs.
2. The second problem is: how do we make sure a job is executed at the specified time without too much delay?
The admin service uses a pre-read mode: jobs to be executed in the near future are read ahead of time and kept in memory. In addition, it uses a time wheel algorithm that groups jobs by time and executes the jobs due in the next instant.
3. The admin service can be deployed with multiple instances. In that case, how do we avoid a job being scheduled repeatedly by multiple instances?
Admin uses a table as a "distributed lock" to ensure that only one admin instance performs job scheduling at a time, and it reduces contention between instances by having the scheduling thread sleep for a random period of time.
Let’s look at the code to see how xxL-job solves this problem.
Thread scheduling
In this class, a scheduling thread is defined to schedule jobs to be executed and jobs that have expired for a period of time. The code is as follows:
/**
 * Pre-read window in milliseconds
 */
public static final long PRE_READ_MS = 5000;

/**
 * Thread that pre-reads jobs and schedules expired ones
 */
private Thread scheduleThread;
Pre-read
pushTimeRing adds a job to ringData (a map); another thread later fetches it from the map and schedules it.
The thread pre-reads jobs whose next execution time is <= now + 5000 ms, divides them into three segments according to their next execution time, and applies different logic to each segment.
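The pre-read itself is roughly the following query (a sketch; the preReadCount formula and DAO method name are my reading of the source and may differ slightly):

// Rough sketch of the pre-read step; preReadCount formula and DAO method name are assumptions.
// Read at most preReadCount jobs whose next trigger time falls within the next 5 seconds (or earlier).
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax()
        + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
long nowTime = System.currentTimeMillis();
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao()
        .scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);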
1. The next execution time is within (-∞, now - 5000)
This means the job has missed its time by more than 5000 ms. If the misfire strategy requires it, the job is scheduled once. The code is as follows:
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
    logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
    MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
    if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
        // Schedule once
        JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
        logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId());
    }
    // Update the next execution time
    refreshNextValidTime(jobInfo, new Date());
}
2. The next execution time is within [now - 5000, now)
This means the job is overdue by less than 5000 ms: it is delayed, but not considered misfired. The code is as follows:
if (nowTime > jobInfo.getTriggerNextTime()) {
    // 1. Trigger the job once
    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
    logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId());
    // 2. Update the next trigger time
    refreshNextValidTime(jobInfo, new Date());
    // 3. If the job is still schedulable and its next execution time falls within the next 5000 ms,
    //    record the job id in the time ring and wait for polling
    if (jobInfo.getTriggerStatus() == 1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
        // Second of the next trigger time
        int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);
        // Save to ringData
        pushTimeRing(ringSecond, jobInfo.getId());
        // Refresh the next trigger time again
        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
    }
}
If a job's next execution time falls within the next 5000 ms, its id is recorded directly for later scheduling, saving the IO of the next pre-read.
3. The next execution time is within [now, now + 5000)
The job has not yet reached its execution time; its id is recorded in the time ring and waits for later scheduling. The code is as follows:
int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);
pushTimeRing(ringSecond, jobInfo.getId());
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
After the three cases above are handled, the jobInfo's trigger_last_time, trigger_next_time, and trigger_status fields are persisted:
// Update job data
for (XxlJobInfo jobInfo : scheduleList) {
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
}
On the one hand, the pre-read immediately triggers jobs that are already due (or overdue); on the other hand, it saves jobs that will be due shortly into the ringData map for another thread to schedule, so that jobs are not missed when their time comes.
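pushTimeRing, used above, simply appends the job id to the list for the given second. Roughly (a sketch consistent with the calls shown above):

// Rough sketch of pushTimeRing: group job ids by the second (0-59) at which they should fire.
private void pushTimeRing(int ringSecond, int jobId) {
    List<Integer> ringItemData = ringData.get(ringSecond);
    if (ringItemData == null) {
        ringItemData = new ArrayList<>();
        ringData.put(ringSecond, ringItemData);
    }
    ringItemData.add(jobId);
}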
A distributed lock
Because admin can be deployed in multiple instances, you need to consider how to prevent jobs from being scheduled multiple times.
When JobFailMonitorHelper iterates over failed jobs, it sets an invalid status on each record as a distributed row lock and skips the record if the update fails. If the same approach were used here and the thread crashed right after setting a job to the invalid state, that job could never be scheduled again, so the job status should not be modified for locking.
Instead, the admin service uses the table xxl_job_lock as a distributed lock: each admin instance must acquire the lock on this table before continuing. In addition, to reduce contention between instances, the scheduling thread sleeps for a random period of time.
How do I acquire distributed locks?
The thread starts a transaction, sets it to manual commit, and executes a FOR UPDATE query against the xxl_job_lock table. Once one thread executes this statement successfully, the threads of other instances queue up waiting for the table's row lock, which realizes the distributed lock. The code is as follows:
// Get a database connection and acquire the X lock with SELECT ... FOR UPDATE.
// Threads of other instances block on the lock until the first transaction commits (or the lock wait times out).
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
preparedStatement = conn.prepareStatement("select * from xxl_job_lock where lock_name = 'schedule_lock' for update");
preparedStatement.execute();

// Commit the transaction and release the X lock
if (conn != null) {
    try {
        conn.commit();
    } catch (SQLException e) {
        // ...
    }
    try {
        conn.setAutoCommit(connAutoCommit);
    } catch (SQLException e) {
        // ...
    }
    try {
        conn.close();
    } catch (SQLException e) {
        // ...
    }
}
How to reduce lock competition?
To reduce lock contention, the thread sleeps a random 4000~5000 ms (never more than 5000 ms, which is the pre-read window) before it starts. At the end of each loop iteration, a different sleep strategy is chosen based on the elapsed time and whether any data was pre-read:
- if the iteration took more than 1000 ms, it does not sleep and starts the next iteration directly;
- if it took less than 1000 ms, it sleeps for a random duration whose range depends on whether data was pre-read:
- with pre-read data, the sleep is shorter, waking at the next second boundary (under 1000 ms);
- without pre-read data, the sleep is longer, roughly 4000~5000 ms, waking near the end of the 5-second pre-read window.
The code is as follows:
try {
    // Sleep a random 4000~5000 ms to reduce lock contention in multi-instance deployments
    // (never more than 5000 ms, the pre-read window)
    TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis() % 1000);
} catch (InterruptedException e) {
    if (!scheduleThreadToStop) {
        logger.error(e.getMessage(), e);
    }
}

// If the iteration took more than 1000 ms, do not sleep;
// otherwise sleep until a second boundary
long cost = System.currentTimeMillis() - start;
if (cost < 1000) {
    try {
        // No pre-read data: sleep longer; pre-read data: sleep shorter
        TimeUnit.MILLISECONDS.sleep((preReadSuc ? 1000 : PRE_READ_MS) - System.currentTimeMillis() % 1000);
    } catch (InterruptedException e) {
        if (!scheduleThreadToStop) {
            logger.error(e.getMessage(), e);
        }
    }
}
RingThread: Time wheel
In the previous thread, job ids are saved into a map keyed by execution time (in seconds), and the ringThread schedules jobs by time. This is a typical "time wheel algorithm". The code is as follows:
/**
 * Scheduling thread 2: the ring thread
 */
private Thread ringThread;

/**
 * Jobs to schedule, keyed by time (second 0-59)
 */
private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

// Scheduling loop
List<Integer> ringItemData = new ArrayList<>();
// Take out the slots for the current second and the previous second to avoid missing jobs
int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
for (int i = 0; i < 2; i++) {
    List<Integer> tmpData = ringData.remove((nowSecond + 60 - i) % 60);
    if (tmpData != null) {
        ringItemData.addAll(tmpData);
    }
}
// Iterate over the job ids and trigger each one
if (ringItemData.size() > 0) {
    for (int jobId : ringItemData) {
        JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
    }
    ringItemData.clear();
}
In each poll, only the jobs registered at the current second and the previous second are taken out and scheduled; slots far in the past are not scheduled.
Before each polling round there is a sleep of up to 1000 milliseconds. Without it, the thread would spin continuously and the slot for the current second in ringData would usually be empty, producing a large number of useless iterations; the sleep avoids that. The sleep is kept under 1000 ms because the scheduling granularity is one second, and aligning to the next second boundary avoids delaying jobs.
try {
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
} catch (InterruptedException e) {
if (!ringThreadToStop) {
logger.error(e.getMessage(), e);
}
}
Question: Why is distributed lock not added during time round scheduling?
Because, before a job is put into the time ring, the scheduleThread (while holding the lock) has already updated the job's next_trigger_time to a point beyond now + 5000 ms. Other instances pre-read jobs with the condition next_trigger_time <= now + 5000, so they will not read the same job again, and the ring thread therefore does not need a distributed lock.
That covers XxlJobScheduler.init and the helper classes it starts; next, let's look at XxlJobScheduler.destroy.
XxlJobScheduler-destroy
The destroy method is simple: it destroys the previously initialized thread pools and threads in the reverse order of their startup.
The code is as follows:
/**
 * Destroy; the order is the reverse of init
 */
public void destroy() throws Exception {
    // 1. Destroy the scheduling threads
    JobScheduleHelper.getInstance().toStop();
    // 2. Destroy the log statistics and cleanup thread
    JobLogReportHelper.getInstance().toStop();
    // 3. Destroy the lost-job monitor thread
    JobCompleteHelper.getInstance().toStop();
    // 4. Destroy the failed-job monitor thread
    JobFailMonitorHelper.getInstance().toStop();
    // admin registry stop
    JobRegistryHelper.getInstance().toStop();
    // admin trigger pool stop
    JobTriggerPoolHelper.toStop();
}
Because the toStop methods are similar, we will only cover JobScheduleHelper’s toStop method.
The steps of this method are as follows:
1. Set the stop flag to true;
2. Sleep briefly, giving the worker thread CPU time to finish its current task;
3. If the thread has not terminated (for example, it is sleeping), interrupt it;
4. Call join on the thread and wait for it to finish its last iteration and terminate.
The code is as follows:
scheduleThreadToStop = true;
// Give the thread 1 second to finish its current task
try {
    TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
    logger.error(e.getMessage(), e);
}
// If the thread has not terminated yet, interrupt it and wait for it to finish
if (scheduleThread.getState() != Thread.State.TERMINATED) {
    scheduleThread.interrupt();
    try {
        scheduleThread.join();
    } catch (InterruptedException e) {
        logger.error(e.getMessage(), e);
    }
}
The admin service starts multiple thread pools and threads to asynchronously execute tasks and asynchronously respond to executor requests.
Next, we introduce the XxlJobTrigger and XxlJobCompleter mentioned earlier.
XxlJobTrigger
Encapsulation class for scheduling jobs
XxlJobTrigger is an encapsulation class for job scheduling. Its main job is to accept the incoming jobId and scheduling parameters, query the corresponding jobGroup and jobInfo, and invoke the ExecutorBiz object to execute the job scheduling (run method).
Note: This class itself does not perform HTTP requests; HTTP requests are performed in the utility class XxlJobRemotingUtil within the Core package.
The call chain inside this class is: trigger -> processTrigger -> runExecutor.
trigger
This method is relatively simple: it queries the jobGroup and jobInfo objects according to the parameters passed in, sets the related field values, and then calls the processTrigger method. A rough sketch follows.
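A rough sketch of trigger, simplified to the non-sharding path (field and DAO names follow the description above; the real method also handles sharding broadcast):

// Simplified sketch of trigger(); sharding-broadcast handling is omitted.
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount,
                           String executorShardingParam, String executorParam, String addressList) {
    // Load the job configuration
    XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
    if (jobInfo == null) {
        return;
    }
    // Override the executor parameter if one was passed in
    if (executorParam != null) {
        jobInfo.setExecutorParam(executorParam);
    }
    // Use the passed-in retry count if valid, otherwise the job's configured one
    int finalFailRetryCount = failRetryCount >= 0 ? failRetryCount : jobInfo.getExecutorFailRetryCount();

    // Load the job group (executor addresses); an explicit address list overrides it
    XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());
    if (addressList != null && addressList.trim().length() > 0) {
        group.setAddressType(1);            // treat as a manually specified address list
        group.setAddressList(addressList.trim());
    }

    // Hand over to processTrigger for one dispatch
    processTrigger(group, jobInfo, finalFailRetryCount, triggerType, -1, -1);
}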
processTrigger
The main work of this method is divided into the following steps:
1. Save a scheduling log (jobLog record);
2. Build a TriggerParam object from the fields of jobInfo, jobGroup, and the scheduling parameters (a sketch follows below);
3. Obtain the executor address to dispatch to from jobGroup according to jobInfo's routing policy;
4. Call the runExecutor method to perform the dispatch;
5. Save the scheduling parameters, set the scheduling info, and update the log.
processTrigger does not modify the fields of jobInfo or jobGroup; it only reads and uses them. Any modification of these two objects happens in the preceding trigger method.
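Step 2 roughly assembles the TriggerParam like this (a sketch; the setter names are based on the parameter list described earlier and may differ slightly from the source):

// Rough sketch of assembling the TriggerParam in step 2; setter names are assumptions.
TriggerParam triggerParam = new TriggerParam();
triggerParam.setJobId(jobInfo.getId());
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());              // jobHandler name
triggerParam.setExecutorParams(jobInfo.getExecutorParam());
triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());  // blocking strategy
triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
triggerParam.setLogId(jobLog.getId());                                      // job_log_id saved in step 1
triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
triggerParam.setGlueType(jobInfo.getGlueType());
triggerParam.setGlueSource(jobInfo.getGlueSource());
triggerParam.setBroadcastIndex(index);
triggerParam.setBroadcastTotal(total);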
runExecutor
This method performs a schedule and returns the result of the schedule. Its core code is as follows:
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
runResult = executorBiz.run(triggerParam);
The ExecutorBiz object is obtained from the XxlJobScheduler class, which creates one ExecutorBiz object per address in a "lazy load" manner, as shown below:
private static ConcurrentMap<String, ExecutorBiz> executorBizRepository = new ConcurrentHashMap<String, ExecutorBiz>();

public static ExecutorBiz getExecutorBiz(String address) throws Exception {
    // valid
    if (address == null || address.trim().length() == 0) {
        return null;
    }
    // load-cache
    address = address.trim();
    ExecutorBiz executorBiz = executorBizRepository.get(address);
    if (executorBiz != null) {
        return executorBiz;
    }
    // set-cache
    executorBiz = new ExecutorBizClient(address, XxlJobAdminConfig.getAdminConfig().getAccessToken());
    executorBizRepository.put(address, executorBiz);
    return executorBiz;
}
This function could just as well live in the XxlJobTrigger class, or be encapsulated inside ExecutorBiz.
The three methods in this class can thus be categorized as pre -> execute -> post: they do the preparatory work, perform the dispatch, and then handle the follow-up work.
XxlJobCompleter
The completion class of Job
This class is used in JobCompleteHelper, where the final job is completed, and has two main methods:
- updateHandleInfoAndFinish: the public method; it calls finishJob and updates the log (see the sketch below);
- finishJob: a private method that triggers child jobs and appends their messages to the log.
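The public method is roughly the following (a sketch; the truncation threshold is my recollection of the source and may not be exact):

// Rough sketch of updateHandleInfoAndFinish; the truncation limit is an assumption.
public static int updateHandleInfoAndFinish(XxlJobLog xxlJobLog) {
    // Trigger child jobs and append their messages to handleMsg
    finishJob(xxlJobLog);

    // Avoid an overly long handle message (the text column has a size limit)
    if (xxlJobLog.getHandleMsg().length() > 15000) {
        xxlJobLog.setHandleMsg(xxlJobLog.getHandleMsg().substring(0, 15000));
    }

    // Persist the handle info; this is what finally marks the job as finished
    return XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(xxlJobLog);
}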
The finishJob method is described below.
finishJob
The main logic of finishJob is: if the current job executed successfully, trigger all of its child jobs, and finally append the child-job dispatch messages to the current job's log. The code is as follows:
private static void finishJob(XxlJobLog xxlJobLog) {
    // 1. Only trigger child jobs when the handle result is success
    StringBuilder triggerChildMsg = null;
    if (XxlJobContext.HANDLE_CODE_SUCCESS == xxlJobLog.getHandleCode()) {
        XxlJobInfo xxlJobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(xxlJobLog.getJobId());
        if (xxlJobInfo != null && xxlJobInfo.getChildJobId() != null && xxlJobInfo.getChildJobId().trim().length() > 0) {
            // 2. Iterate over the child job ids
            String[] childJobIds = xxlJobInfo.getChildJobId().split(",");
            for (int i = 0; i < childJobIds.length; i++) {
                int childJobId = (childJobIds[i] != null && childJobIds[i].trim().length() > 0 && isNumeric(childJobIds[i]))
                        ? Integer.parseInt(childJobIds[i]) : -1;
                if (childJobId > 0) {
                    // 3. Trigger the child job
                    JobTriggerPoolHelper.trigger(childJobId, TriggerTypeEnum.PARENT, -1, null, null, null);
                    ReturnT<String> triggerChildResult = ReturnT.SUCCESS;
                    // 4. Append log information ... omitted
                }
            }
        }
    }
    // 5. Save the child-job dispatch messages into the log
    if (triggerChildMsg != null) {
        xxlJobLog.setHandleMsg(xxlJobLog.getHandleMsg() + triggerChildMsg);
    }
}
Note that finishJob relies on JobTriggerPoolHelper to trigger child jobs, which is why the monitor thread of JobCompleteHelper waits briefly after it starts, giving JobTriggerPoolHelper time to finish initializing.
Conclusion
1. XxlJobAdminConfig is the startup entry of the admin service. To stay as simple as possible, it acts like a warehouse that manages and holds the relevant classes and objects; it does not start any specific thread itself, it only has to "press the initiator's button";
2. XxlJobScheduler is the initiator of the admin service: it calls the various helper classes (xxxHelper) to start the corresponding threads;
3. Functions that serve external interfaces, such as dispatching jobs and receiving callback, registration, and offline requests, are implemented asynchronously with thread pool + threads to avoid blocking the main thread;
4. "Automated task" functions are executed periodically by dedicated threads.
Further reading
Xxl-job distributed scheduling framework
Time wheel algorithm
An open source time wheel algorithm introduction