0x00 Introduction
A previous article on the fault-tolerance mechanism of the parallel distributed framework Celery mentioned Quartz's failover strategy, so let's take this opportunity to see how Quartz implements it.
Comparing the two systems lets you see their similarities and differences, and learn from the strengths of each.
0x01 Basic Concepts
1.1 Distribution
When you think about distribution, you can roughly think about it along two dimensions: function and storage.
- In terms of function, is management centralized or distributed? If it is distributed, how do the nodes interact and coordinate?
- In terms of storage, is storage centralized or distributed? If it is distributed, how do the pieces add up to present a complete storage image?
For Quartz, management is distributed while storage is centralized.
1.1.1 Functions
Each node in a Quartz cluster is an independent Quartz application; nodes do not interact with each other directly and are, in theory, completely independent.
But to make the cluster work, this complete independence actually amounts to no independence at all: every node must perform all the administrative functions, and every node must be able to manage every other node. It is all for one, and one for all.
In other words, absolute freedom means absolute non-freedom: the nodes look independent, yet any of the other nodes can manage you.
1.1.2 Storage
Quartz takes a centralized approach, putting all information into a set of shared database tables.
Storage also assists management. A Quartz node never talks to another node or to a management node directly, but becomes aware of the other Quartz applications through the same database tables. No node manages you directly, yet all the other nodes can quietly control you through the database.
1.2 Basic Concepts
Some basic concepts of the Quartz framework:
- The core elements of Quartz task scheduling are Scheduler, Trigger, and Job. Trigger and Job are the metadata of task scheduling; Scheduler is the controller that actually performs the scheduling.
- A Trigger defines the scheduling times, i.e., the rules by which a task is executed.
- A Job represents the scheduled task itself.
- Quartz calls triggering a job a "fire".
- When Quartz is running there are several kinds of threads: the scheduling thread (a single thread) that schedules jobs, and the worker-pool threads that execute each job's business logic.
- Several Quartz tables are directly related to job triggering:
  - The triggers table (qrtz_triggers), which records a trigger's PREV_FIRE_TIME, NEXT_FIRE_TIME, and TRIGGER_STATE.
  - The locks table (qrtz_locks). Quartz is distributed, so multiple threads may contend for the same resource simultaneously; Quartz relies on this table to handle that situation.
  - The fired-triggers table (qrtz_fired_triggers), which records the triggers currently being fired.
- TRIGGER_STATE is the state of a trigger.
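To make these elements concrete, here is a minimal sketch using the public Quartz 2.x builder API; the class and identifier names (HelloJob, demoGroup, etc.) are illustrative, not from the original article:

```java
import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.Scheduler;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;

public class HelloQuartz {

    // The Job: the unit of work that gets scheduled.
    public static class HelloJob implements Job {
        public void execute(JobExecutionContext context) {
            System.out.println("Job fired at " + context.getFireTime());
        }
    }

    public static void main(String[] args) throws Exception {
        // The Scheduler: the controller that actually performs the scheduling.
        Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();

        JobDetail job = JobBuilder.newJob(HelloJob.class)
                .withIdentity("helloJob", "demoGroup")
                .build();

        // The Trigger: defines when the job fires (here, every 10 seconds).
        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("helloTrigger", "demoGroup")
                .startNow()
                .withSchedule(SimpleScheduleBuilder.simpleSchedule()
                        .withIntervalInSeconds(10)
                        .repeatForever())
                .build();

        scheduler.scheduleJob(job, trigger);
        scheduler.start();
    }
}
```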
1.3 Scheduling Threads
The Scheduler runs two main scheduling threads: one that performs regular scheduling, and one that handles misfired triggers.
- The regular scheduling thread polls all stored triggers. If a trigger needs to fire (its next fire time has arrived), the thread obtains an idle thread from the task-execution thread pool to execute the job associated with that trigger.
- The misfire thread scans all triggers for misfired ones and, if any exist, handles them according to the misfire policy (fire now, or wait for the next fire); see the sketch after this list.
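As a concrete illustration of that policy choice, here is a sketch of configuring a "fire now" misfire instruction on a simple trigger via the Quartz 2.x API; the identifiers are illustrative:

```java
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;

public class MisfirePolicyExample {

    public static Trigger build() {
        return TriggerBuilder.newTrigger()
                .withIdentity("misfireAwareTrigger", "demoGroup")
                .withSchedule(SimpleScheduleBuilder.simpleSchedule()
                        .withIntervalInMinutes(5)
                        .repeatForever()
                        // "fire now": run the trigger immediately when a misfire
                        // is detected, instead of waiting for the next fire time
                        .withMisfireHandlingInstructionFireNow())
                .build();
    }
}
```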
0x02 Failover
In cluster mode, Quartz achieves high availability and scalability for tasks through failover and task load balancing.
Quartz ensures high availability based on the scheduler state records kept in the database.
- Essentially, each node in the cluster works by sharing the same database. (Quartz manages the cluster by starting two maintenance threads that maintain database state: one checks node state, the other recovers tasks.)
- Multiple nodes in a Quartz cluster do not work at the same time: only one node is active while the others stand by, and a standby node is automatically promoted to a working node only when the working node fails.
- Failover occurs when a node fails while executing one or more tasks. When a node fails, the other nodes detect this and flag, in the database, the tasks that were in progress on the failed node.
- Any task marked recoverable (the "requests recovery" attribute of the job detail) will be re-executed by the other nodes. Tasks not marked recoverable are simply released and will execute the next time their trigger fires. A minimal configuration sketch follows this list.
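For context, here is a minimal sketch of enabling cluster mode programmatically. The property keys are the standard Quartz JDBC-job-store settings; the data-source name myDS is a placeholder, and its JDBC connection properties are omitted:

```java
import java.util.Properties;
import org.quartz.Scheduler;
import org.quartz.impl.StdSchedulerFactory;

public class ClusteredSchedulerBootstrap {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("org.quartz.scheduler.instanceName", "MyClusteredScheduler");
        // AUTO: let Quartz derive an instance id from the host name and current time
        props.put("org.quartz.scheduler.instanceId", "AUTO");
        // JDBC job store: all nodes coordinate through the same database tables
        props.put("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");
        props.put("org.quartz.jobStore.isClustered", "true");
        // check-in interval used by the ClusterManager thread (milliseconds)
        props.put("org.quartz.jobStore.clusterCheckinInterval", "15000");
        props.put("org.quartz.jobStore.dataSource", "myDS"); // placeholder data source
        props.put("org.quartz.threadPool.threadCount", "10");
        // ... org.quartz.dataSource.myDS.* JDBC settings omitted ...

        Scheduler scheduler = new StdSchedulerFactory(props).getScheduler();
        scheduler.start();
    }
}
```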
So here’s what we want to think about:
- How to find the faulty node;
- How to transfer failed tasks;
0x03 General Idea
The failover mechanism works in a cluster environment. The thread class that performs the recovery work is ClusterManager, which is started when the scheduler is initialized.
Each scheduler instance periodically checks in by updating its LAST_CHECKIN_TIME field, and looks for other scheduler instances whose field has stopped updating.
If a scheduler's check-in time is found to be about 15s earlier than the current time (the exact threshold depends on the configured check-in interval), that scheduler instance is judged to need recovery. The recovery mechanism then fetches the triggers that were being fired by the failed scheduler instance and, for each of them, temporarily adds a corresponding SimpleTrigger that executes exactly once.
When the scheduling thread scans the triggers, these recovery triggers fire like any others. In this way the unfinished work is folded back into the ordinary scheduling flow in the form of special triggers: as long as the scheduling thread is running normally, the recovered triggers will fire and execute soon. A conceptual sketch of such a one-shot recovery trigger follows.
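Conceptually, the one-shot recovery trigger looks like the sketch below. This is not Quartz's internal code (which constructs a SimpleTriggerImpl directly, as shown in section 0x05); the method and parameter names are illustrative:

```java
import java.util.Date;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;

public class RecoveryTriggerSketch {

    // Build a one-shot trigger that replays a failed node's job exactly once.
    public static Trigger oneShotFor(JobKey jobKey, String failedInstanceId, long scheduledFireTime) {
        return TriggerBuilder.newTrigger()
                // named in the style Quartz uses for recovery triggers: "recover_<instance>_<id>"
                .withIdentity("recover_" + failedInstanceId + "_" + System.currentTimeMillis(),
                        Scheduler.DEFAULT_RECOVERY_GROUP)
                .forJob(jobKey)
                .startAt(new Date(scheduledFireTime))
                .withSchedule(SimpleScheduleBuilder.simpleSchedule()) // no repeat: fires exactly once
                .build();
    }
}
```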
0x04 How to Discover a Faulty Node
Failed nodes are generally detected by a periodic heartbeat.
There are broadly two models: push and pull.
- Push: each node periodically sends a heartbeat to the management node.
- Pull: the management node periodically pulls status information from each node.
Because Quartz has no management node, it uses the push model: each node simulates a heartbeat by writing its check-in to the database.
4.1 Database Tables
The qrtz_scheduler_state table stores the node instance information of the cluster. Quartz periodically reads this table to determine the current status of each instance in the cluster.
- instance_name: the instance name set by org.quartz.scheduler.instanceId in the configuration file; if it is set to AUTO, Quartz generates a name from the physical machine name and the current time.
- last_checkin_time: the time of the last check-in.
- checkin_interval: the check-in interval.
The specific table is as follows:
```sql
create table qrtz_scheduler_state
(
    sched_name varchar(120) not null,
    instance_name varchar(200) not null,
    last_checkin_time bigint(13) not null,
    checkin_interval bigint(13) not null,
    primary key (sched_name, instance_name)
);
```
4.2 Cluster Management Thread
The ClusterManager thread is created when the scheduler instance (built by StdSchedulerFactory) starts scheduling via start(); it runs as a separate thread.
- On its first check, the cluster management thread looks for faulty nodes and handles any that it finds.
- After that, it sleeps until the next detection cycle (configured by org.quartz.jobStore.clusterCheckinInterval, default 15000 ms, i.e. 15 seconds), checks in to the database, and traverses the state records of its sibling nodes in the cluster to check their health.
- If a failed node exists, it updates the failed node's trigger states, deletes the failed node's instance state record, and signals the scheduling thread. Because trigger data is shared between cluster nodes through the database, this is what enables failover: the failed node's task scheduling is then taken over by the scheduling thread.
A reduced version of the code follows; you can see that it performs doCheckin periodically:
```java
class ClusterManager extends Thread {

    private volatile boolean shutdown = false;
    private int numFails = 0;

    private boolean manage() {
        boolean res = false;
        try {
            res = doCheckin(); // the check-in
            numFails = 0;
        } catch (Exception e) {
            if (numFails % 4 == 0) {
                getLog().error(
                    "ClusterManager: Error managing cluster: "
                        + e.getMessage(), e);
            }
            numFails++;
        }
        return res;
    }

    @Override
    public void run() {
        while (!shutdown) {
            if (!shutdown) {
                long timeToSleep = getClusterCheckinInterval();
                long transpiredTime = (System.currentTimeMillis() - lastCheckin);
                timeToSleep = timeToSleep - transpiredTime;
                if (timeToSleep <= 0) {
                    timeToSleep = 100L;
                }
                if (numFails > 0) {
                    timeToSleep = Math.max(getDbRetryInterval(), timeToSleep);
                }
                try {
                    Thread.sleep(timeToSleep);
                } catch (Exception ignore) {
                    // interrupted while sleeping; loop around and re-check shutdown
                }
            }
            if (!shutdown && this.manage()) { // the periodic check-in
                signalSchedulingChangeImmediately(0L);
            }
        } // while !shutdown
    }
}
```
The logic is as follows:
```
+-----------------------------+                     +-----------------------------+
| Node A                      |                     | Node B                      |
|                             |                     |                             |
| ^ +--> ClusterManager +--+  |                     | ^ +--> ClusterManager +--+ |
| |                        |  |  Checkin   Checkin  | |                        | |
| |                        +--------> +----+ <---------------------------------+ |
| |                        |  |       | DB |        | |                          |
| |                        |  |       +----+        | |                          |
| <------------------------+  |                     | <------------------------+ |
|          timer              |                     |          timer              |
|                             |                     |                             |
+-----------------------------+                     +-----------------------------+
```
4.2.1 Periodic Check-in
The doCheckin method works as follows:
- If this is not the first check-in, clusterCheckIn is first called (before taking the lock) to find faulty nodes.
- Then, after obtaining the lock, failedRecords is computed again: via clusterCheckIn on the first pass, otherwise via findFailedInstances (the situation may have changed once the lock is held, so the failed nodes must be found again).
- If failedRecords is non-empty, clusterRecover is attempted.
The code is as follows:
```java
protected boolean doCheckin() throws JobPersistenceException {
    boolean transOwner = false;
    boolean transStateOwner = false;
    boolean recovered = false;

    Connection conn = getNonManagedTXConnection();
    try {
        // Other than the first time, always checkin first to make sure there is
        // work to be done before we acquire the lock (since that is expensive,
        // and is almost never necessary). This must be done in a separate
        // transaction to prevent a deadlock under recovery conditions.
        List<SchedulerStateRecord> failedRecords = null;
        if (!firstCheckIn) { // if this is not the first check-in
            failedRecords = clusterCheckIn(conn);
            commitConnection(conn);
        }

        if (firstCheckIn || (failedRecords.size() > 0)) {
            getLockHandler().obtainLock(conn, LOCK_STATE_ACCESS);
            transStateOwner = true;

            // Now that we own the lock, make sure we still have work to do.
            // The first time through, we also need to make sure we update/create our state record.
            // (After obtaining the lock the situation may have changed, so find the failed nodes again.)
            failedRecords = (firstCheckIn) ? clusterCheckIn(conn) : findFailedInstances(conn);

            if (failedRecords.size() > 0) {
                getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
                //getLockHandler().obtainLock(conn, LOCK_JOB_ACCESS);
                transOwner = true;

                clusterRecover(conn, failedRecords); // attempt clusterRecover
                recovered = true;
            }
        }

        commitConnection(conn);
    } catch (JobPersistenceException e) {
        rollbackConnection(conn);
        throw e;
    }

    firstCheckIn = false;
    return recovered;
}
```
4.2.2 Failed Node Detection
When a node's Scheduler instance checks in, it checks whether any other node's Scheduler instance failed to check in by its expected time. If one or more nodes have not checked in on schedule, the running Scheduler assumes they have failed. It then needs to acquire the state-access row lock so that it can update trigger states, delete the failed nodes' instance state records, and so on.
The method that finds faulty sibling nodes in the cluster is:

```java
org.quartz.impl.jdbcjobstore.JobStoreSupport.findFailedInstances(Connection)
```
Whether a node has failed is judged from the last check-in time of its Scheduler instance. The condition is:

```
LAST_CHECKIN_TIME + max(CHECKIN_INTERVAL, time since the checking node's own last check-in) + 7500ms < currentTime
```
The logic is to examine whether a Scheduler record's LAST_CHECKIN_TIME column in the qrtz_scheduler_state table is older than org.quartz.jobStore.clusterCheckinInterval allows:
- Read all records in the qrtz_scheduler_state table;
- Traverse the records; for each record:
  - If it is this node's own record and this is the first check-in, add it to the failed-node list;
  - If it belongs to another node and that instance's last check-in time is more than the allowed interval plus 7500ms before the current time, add that node to the failed-node list.
- Such a gap means the time elapsed since the record's last check-in exceeds the permitted interval, i.e., the node did not check in on time and is considered failed.
The specific code is:
```java
/**
 * Get a list of all scheduler instances in the cluster that may have failed.
 * This includes this scheduler if it is checking in for the first time.
 */
protected List<SchedulerStateRecord> findFailedInstances(Connection conn)
    throws JobPersistenceException {
    try {
        List<SchedulerStateRecord> failedInstances = new LinkedList<SchedulerStateRecord>();
        boolean foundThisScheduler = false;
        long timeNow = System.currentTimeMillis();

        // Read the records from the database
        List<SchedulerStateRecord> states = getDelegate().selectSchedulerStateRecords(conn, null);

        for (SchedulerStateRecord rec : states) {
            // find own record...
            if (rec.getSchedulerInstanceId().equals(getInstanceId())) {
                foundThisScheduler = true;
                if (firstCheckIn) {
                    failedInstances.add(rec);
                }
            } else {
                // find failed instances...
                // see whether the record is out of date
                if (calcFailedIfAfter(rec) < timeNow) {
                    failedInstances.add(rec);
                }
            }
        }

        // The first time through, also check for orphaned fired triggers.
        if (firstCheckIn) {
            failedInstances.addAll(findOrphanedFailedInstances(conn, states));
        }

        // If not the first time, but we didn't find our own instance, then
        // someone must have done recovery for us.
        if ((!foundThisScheduler) && (!firstCheckIn)) {
            // FUTURE_TODO: revisit when handle self-failed-out impl'ed (see FUTURE_TODO in clusterCheckIn() below)
        }

        return failedInstances;
    } catch (Exception e) {
        // (catch clause elided in the original excerpt; it rethrows as JobPersistenceException)
        throw new JobPersistenceException("Failure identifying failed instances when checking-in: "
                + e.getMessage(), e);
    }
}
```
The failure cutoff time is calculated as follows (for example, with the default 15-second check-in interval a node is considered failed roughly 15s + 7.5s = 22.5s after its last check-in):
```java
protected long calcFailedIfAfter(SchedulerStateRecord rec) {
    return rec.getCheckinTimestamp() +
        Math.max(rec.getCheckinInterval(),
            (System.currentTimeMillis() - lastCheckin)) +
        7500L;
}
```
selectSchedulerStateRecords reads the records from the database:
```java
public List<SchedulerStateRecord> selectSchedulerStateRecords(Connection conn, String theInstanceId)
    throws SQLException {
    PreparedStatement ps = null;
    ResultSet rs = null;
    try {
        List<SchedulerStateRecord> lst = new LinkedList<SchedulerStateRecord>();

        if (theInstanceId != null) {
            ps = conn.prepareStatement(rtp(SELECT_SCHEDULER_STATE));
            ps.setString(1, theInstanceId);
        } else {
            ps = conn.prepareStatement(rtp(SELECT_SCHEDULER_STATES));
        }
        rs = ps.executeQuery();

        while (rs.next()) {
            SchedulerStateRecord rec = new SchedulerStateRecord();
            rec.setSchedulerInstanceId(rs.getString(COL_INSTANCE_NAME));
            rec.setCheckinTimestamp(rs.getLong(COL_LAST_CHECKIN_TIME));
            rec.setCheckinInterval(rs.getLong(COL_CHECKIN_INTERVAL));
            lst.add(rec);
        }

        return lst;
    } finally {
        closeResultSet(rs);
        closeStatement(ps);
    }
}
```
The specific logic is as follows:
```
+--------------------------------+      +--------------------------------------------------+
| Node A                         |      | DB                                               |
|                                |      |   qrtz_scheduler_state                           |
|  ^ +--> ClusterManager +--+    |      |  +--------------------------------------------+  |
|  |                        |    |      |  |                                            |  |
|  |                        | selectSchedulerStateRecords                               |  |
|  |                        +------------> Node A, LAST_CHECKIN_TIME, CHECKIN_INTERVAL  |  |
|  |                        |    |      |  |                                            |  |
|  |   calcFailedIfAfter    |    |      |  | Node B, LAST_CHECKIN_TIME, CHECKIN_INTERVAL|  |
|  | <----------------------+    |      |  |                                            |  |
|  |                             |      |  | ......                                     |  |
|  <--------------------------+  |      |  |                                            |  |
|  timer                         |      |  | Node Z, LAST_CHECKIN_TIME, CHECKIN_INTERVAL|  |
|                                |      |  +--------------------------------------------+  |
+--------------------------------+      +--------------------------------------------------+
```
0x05 Transferring Failed Tasks
Let's talk about recovering jobs from failed instances.
When a Scheduler instance fails while executing a Job, another working Scheduler instance can take over that Job and run it again.
5.1 Requesting Recovery
To get this behavior, the requests-recovery property of the Job, configured on its JobDetail object, must be set to true (job.setRequestsRecovery(true)).
- If the recoverable property is false, then when a Scheduler fails while running the job, the job is not re-run; another Scheduler instance simply releases it, to be executed the next time the related trigger fires.
- Any job whose recovery flag is true will be re-executed by the remaining nodes, which is exactly how failed-task transfer is achieved. A builder-style sketch follows.
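A minimal sketch of marking a job recoverable with the Quartz 2.x builder API; the job class and identifiers are illustrative:

```java
import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;

public class RecoverableJobExample {

    // Placeholder job implementation.
    public static class MyJob implements Job {
        public void execute(JobExecutionContext context) {
            // isRecovering() is true when this run is a failover re-execution
            System.out.println("running, recovering = " + context.isRecovering());
        }
    }

    public static JobDetail build() {
        return JobBuilder.newJob(MyJob.class)
                .withIdentity("importantJob", "recoverableGroup")
                .requestRecovery(true) // re-run on a surviving node after failover
                .build();
    }
}
```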
5.2 Updating the Trigger Status
When the cluster management thread detects a failed node, it updates the trigger states. The constant class org.quartz.impl.jdbcjobstore.Constants defines the possible trigger states.
The rules for updating the failed node's trigger states are as follows:

| State before the update | State after the update |
|---|---|
| BLOCKED | WAITING |
| PAUSED_BLOCKED | PAUSED |
| ACQUIRED | WAITING |
| COMPLETE | None (the trigger is deleted) |
The cluster management thread then deletes the failed node's instance state from the database (the qrtz_scheduler_state table). In effect, all the tasks that the failed node was firing are reset, and these previously failing tasks are afterwards handled by the scheduling thread just like normal tasks.
5.3 Task Recovery
Task recovery is performed using the clusterRecover method.
- Traverse each failed node; for each node:
  - Retrieve the tasks (fired triggers) that the node had already acquired, and iterate over each task:
    - For blocked triggers, release them by modifying their state;
    - For acquired triggers, release them by modifying their state;
    - If the task needs to be recovered, handle it by adding a new trigger:
      - Set the job information;
      - Set its next run time;
      - Insert it into the database;
    - If the task cannot be executed concurrently, change its state accordingly.
The specific code is as follows:
```java
@SuppressWarnings("ConstantConditions")
protected void clusterRecover(Connection conn, List<SchedulerStateRecord> failedInstances)
    throws JobPersistenceException {

    if (failedInstances.size() > 0) {
        long recoverIds = System.currentTimeMillis();
        int acquiredCount = 0;
        int recoveredCount = 0;
        int otherCount = 0;
        try {
            // Iterate over each failed node
            for (SchedulerStateRecord rec : failedInstances) {
                List<FiredTriggerRecord> firedTriggerRecs = getDelegate()
                        .selectInstancesFiredTriggerRecords(conn,
                                rec.getSchedulerInstanceId());

                Set<TriggerKey> triggerKeys = new HashSet<TriggerKey>();

                // Iterate over the tasks the failed node had already acquired
                for (FiredTriggerRecord ftRec : firedTriggerRecs) {
                    TriggerKey tKey = ftRec.getTriggerKey();
                    JobKey jKey = ftRec.getJobKey();
                    triggerKeys.add(tKey);

                    // For blocked triggers, release them by changing their state
                    // release blocked triggers..
                    if (ftRec.getFireInstanceState().equals(STATE_BLOCKED)) {
                        getDelegate()
                                .updateTriggerStatesForJobFromOtherState(
                                        conn, jKey,
                                        STATE_WAITING, STATE_BLOCKED);
                    } else if (ftRec.getFireInstanceState().equals(STATE_PAUSED_BLOCKED)) {
                        getDelegate()
                                .updateTriggerStatesForJobFromOtherState(
                                        conn, jKey,
                                        STATE_PAUSED, STATE_PAUSED_BLOCKED);
                    }

                    // Release acquired triggers, changing their state
                    // release acquired triggers..
                    if (ftRec.getFireInstanceState().equals(STATE_ACQUIRED)) {
                        getDelegate().updateTriggerStateFromOtherState(
                                conn, tKey, STATE_WAITING,
                                STATE_ACQUIRED);
                        acquiredCount++;
                    } else if (ftRec.isJobRequestsRecovery()) {
                        // If the task needs to be recovered, handle it here:
                        // handle jobs marked for recovery that were not fully
                        // executed..
                        if (jobExists(conn, jKey)) {
                            @SuppressWarnings("deprecation")
                            SimpleTriggerImpl rcvryTrig = new SimpleTriggerImpl(
                                    "recover_"
                                            + rec.getSchedulerInstanceId()
                                            + "_"
                                            + String.valueOf(recoverIds++),
                                    Scheduler.DEFAULT_RECOVERY_GROUP,
                                    new Date(ftRec.getScheduleTimestamp()));
                            rcvryTrig.setJobName(jKey.getName());
                            rcvryTrig.setJobGroup(jKey.getGroup());
                            rcvryTrig.setMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY);
                            rcvryTrig.setPriority(ftRec.getPriority());
                            JobDataMap jd = getDelegate().selectTriggerJobDataMap(conn, tKey.getName(), tKey.getGroup());
                            jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_NAME, tKey.getName());
                            jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_GROUP, tKey.getGroup());
                            jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getFireTimestamp()));
                            jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_SCHEDULED_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getScheduleTimestamp()));
                            rcvryTrig.setJobDataMap(jd);

                            rcvryTrig.computeFirstFireTime(null);
                            storeTrigger(conn, rcvryTrig, null, false,
                                    STATE_WAITING, false, true);
                            recoveredCount++;
                        } else {
                            otherCount++;
                        }
                    } else {
                        otherCount++;
                    }

                    // free up stateful job's triggers..
                    // ... (remainder of the loop body elided)
                }
                // ...
            }
            // ...
        } catch (Exception e) {
            throw new JobPersistenceException("Failure recovering jobs: "
                    + e.getMessage(), e);
        }
    }
}
```
The code that computes the recovery trigger's first fire time is as follows:
```java
/**
 * <p>
 * Called by the scheduler at the time a <code>Trigger</code> is first
 * added to the scheduler, in order to have the <code>Trigger</code>
 * compute its first fire time, based on any associated calendar.
 * </p>
 *
 * <p>
 * After this method has been called, <code>getNextFireTime()</code>
 * should return a valid answer.
 * </p>
 *
 * @return the first time at which the <code>Trigger</code> will be fired
 *         by the scheduler, which is also the same value <code>getNextFireTime()</code>
 *         will return (until after the first firing of the <code>Trigger</code>).
 */
@Override
public Date computeFirstFireTime(Calendar calendar) {
    nextFireTime = getStartTime();

    while (nextFireTime != null && calendar != null
            && !calendar.isTimeIncluded(nextFireTime.getTime())) {
        nextFireTime = getFireTimeAfter(nextFireTime);

        if (nextFireTime == null)
            break;

        // avoid infinite loop
        java.util.Calendar c = java.util.Calendar.getInstance();
        c.setTime(nextFireTime);
        if (c.get(java.util.Calendar.YEAR) > YEAR_TO_GIVEUP_SCHEDULING_AT) {
            return null;
        }
    }

    return nextFireTime;
}
```
This concludes our analysis of Quartz's failover.
0xEE Personal Information
★★★★★ Thoughts on life and technology ★★★★★
Wechat official account: Rosie's Thoughts
If you would like timely updates on my articles, or want to see the technical material I recommend, please follow it.