Some time ago, my company wanted to turn its existing single-node scheduling into distributed task scheduling, so I studied the mainstream open source distributed task scheduling frameworks on the market. My impression after trying them: troublesome! That is especially true when many scheduled tasks have already been written in one class, which makes the migration even more painful. I am a lazy person, and having to change a lot of code just to use a tool written by someone else always makes me a little uncomfortable. So I decided to write a framework myself. After all, I feel that task scheduling is one of the simplest problems in distributed systems: the scheduled tasks of a typical company rarely involve much concurrency by themselves, and the main point of going distributed is to spread tasks across multiple nodes so that more of them can be handled at the same time.
One day I went to pick up a package at the company's front desk and noticed the following: several colleagues (including me) each looked through the packages on the desk from beginning to end, took a package if it was theirs, and ignored it if it was not. That gave me the inspiration, because the scene is analogous to a distributed scheduling system. One way to look at it is that the express company or courier has already sorted every package by name and phone number, and we only need to take our own. Another way to look at it is that each of us scans all the packages from beginning to end and applies an agreed rule: if a package is mine I take it, and if it is not I ignore it and move on to the next one. If a package is treated as a task, and a group of people can each collect their own packages from a pile without trouble, can't a group of nodes each pick up and handle their own tasks in the same way?
Traditional distributed task scheduling has a scheduling center, which is itself deployed as a multi-node cluster to avoid a single point of failure, plus a group of executors that run the tasks assigned by the scheduling center. Following the inspiration above, my idea is to drop the centralized scheduling center and let each executor node go directly to a shared place, fetch tasks according to agreed rules, and then execute them. The schematic design is as follows.
Some people may object that the task database is a single point of failure. Let me ask in return: don't other distributed task scheduling frameworks have the same problem? High availability for the database can be handled separately, in the same way as for any business database, and it is not the focus of this article. The focus here is clearly on the execution nodes: how to keep them highly available, how to make sure a single task is not executed by several nodes at the same time, what to do with a task when its node suddenly loses contact in the middle of execution, and so on. Below we tackle these problems with unpolished code (unpolished meaning no structural optimization and a straight-line, laundry-list coding style; many people, me included, get dizzy reading other people's source code, as if lost in a maze, and find it especially laborious, though maybe that is just my own level not being high enough!).
Although the centralized scheduler is gone, something called task scheduling obviously still needs a scheduling process. Otherwise, how would multiple nodes compete for a task without conflicts? My solution is as follows. First, define the states of a task clearly: pending, executing, exception, completed. Each node starts a thread that queries the pending tasks due to run soon and iterates over them. For each task, it uses an optimistic lock to update the version number (version + 1) and the status (to executing). If the update succeeds, the task is put into the node's delay queue to wait for execution. Since the threads of all nodes query the database for pending tasks, a task that has already become executing will not be returned to other nodes on their next query. As for pending tasks that another node read just before this node updated the status, the other node's optimistic-lock update will fail, so it skips the task. This prevents a task from being executed by multiple nodes at the same time. The key code is as follows:
package com.rdpaas.task.scheduler;

import com.rdpaas.task.common.*;
import com.rdpaas.task.config.EasyJobConfig;
import com.rdpaas.task.repository.NodeRepository;
import com.rdpaas.task.repository.TaskRepository;
import com.rdpaas.task.strategy.Strategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;

/**
 * @author rongdi
 * @date 2019-03-13 21:15
 */
@Component
public class TaskExecutor {

    private static final Logger logger = LoggerFactory.getLogger(TaskExecutor.class);

    @Autowired
    private TaskRepository taskRepository;

    @Autowired
    private NodeRepository nodeRepository;

    @Autowired
    private EasyJobConfig config;

    /** Delay queue holding the tasks this node has claimed */
    private DelayQueue<DelayItem<Task>> taskQueue = new DelayQueue<>();

    /** At most two threads ever run here, so the built-in factory is good enough */
    private ExecutorService bossPool = Executors.newFixedThreadPool(2);

    /** Worker thread pool that actually executes the tasks */
    private ThreadPoolExecutor workerPool;

    @PostConstruct
    public void init() {
        /**
         * Custom thread pool: corePoolSize initial threads and a waiting queue of queueSize.
         * When all core threads are busy and the queue is full, the pool grows up to maxSize
         * threads; the extra threads are reclaimed after being idle for 60 seconds.
         * It is built by hand rather than through Executors.
         */
        workerPool = new ThreadPoolExecutor(config.getCorePoolSize(), config.getMaxPoolSize(),
                60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(config.getQueueSize()));
        /** Start the thread that loads pending tasks */
        bossPool.execute(new Loader());
        /** Start the thread that dispatches tasks */
        bossPool.execute(new Boss());
    }

    class Loader implements Runnable {
        @Override
        public void run() {
            for (;;) {
                try {
                    /** Query the main tasks that are due within the given number of seconds */
                    List<Task> tasks = taskRepository.listPeddingTasks(config.getFetchDuration());
                    if (tasks == null || tasks.isEmpty()) {
                        // Sleep before retrying so that an empty result does not spin on the database
                        Thread.sleep(config.getFetchPeriod());
                        continue;
                    }
                    for (Task task : tasks) {
                        task.setStatus(TaskStatus.DOING);
                        task.setNodeId(config.getNodeId());
                        /**
                         * Try to update the status with the optimistic lock. If the update succeeds,
                         * no other node can succeed. If another node updated the task between our
                         * query and now, the version no longer matches and the update returns 0.
                         */
                        int n = taskRepository.updateWithVersion(task);
                        Date nextStartTime = task.getNextStartTime();
                        if (n == 0 || nextStartTime == null) {
                            continue;
                        }
                        /** Reload the task after the successful update */
                        task = taskRepository.get(task.getId());
                        DelayItem<Task> delayItem = new DelayItem<Task>(nextStartTime.getTime() - new Date().getTime(), task);
                        taskQueue.offer(delayItem);
                    }
                    Thread.sleep(config.getFetchPeriod());
                } catch (Exception e) {
                    logger.error("fetch task list failed,cause by:{}", e);
                }
            }
        }
    }

    class Boss implements Runnable {
        @Override
        public void run() {
            for (;;) {
                try {
                    /** Blocks until the delay of the next task has expired */
                    DelayItem<Task> item = taskQueue.take();
                    if (item != null && item.getItem() != null) {
                        Task task = item.getItem();
                        workerPool.execute(new Worker(task));
                    }
                } catch (Exception e) {
                    logger.error("fetch task failed,cause by:{}", e);
                }
            }
        }
    }

    class Worker implements Runnable {

        private Task task;

        public Worker(Task task) {
            this.task = task;
        }

        @Override
        public void run() {
            logger.info("Begin to execute task:{}", task.getId());
            TaskDetail detail = null;
            try {
                // Record the start of the task
                detail = taskRepository.start(task);
                if (detail == null) return;
                // Invoke the task
                task.getInvokor().invoke();
                // Finish the task
                finish(task, detail);
                logger.info("finished execute task:{}", task.getId());
            } catch (Exception e) {
                logger.error("execute task:{} error,cause by:{}", task.getId(), e);
                try {
                    taskRepository.fail(task, detail, e.getCause().getMessage());
                } catch (Exception e1) {
                    logger.error("fail task:{} error,cause by:{}", task.getId(), e1);
                }
            }
        }
    }

    /**
     * Finish a task and run its child tasks. If the parent task fails, the child tasks do not execute.
     * @param task
     * @param detail
     * @throws Exception
     */
    private void finish(Task task, TaskDetail detail) throws Exception {
        // Look up the child tasks
        List<Task> childTasks = taskRepository.getChilds(task.getId());
        if (childTasks == null || childTasks.isEmpty()) {
            // No child tasks: just finish the parent task
            taskRepository.finish(task, detail);
            return;
        } else {
            for (Task childTask : childTasks) {
                TaskDetail childDetail = null;
                try {
                    // Change the state of the child task to executing
                    childTask.setStatus(TaskStatus.DOING);
                    childTask.setNodeId(config.getNodeId());
                    // Start the child task
                    childDetail = taskRepository.startChild(childTask, detail);
                    // Update the status with the optimistic lock, otherwise this may conflict with the recovery thread
                    int n = taskRepository.updateWithVersion(childTask);
                    if (n > 0) {
                        childTask = taskRepository.get(childTask.getId());
                        // Invoke the child task
                        childTask.getInvokor().invoke();
                        // Finish the child task (and its own children)
                        finish(childTask, childDetail);
                    }
                } catch (Exception e) {
                    logger.error("execute child task error,cause by:{}", e);
                    try {
                        taskRepository.fail(childTask, childDetail, e.getCause().getMessage());
                    } catch (Exception e1) {
                        logger.error("fail child task error,cause by:{}", e1);
                    }
                }
            }
            /** Finish the parent task */
            taskRepository.finish(task, detail);
        }
    }
}
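The claim step in the Loader depends on updateWithVersion behaving like a compare-and-set on the version column. The repository code is not shown in this article, so the following is only a minimal in-memory sketch of that idea. TaskRow, InMemoryTaskTable and this updateWithVersion signature are hypothetical stand-ins for the real table and SQL, which would presumably be an UPDATE that bumps the version and filters on the version that was read.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical snapshot of a task row as one node read it. */
class TaskRow {
    final long id;
    final String status;
    final int version;

    TaskRow(long id, String status, int version) {
        this.id = id;
        this.status = status;
        this.version = version;
    }
}

/** Hypothetical in-memory stand-in for the task table. */
class InMemoryTaskTable {
    private final Map<Long, TaskRow> rows = new HashMap<>();

    synchronized void insert(TaskRow row) {
        rows.put(row.id, row);
    }

    synchronized TaskRow get(long id) {
        return rows.get(id);
    }

    /**
     * Mimics an UPDATE that sets the status and version + 1 only where the
     * stored version still equals the version the caller read.
     * Returns the number of rows updated: 1 on success, 0 if the snapshot is stale.
     */
    synchronized int updateWithVersion(TaskRow seen, String newStatus) {
        TaskRow current = rows.get(seen.id);
        if (current == null || current.version != seen.version) {
            return 0; // another node updated the row first
        }
        rows.put(seen.id, new TaskRow(seen.id, newStatus, seen.version + 1));
        return 1;
    }
}

class OptimisticClaimDemo {
    public static void main(String[] args) {
        InMemoryTaskTable table = new InMemoryTaskTable();
        table.insert(new TaskRow(1L, "PENDING", 0));

        // Two nodes read the same pending row before either one updates it.
        TaskRow seenByNodeA = table.get(1L);
        TaskRow seenByNodeB = table.get(1L);

        System.out.println("node A updated rows: " + table.updateWithVersion(seenByNodeA, "DOING"));
        System.out.println("node B updated rows: " + table.updateWithVersion(seenByNodeB, "DOING"));
    }
}
```

In this sketch node A updates one row and node B updates none, because B's snapshot still carries the old version. That zero is what the Loader treats as "someone else got this task" before it moves on.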
With the above, a task is guaranteed to be claimed by only one node at a time. If multiple nodes are deployed now, the tasks in the task table should all be executed smoothly, just as a group of people picking up packages at the front desk can clear the whole pile without trouble: every package is either yours or someone else's, and your own package never belongs to anyone else. However, scheduling here differs from picking up packages in one respect. Everyone picking up a package knows how to tell which one is theirs. Scheduling so far has no such concept, and a task simply goes to whichever node is lucky enough to win the optimistic-lock update. In other words, what is missing is an agreed rule, like reading the name and phone number on a package to know whether it is yours. We can likewise define a rule that says which tasks may be executed by which nodes, which avoids unnecessary lock contention. Load-balancing strategies can be borrowed here, and for now I want to implement the following rules:
1) id_hash: take the task's auto-increment ID modulo the number of nodes and compare the result with the real-time sequence number of the current node. If they match, the node may take the task and execute it.
2) least_count: the node that has executed the fewest tasks takes the task first.
3) weight: nodes take tasks in proportion to their weight.
4) default: first come, first served, with no other rules.
These rules amount to load-balancing strategies for tasks. Apart from the default rule, each of them needs node information, such as a node's execution count, sequence number and weight. So we add a node heartbeat: once per heartbeat period, every node reports its own information to the database. The core heartbeat code is as follows:
/** Delay queue that drives the heartbeat */
private DelayQueue<DelayItem<Node>> heartBeatQueue = new DelayQueue<>();

/** At most two threads ever run here, so the built-in factory is good enough */
private ExecutorService bossPool = Executors.newFixedThreadPool(2);

@PostConstruct
public void init() {
    /** Only start if both the recovery switch and the heartbeat switch are on */
    if (config.isRecoverEnable() && config.isHeartBeatEnable()) {
        /** Seed the heartbeat queue with this node at a delay of 0 so the first heartbeat fires immediately */
        heartBeatQueue.offer(new DelayItem<>(0, new Node(config.getNodeId())));
        /** Start the heartbeat thread */
        bossPool.execute(new HeartBeat());
        /** Start the exception recovery thread */
        bossPool.execute(new Recover());
    }
}

class HeartBeat implements Runnable {
    @Override
    public void run() {
        for (;;) {
            try {
                /**
                 * When the delay expires, the node object can be taken from the delay queue and its
                 * time and sequence number updated. Finally a new node object, delayed by one
                 * heartbeat period, is put back into the delay queue.
                 */
                DelayItem<Node> item = heartBeatQueue.take();
                if (item != null && item.getItem() != null) {
                    Node node = item.getItem();
                    handHeartBeat(node);
                }
                heartBeatQueue.offer(new DelayItem<>(config.getHeartBeatSeconds() * 1000, new Node(config.getNodeId())));
            } catch (Exception e) {
                logger.error("task heart beat error,cause by:{} ", e);
            }
        }
    }
}

/**
 * Handle the heartbeat of a node
 * @param node
 */
private void handHeartBeat(Node node) {
    if (node == null) {
        return;
    }
    /**
     * First check whether the node exists in the database.
     * If not: fetch the next sequence number, set it on the node object, then insert.
     * If so: update the sequence number and time of the current node by nodeId.
     */
    Node currNode = nodeRepository.getByNodeId(node.getNodeId());
    if (currNode == null) {
        node.setRownum(nodeRepository.getNextRownum());
        nodeRepository.insert(node);
    } else {
        nodeRepository.updateHeartBeat(node.getNodeId());
    }
}
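Both the task queue and the heartbeat queue rely on a DelayItem wrapper whose source is not shown in this article. As a rough idea of what such a class needs, here is a minimal sketch built on the JDK's Delayed interface. The constructor shape (delay in milliseconds, then payload) is inferred from how DelayItem is called above; everything else is an assumption and the real class may differ.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** Hypothetical DelayItem: a payload that becomes available after delayMillis. */
class DelayItem<T> implements Delayed {
    private final long readyAtMillis;
    private final T item;

    DelayItem(long delayMillis, T item) {
        this.readyAtMillis = System.currentTimeMillis() + delayMillis;
        this.item = item;
    }

    T getItem() {
        return item;
    }

    /** Remaining delay; zero or negative means the item may be taken. */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(readyAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    /** Orders items so that the one due soonest sits at the head of the queue. */
    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }
}

class DelayItemDemo {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayItem<String>> queue = new DelayQueue<>();
        queue.offer(new DelayItem<>(200, "later"));
        queue.offer(new DelayItem<>(0, "now"));

        // take() blocks until the head's delay has expired, so "now" is
        // returned first and "later" roughly 200 ms afterwards.
        System.out.println(queue.take().getItem());
        System.out.println(queue.take().getItem());
    }
}
```

A negative delay (a task whose start time has already passed) simply makes the item available immediately, which matches how the Loader computes the delay as nextStartTime minus the current time.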
With node information in the database, we can implement all kinds of fancy task-assignment strategies. The code is as follows:
/**
 * @author rongdi
 * @date 2019-03-16 12:36
 */
public interface Strategy {

    /** Default strategy */
    String DEFAULT = "default";

    /** Match the task ID hash against the node sequence number */
    String ID_HASH = "id_hash";

    /** Least number of executions */
    String LEAST_COUNT = "least_count";

    /** By weight */
    String WEIGHT = "weight";

    public static Strategy choose(String key) {
        switch (key) {
            case ID_HASH:
                return new IdHashStrategy();
            case LEAST_COUNT:
                return new LeastCountStrategy();
            case WEIGHT:
                return new WeightStrategy();
            default:
                return new DefaultStrategy();
        }
    }

    public boolean accept(List<Node> nodes, Task task, Long myNodeId);
}
/**
 * Takes the task ID modulo the number of valid nodes; the remainder + 1 is then matched
 * against the sequence number of each node. This is effectively round-robin.
 * @author rongdi
 * @date 2019-03-16
 */
public class IdHashStrategy implements Strategy {

    /**
     * The list of nodes here must not be empty.
     */
    @Override
    public boolean accept(List<Node> nodes, Task task, Long myNodeId) {
        int size = nodes.size();
        long taskId = task.getId();
        /** Find this node; orElse(null) keeps the null check below meaningful, and equals avoids comparing boxed IDs by reference */
        Node myNode = nodes.stream().filter(node -> myNodeId.equals(node.getNodeId())).findFirst().orElse(null);
        return myNode == null ? false : (taskId % size) + 1 == myNode.getRownum();
    }
}
/**
 * Least-executions strategy: each time a task comes up, check whether this node is the one
 * that has processed the fewest tasks.
 * @author rongdi
 * @date 2019-03-16 21:56
 */
public class LeastCountStrategy implements Strategy {

    @Override
    public boolean accept(List<Node> nodes, Task task, Long myNodeId) {
        /** Find the node with the fewest executions; return true if it is this node */
        Optional<Node> min = nodes.stream().min((o1, o2) -> o1.getCounts().compareTo(o2.getCounts()));
        return min.isPresent() ? myNodeId.equals(min.get().getNodeId()) : false;
    }
}
/**
 * Weight-based allocation. The scheme is as follows, for example:
 * node sequence numbers  1,   2,     3,     4
 * node weights           2,   3,     3,     2
 * remainders taken       0,1 | 2,3,4 | 5,6,7 | 8,9
 * The task ID is taken modulo the sum of all weights (here 10).
 * Node 1 consumes remainders less than 2
 * Node 2 consumes remainders greater than or equal to 2 and less than 2+3
 * Node 3 consumes remainders greater than or equal to 2+3 and less than 2+3+3
 * Node 4 consumes remainders greater than or equal to 2+3+3 and less than 2+3+3+2
 * In general, a node consumes the remainders that are greater than or equal to the sum of the
 * weights of the nodes before it and less than that sum plus its own weight.
 * I don't know whether some expert out there has a better algorithm.
 * @author rongdi
 * @date 2019-03-16 23:16
 */
public class WeightStrategy implements Strategy {

    @Override
    public boolean accept(List<Node> nodes, Task task, Long myNodeId) {
        Node myNode = nodes.stream().filter(node -> myNodeId.equals(node.getNodeId())).findFirst().orElse(null);
        if (myNode == null) {
            return false;
        }
        /** Sum of the weights of the nodes that come before this node */
        int preWeightSum = nodes.stream().filter(node -> node.getRownum() < myNode.getRownum()).collect(Collectors.summingInt(Node::getWeight));
        /** Sum of the weights of all nodes */
        int weightSum = nodes.stream().collect(Collectors.summingInt(Node::getWeight));
        /** Task ID modulo the total weight */
        int remainder = (int) (task.getId() % weightSum);
        return remainder >= preWeightSum && remainder < preWeightSum + myNode.getWeight();
    }
}
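To make the weight ranges concrete, the following standalone sketch replays the example from the comment above (weights 2, 3, 3, 2) without any of the framework classes. ownerRownum is a hypothetical helper introduced only for this illustration. It returns the 1-based sequence number of the node whose range contains the remainder, using the same comparison as WeightStrategy.accept.

```java
class WeightRangeDemo {

    /**
     * Returns the 1-based sequence number of the node whose weight range
     * contains taskId % sum(weights). weights[i] belongs to node i + 1.
     * Assumes a non-negative taskId and strictly positive weights.
     */
    static int ownerRownum(int[] weights, long taskId) {
        int weightSum = 0;
        for (int w : weights) {
            weightSum += w;
        }
        int remainder = (int) (taskId % weightSum);
        int preWeightSum = 0; // sum of the weights of the nodes before node i + 1
        for (int i = 0; i < weights.length; i++) {
            if (remainder >= preWeightSum && remainder < preWeightSum + weights[i]) {
                return i + 1;
            }
            preWeightSum += weights[i];
        }
        throw new IllegalStateException("no range matched; check the assumptions above");
    }

    public static void main(String[] args) {
        int[] weights = {2, 3, 3, 2};
        for (long taskId = 0; taskId < 10; taskId++) {
            System.out.println("task " + taskId + " -> node " + ownerRownum(weights, taskId));
        }
    }
}
```

Over ten consecutive task IDs, nodes 1 to 4 receive 2, 3, 3 and 2 tasks respectively, so the share of each node follows its weight as long as task IDs are spread roughly evenly.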
Then we modify the scheduling class accordingly:
/** Strategy used by this node to decide which tasks to take */
private Strategy strategy;

@PostConstruct
public void init() {
    /** Choose the node strategy based on the configuration */
    strategy = Strategy.choose(config.getNodeStrategy());
    /**
     * Custom thread pool: corePoolSize initial threads and a waiting queue of queueSize.
     * When all core threads are busy and the queue is full, the pool grows up to maxSize
     * threads; the extra threads are reclaimed after being idle for 60 seconds.
     * It is built by hand rather than through Executors.
     */
    workerPool = new ThreadPoolExecutor(config.getCorePoolSize(), config.getMaxPoolSize(),
            60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(config.getQueueSize()));
    /** Start the thread that loads pending tasks */
    bossPool.execute(new Loader());
    /** Start the thread that dispatches tasks */
    bossPool.execute(new Boss());
}

class Loader implements Runnable {
    @Override
    public void run() {
        for (;;) {
            try {
                /** Get the list of available nodes */
                List<Node> nodes = nodeRepository.getEnableNodes(config.getHeartBeatSeconds() * 2);
                if (nodes == null || nodes.isEmpty()) {
                    // Sleep before retrying so that an empty result does not spin on the database
                    Thread.sleep(config.getFetchPeriod());
                    continue;
                }
                /** Query the main tasks that are due within the given number of seconds */
                List<Task> tasks = taskRepository.listPeddingTasks(config.getFetchDuration());
                if (tasks == null || tasks.isEmpty()) {
                    Thread.sleep(config.getFetchPeriod());
                    continue;
                }
                for (Task task : tasks) {
                    boolean accept = strategy.accept(nodes, task, config.getNodeId());
                    /** Skip tasks that the strategy does not assign to this node */
                    if (!accept) {
                        continue;
                    }
                    task.setStatus(TaskStatus.DOING);
                    task.setNodeId(config.getNodeId());
                    /**
                     * Try to update the status with the optimistic lock. If the update succeeds,
                     * no other node can succeed. If another node updated the task between our
                     * query and now, the version no longer matches and the update returns 0.
                     */
                    int n = taskRepository.updateWithVersion(task);
                    Date nextStartTime = task.getNextStartTime();
                    if (n == 0 || nextStartTime == null) {
                        continue;
                    }
                    /** Reload the task after the successful update */
                    task = taskRepository.get(task.getId());
                    DelayItem<Task> delayItem = new DelayItem<Task>(nextStartTime.getTime() - new Date().getTime(), task);
                    taskQueue.offer(delayItem);
                }
                Thread.sleep(config.getFetchPeriod());
            } catch (Exception e) {
                logger.error("fetch task list failed,cause by:{}", e);
            }
        }
    }
}
With the above, various fancy load strategies can balance the tasks acquired by each node and significantly reduce the competition between nodes for the same task. However, there is another problem. Suppose a node claims a task and updates it to executing, and then, without any exception being raised during execution, the node suddenly hangs for whatever reason. That task will never be executed again. This is the proverbial dog in the manger. The fix is the exception-recovery thread commonly used in eventually consistent systems. In this scenario we only need to find the unfinished tasks of nodes whose heartbeat has timed out (by default, after three heartbeat periods), restore the status of those tasks to pending, and set their next execution time to the current time. The core code is as follows:
class Recover implements Runnable {
    @Override
    public void run() {
        for (;;) {
            try {
                /**
                 * Find the tasks that need recovery. A task needs recovery if it is not completed and
                 * the heartbeat of its executing node has not been updated for more than three
                 * heartbeat periods. Since the node hung before finishing these tasks, it is enough
                 * to change their status back to pending and set the next execution time to now,
                 * so that they are scheduled again.
                 */
                List<Task> tasks = taskRepository.listRecoverTasks(config.getHeartBeatSeconds() * 3);
                if (tasks == null || tasks.isEmpty()) {
                    // Nothing to recover: wait for the next round (returning here would end this thread for good)
                    Thread.sleep(config.getRecoverSeconds() * 1000);
                    continue;
                }
                /** Get the list of available nodes */
                List<Node> nodes = nodeRepository.getEnableNodes(config.getHeartBeatSeconds() * 2);
                if (nodes == null || nodes.isEmpty()) {
                    Thread.sleep(config.getRecoverSeconds() * 1000);
                    continue;
                }
                long maxNodeId = nodes.get(nodes.size() - 1).getNodeId();
                for (Task task : tasks) {
                    /**
                     * Every node runs a recovery thread. To avoid unnecessary competition, pick from
                     * the available nodes the one that follows the node the task belonged to.
                     */
                    long currNodeId = chooseNodeId(nodes, maxNodeId, task.getNodeId());
                    long myNodeId = config.getNodeId();
                    /** Only the chosen node recovers the task */
                    if (currNodeId != myNodeId) {
                        continue;
                    }
                    /** Change the task status back to PENDING and assign it to the current node */
                    task.setStatus(TaskStatus.PENDING);
                    task.setNextStartTime(new Date());
                    task.setNodeId(config.getNodeId());
                    taskRepository.updateWithVersion(task);
                }
                Thread.sleep(config.getRecoverSeconds() * 1000);
            } catch (Exception e) {
                logger.error("Get next task failed,cause by:{}", e);
            }
        }
    }
}

/**
 * Select the next node
 * @param nodes
 * @param maxNodeId
 * @param nodeId
 * @return
 */
private long chooseNodeId(List<Node> nodes, long maxNodeId, long nodeId) {
    if (nodes.size() == 0 || nodeId >= maxNodeId) {
        return nodes.get(0).getNodeId();
    }
    return nodes.stream().filter(node -> node.getNodeId() > nodeId).findFirst().get().getNodeId();
}
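The selection done by chooseNodeId can be tried out in isolation. The sketch below works on plain node IDs instead of Node objects and assumes that the list of live node IDs is non-empty and sorted in ascending order, which is what the maxNodeId lookup above also relies on. RecoverOwnerDemo and its simplified two-argument signature are hypothetical and exist only for this illustration.

```java
import java.util.Arrays;
import java.util.List;

class RecoverOwnerDemo {

    /**
     * Picks the node that should recover a task left behind by deadNodeId:
     * the first live node with a larger ID, wrapping around to the smallest
     * live ID when there is none. Assumes liveNodeIds is non-empty and
     * sorted in ascending order.
     */
    static long chooseNodeId(List<Long> liveNodeIds, long deadNodeId) {
        for (long id : liveNodeIds) {
            if (id > deadNodeId) {
                return id;
            }
        }
        return liveNodeIds.get(0); // wrap around to the first live node
    }

    public static void main(String[] args) {
        List<Long> live = Arrays.asList(1L, 5L, 8L);
        System.out.println("tasks of dead node 3 go to node " + chooseNodeId(live, 3L));
        System.out.println("tasks of dead node 6 go to node " + chooseNodeId(live, 6L));
        System.out.println("tasks of dead node 9 go to node " + chooseNodeId(live, 9L));
    }
}
```

Because every live node computes the same answer from the same list, only one of them attempts the update for a given task, and the optimistic lock still covers the short window in which two nodes see different lists of live nodes.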
To avoid unnecessary competition between the recovery threads of different nodes for the same task, each failed task can only be recovered by the next healthy node after the node ID recorded on the task. This ensures that even if a node hangs before finishing a task, the task is automatically recovered after a while. All in all, leaving optimization aside, the above should already be a decent task scheduling framework. If you thought that was the end of it, sorry, haha! I'm used to writing many scheduled tasks in one class with @Scheduled annotations and don't want to give that habit up, so the framework also has to support usage like this:
/**
 * Test scheduling with the custom @Scheduled annotation
 */
@Component
public class SchedulerTest {

    @Scheduled(cron = "0/10 * * * * ?")
    public void test1() throws InterruptedException {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Thread.sleep(2000);
        System.out.println("Current time 1: " + sdf.format(new Date()));
    }

    @Scheduled(cron = "0/20 * * * * ?", parent = "test1")
    public void test2() throws InterruptedException {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Thread.sleep(2000);
        System.out.println("Current time 2: " + sdf.format(new Date()));
    }

    @Scheduled(cron = "0/10 * * * * ?", parent = "test2")
    public void test3() throws InterruptedException {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Thread.sleep(2000);
        System.out.println("Current time 3: " + sdf.format(new Date()));
    }

    @Scheduled(cron = "0/10 * * * * ?", parent = "test3")
    public void test4() throws InterruptedException {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Thread.sleep(2000);
        System.out.println("Current time 4: " + sdf.format(new Date()));
    }
}
To make this work, we also need to load our custom annotation (which has the same name as Spring's) after Spring starts, as shown below:
/**
 * After the Spring container has started, load the custom scheduling annotations
 * @author rongdi
 * @date 2019-03-15 21:07
 */
@Component
public class ContextRefreshedListener implements ApplicationListener<ContextRefreshedEvent> {

    @Autowired
    private TaskExecutor taskExecutor;

    /** Maps the full name of a method to the ID of the task created for it, so child tasks can find their parent */
    private Map<String,Long> taskIdMap = new HashMap<>();

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        /** Only handle the root context, to prevent loading twice (MVC triggers the event once more) */
        if (event.getApplicationContext().getParent() == null) {
            /**
             * Check whether the scheduling switch is on.
             * If it is: load the scheduling annotations and add the tasks to scheduling management.
             */
            ApplicationContext context = event.getApplicationContext();
            Map<String,Object> beans = context.getBeansWithAnnotation(org.springframework.scheduling.annotation.EnableScheduling.class);
            if (beans == null || beans.isEmpty()) {
                return;
            }
            /** Holds every method carrying the custom @Scheduled annotation, keyed by its full name */
            Map<String,Method> methodMap = new HashMap<>();
            /**
             * Find all beans that are directly or indirectly annotated with @Component, since
             * @Service, @Controller and so on all include @Component.
             */
            Map<String,Object> allBeans = context.getBeansWithAnnotation(org.springframework.stereotype.Component.class);
            Set<Map.Entry<String,Object>> entrys = allBeans.entrySet();
            /** Go through the beans and collect the annotated methods */
            for (Map.Entry<String,Object> entry : entrys) {
                Object obj = entry.getValue();
                Class clazz = obj.getClass();
                Method[] methods = clazz.getMethods();
                for (Method m : methods) {
                    if (m.isAnnotationPresent(Scheduled.class)) {
                        methodMap.put(clazz.getName() + Delimiters.DOT + m.getName(), m);
                    }
                }
            }
            /** Handle the annotations */
            handleSheduledAnn(methodMap);
            /** The temporary map is no longer needed once the tasks are registered */
            taskIdMap.clear();
        }
    }

    /**
     * Go through the collected methods and add each one to task scheduling
     * @param methodMap
     */
    private void handleSheduledAnn(Map<String,Method> methodMap) {
        if (methodMap == null || methodMap.isEmpty()) {
            return;
        }
        Set<Map.Entry<String,Method>> entrys = methodMap.entrySet();
        /** Go through the methods that carry the Scheduled annotation and place each task in the schedule */
        for (Map.Entry<String,Method> entry : entrys) {
            Method m = entry.getValue();
            try {
                handleSheduledAnn(methodMap, m);
            } catch (Exception e) {
                e.printStackTrace();
                continue;
            }
        }
    }

    /**
     * Add parent and child tasks recursively
     * @param methodMap
     * @param m
     * @throws Exception
     */
    private void handleSheduledAnn(Map<String,Method> methodMap, Method m) throws Exception {
        Class<?> clazz = m.getDeclaringClass();
        String name = m.getName();
        Scheduled sAnn = m.getAnnotation(Scheduled.class);
        String cron = sAnn.cron();
        String parent = sAnn.parent();
        /**
         * If parent is empty, this method is a root task: add it to the task scheduler and save its
         * ID in the global map, because child tasks need to know the ID of their parent task.
         * Otherwise, look up the parent task ID in taskIdMap by the full name of the parent method.
         * If the parent has not been added yet, add it first, then add the child task.
         */
        if (StringUtils.isEmpty(parent)) {
            if (!taskIdMap.containsKey(clazz.getName() + Delimiters.DOT + name)) {
                Long taskId = taskExecutor.addTask(name, cron, new Invocation(clazz, name, new Class[]{}, new Object[]{}));
                taskIdMap.put(clazz.getName() + Delimiters.DOT + name, taskId);
            }
        } else {
            String parentMethodName = parent.lastIndexOf(Delimiters.DOT) == -1 ? clazz.getName() + Delimiters.DOT + parent : parent;
            Long parentTaskId = taskIdMap.get(parentMethodName);
            if (parentTaskId == null) {
                Method parentMethod = methodMap.get(parentMethodName);
                handleSheduledAnn(methodMap, parentMethod);
                /** The recursive call has registered the parent, so its ID can be read now */
                parentTaskId = taskIdMap.get(parentMethodName);
            }
            if (parentTaskId != null && !taskIdMap.containsKey(clazz.getName() + Delimiters.DOT + name)) {
                Long taskId = taskExecutor.addChildTask(parentTaskId, name, cron, new Invocation(clazz, name, new Class[]{}, new Object[]{}));
                taskIdMap.put(clazz.getName() + Delimiters.DOT + name, taskId);
            }
        }
    }
}
The code above loads our custom task scheduling annotations once Spring has finished initializing, and it is also controlled by Spring's scheduling switch @EnableScheduling. The framework therefore integrates seamlessly into Spring or Spring Boot, which satisfies lazy people like me.
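The listener depends on a custom Scheduled annotation that exposes cron() and parent(), but the annotation itself is not listed in this article. A minimal sketch of what such an annotation and the reflective lookup might look like is given below. The annotation body, DemoJobs and AnnotationScanDemo are assumptions made for illustration; only the two attribute names come from the code above.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical custom annotation; RUNTIME retention makes it visible to reflection. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Scheduled {
    String cron();

    /** Name of the parent task method; empty means this is a root task. */
    String parent() default "";
}

class DemoJobs {
    @Scheduled(cron = "0/10 * * * * ?")
    public void test1() { }

    @Scheduled(cron = "0/20 * * * * ?", parent = "test1")
    public void test2() { }

    public void notScheduled() { }
}

class AnnotationScanDemo {

    /** Describes every public method of clazz that carries @Scheduled, sorted by method name. */
    static List<String> describe(Class<?> clazz) {
        List<String> lines = new ArrayList<>();
        for (Method m : clazz.getMethods()) {
            Scheduled ann = m.getAnnotation(Scheduled.class);
            if (ann == null) {
                continue; // not a scheduled method
            }
            String role = ann.parent().isEmpty() ? "root task" : "child of " + ann.parent();
            lines.add(m.getName() + " [" + ann.cron() + "] " + role);
        }
        Collections.sort(lines); // getMethods() returns methods in no particular order
        return lines;
    }

    public static void main(String[] args) {
        for (String line : describe(DemoJobs.class)) {
            System.out.println(line);
        }
    }
}
```

Giving parent an empty default keeps root tasks free of extra attributes, and it is the empty-string case that the listener checks with StringUtils.isEmpty.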
I actually wrote this framework in about five days of spare time, so there are probably some hidden pitfalls, although the obvious ones I have already dealt with. I'm open-sourcing it not only to go with this article but also to offer fellow ordinary programmers a new line of thinking. I hope it is useful, and I also hope you will help find more bugs so that we can improve it together. Experts, please just ignore it. The writing is not great, mainly because I haven't written an essay in a long time, so please bear with me. For the detailed laundry-list source code with lengthy Chinese comments, see: github.com/rongdi/easy…