I. Function description

SpringBoot Scheduled task enhancement tool to implement dynamic management of SpringBoot native Scheduled tasks. Fully compatible with native @scheduled annotations, you do not need to modify the original Scheduled tasks

Two, quick use

Specific functions have been packaged as Springboot-starter plug and play

<dependency>
    <groupId>com.github.guoyixing</groupId>
    <artifactId>spring-boot-starter-super-scheduled</artifactId>
    <version>0.3.1</version>
</dependency>
Copy the code

Use method and source code:

Yards cloud: gitee.com/qiaodaimade…

Github:github.com/guoyixing/s…

Three, the implementation principle

1. Dynamic management

(1) Configuration management introduction

@Component("superScheduledConfig")
public class SuperScheduledConfig {
    /** * The thread pool that executes scheduled tasks */
    private ThreadPoolTaskScheduler taskScheduler;

    /** * The container associated with the scheduled task name and the scheduled task callback hook */
    private Map<String, ScheduledFuture> nameToScheduledFuture = new ConcurrentHashMap<>();

    /** * The relationship between the scheduled task name and the logic to be executed by the scheduled task container */
    private Map<String, Runnable> nameToRunnable = new ConcurrentHashMap<>();

    /** * Indicates the association between the scheduled task name and the source information of the scheduled task
    private Map<String, ScheduledSource> nameToScheduledSource = new ConcurrentHashMap<>();
	/* Common get/sets omit */
}
Copy the code

(2) Use the post-processor to intercept SpringBoot’s original scheduled task

1, Implement the ApplicationContextAware interface to get the SpringBoot context. 2, implement the BeanPostProcessor interface to mark this class as a post-processor. After each bean is instantiated, the post-processor implements a @dependson annotation to enforce a SuperScheduledConfig class. Let SpringBoot instantiation SuperScheduledPostProcessor class instantiation SuperScheduledConfig before class 4, main implementation logic in postProcessAfterInitialization () method

@DependsOn({"superScheduledConfig"})
@Component
@Order
public class SuperScheduledPostProcessor implements BeanPostProcessor.ApplicationContextAware {
    protected final Log logger = LogFactory.getLog(getClass());

    private ApplicationContext applicationContext;

    /** * The operation before instantiating the bean *@paramBean Bean instance *@paramBeanName beanName */
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    /** * The operation after instantiating the bean *@paramBean Bean instance *@paramBeanName beanName */
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        1. Obtain the configuration manager
        SuperScheduledConfig superScheduledConfig = applicationContext.getBean(SuperScheduledConfig.class);

        //2. Get all the methods of the currently instantiated bean
        Method[] methods = bean.getClass().getDeclaredMethods();
        // Process each method one by one
        if (methods.length > 0) {
            for (Method method : methods) {
            	//3. Try to get @scheduled annotations on this method (SpringBoot timed task annotations)
                Scheduled annotation = method.getAnnotation(Scheduled.class);
                // Skip this method if you can't get the @scheduled annotation
                if (annotation == null) {
                    continue;
                }
                //4. Create a source attribute for the scheduled task
                // Create the source attribute of the scheduled task (used to record the configuration of the scheduled task, initialized with the original attribute in the annotation)
                ScheduledSource scheduledSource = new ScheduledSource(annotation, method, bean);
                // Check the attributes in the source attributes obtained on the annotation
                if(! scheduledSource.check()) {throw new SuperScheduledException("In" + beanName + "Bean" + method.getName() + "Method annotation parameter error");
                }
                // To generate the name (ID) of the scheduled task, run beanName+ ". + method name
                String name = beanName + "." + method.getName();
                // The source data will be saved to the configuration manager in the form of key-value. Key: indicates the name of the scheduled task. Value: indicates the source data
                superScheduledConfig.addScheduledSource(name, scheduledSource);
                try {
                	//5. Cancel the scheduled SpringBoot task
                    clearOriginalScheduled(annotation);
                } catch (Exception e) {
                    throw new SuperScheduledException("Closing the original method" + beanName + method.getName() + "Time error"); }}}// Finally, the bean returns unchanged
        return bean;
    }

    /** * Modifies the original attribute of the annotation *@paramAnnotation Instance object *@throws Exception
     */
    private void clearOriginalScheduled(Scheduled annotation) throws Exception {
        changeAnnotationValue(annotation, "cron", Scheduled.CRON_DISABLED);
        changeAnnotationValue(annotation, "fixedDelay", -1L);
        changeAnnotationValue(annotation, "fixedDelayString"."");
        changeAnnotationValue(annotation, "fixedRate", -1L);
        changeAnnotationValue(annotation, "fixedRateString"."");
        changeAnnotationValue(annotation, "initialDelay", -1L);
        changeAnnotationValue(annotation, "initialDelayString"."");
    }


    /** * Get SpringBoot context *@paramApplicationContext SpringBoot context */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext; }}Copy the code

(3) Use ApplicationRunner to initialize the custom scheduled task runner

DependsOn threadPoolTaskScheduler class DependsOn threadPoolTaskScheduler class DependsOn threadPoolTaskScheduler class DependsOn After all beans are initialized, run the custom logic. 4. The main implementation logic is in the run() method

@DependsOn("threadPoolTaskScheduler")
@Component
public class SuperScheduledApplicationRunner implements ApplicationRunner.ApplicationContextAware {
    protected final Log logger = LogFactory.getLog(getClass());
    private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private ApplicationContext applicationContext;
	
	/** * Scheduled task configuration manager */
    @Autowired
    private SuperScheduledConfig superScheduledConfig;
    /** * Scheduled task execution thread */
    @Autowired
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;

    @Override
    public void run(ApplicationArguments args) {
    	//1. The scheduled task execution thread is cached in the scheduled task configuration manager
        superScheduledConfig.setTaskScheduler(threadPoolTaskScheduler);
        //2. Obtain the source data of all scheduled tasks
        Map<String, ScheduledSource> nameToScheduledSource = superScheduledConfig.getNameToScheduledSource();
        // Process scheduled tasks one by one
        for (String name : nameToScheduledSource.keySet()) {
            //3. Obtain the scheduled task source data
            ScheduledSource scheduledSource = nameToScheduledSource.get(name);
            //4. Get all the enhanced classes
            String[] baseStrengthenBeanNames = applicationContext.getBeanNamesForType(BaseStrengthen.class);
            //5. Create an execution controller
            SuperScheduledRunnable runnable = new SuperScheduledRunnable();
            // Configure the execution controller
            runnable.setMethod(scheduledSource.getMethod());
            runnable.setBean(scheduledSource.getBean());
            //6. Processing enhancement classes one by one (detailed analysis of the implementation principle of the enhancer)
            List<Point> points = new ArrayList<>(baseStrengthenBeanNames.length);
            for (String baseStrengthenBeanName : baseStrengthenBeanNames) {
            	//7. Proxy the enhancer as point
                Object baseStrengthenBean = applicationContext.getBean(baseStrengthenBeanName);
                // Create the proxy
                Point proxy = ProxyUtils.getInstance(Point.class, new RunnableBaseInterceptor(baseStrengthenBean, runnable));
                proxy.setSuperScheduledName(name);
                //8. Connect all points
                points.add(proxy);
            }
			// Form the point into a call chain
            runnable.setChain(new Chain(points));
            // Encapsulate the execution logic and cache it in the scheduled task configuration manager
            superScheduledConfig.addRunnable(name, runnable::invoke);
            try {
            	//8. Start the scheduled taskScheduledFuture<? > schedule = ScheduledFutureFactory.create(threadPoolTaskScheduler , scheduledSource, runnable::invoke);// Store the thread callback hook in the task configuration manager
                superScheduledConfig.addScheduledFuture(name, schedule);
                logger.info(df.format(LocalDateTime.now()) + "Task" + name + "Already started...");

            } catch (Exception e) {
                throw new SuperScheduledException("Task" + name + "Failed to start, error message:"+ e.getLocalizedMessage()); }}}@Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext; }}Copy the code

(4) Dynamic management

@Component
public class SuperScheduledManager {
    protected final Log logger = LogFactory.getLog(getClass());
    private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private SuperScheduledConfig superScheduledConfig;

    /** * Changes the Scheduled execution cycle **@paramName Scheduled name *@paramCron Cron expression */
    public void setScheduledCron(String name, String cron) {
        // Terminate the original task
        cancelScheduled(name);
        // Create new tasks
        ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name);
        scheduledSource.clear();
        scheduledSource.setCron(cron);
        addScheduled(name, scheduledSource);
    }

    /** ** Changes the Scheduled fixedDelay **@paramName Scheduled name *@paramFixedDelay How long it takes to execute */ after the last execution point
    public void setScheduledFixedDelay(String name, Long fixedDelay) {
        // Terminate the original task
        cancelScheduled(name);
        // Create new tasks
        ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name);
        scheduledSource.clear();
        scheduledSource.setFixedDelay(fixedDelay);
        addScheduled(name, scheduledSource);
    }

    /** ** Changes the Scheduled fixedRate **@paramName Scheduled name *@paramHow long has it been since fixedRate last started executing */
    public void setScheduledFixedRate(String name, Long fixedRate) {
        // Terminate the original task
        cancelScheduled(name);
        // Create new tasks
        ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name);
        scheduledSource.clear();
        scheduledSource.setFixedRate(fixedRate);
        addScheduled(name, scheduledSource);
    }

    /** * All Scheduled applications have been started
    public List<String> getRunScheduledName(a) {
        Set<String> names = superScheduledConfig.getNameToScheduledFuture().keySet();
        return new ArrayList<>(names);
    }

    /** * All Scheduled */
    public List<String> getAllSuperScheduledName(a) {
        Set<String> names = superScheduledConfig.getNameToRunnable().keySet();
        return new ArrayList<>(names);
    }

    /** ** Scheduled **@paramName Scheduled name */
    public void cancelScheduled(String name) {
        ScheduledFuture scheduledFuture = superScheduledConfig.getScheduledFuture(name);
        scheduledFuture.cancel(true);
        superScheduledConfig.removeScheduledFuture(name);
        logger.info(df.format(LocalDateTime.now()) + "Task" + name + "Has been terminated...");
    }

    /** ** Scheduled **@paramName Scheduled name *@paramScheduledSource Indicates the source information of the scheduled task */
    public void addScheduled(String name, ScheduledSource scheduledSource) {
        if (getRunScheduledName().contains(name)) {
            throw new SuperScheduledException("Scheduled task" + name + "It's already been activated.");
        }
        if(! scheduledSource.check()) {throw new SuperScheduledException("Scheduled task" + name + "Error in source data content"); } scheduledSource.refreshType(); Runnable runnable = superScheduledConfig.getRunnable(name); ThreadPoolTaskScheduler taskScheduler = superScheduledConfig.getTaskScheduler(); ScheduledFuture<? > schedule = ScheduledFutureFactory.create(taskScheduler, scheduledSource, runnable); logger.info(df.format(LocalDateTime.now()) +"Task" + name + "Already started...");

        superScheduledConfig.addScheduledSource(name, scheduledSource);
        superScheduledConfig.addScheduledFuture(name, schedule);
    }

    /** ** Scheduled starts with a cron type **@paramName Scheduled name *@paramCron Cron expression */
    public void addCronScheduled(String name, String cron) {
        ScheduledSource scheduledSource = new ScheduledSource();
        scheduledSource.setCron(cron);

        addScheduled(name, scheduledSource);
    }

    /** * Start Scheduled with fixedDelay type **@paramName Scheduled name *@paramFixedDelay How long it takes to execute * after the last execution point@paramInitialDelay Delay of the first execution */
    public void addFixedDelayScheduled(String name, Long fixedDelay, Long... initialDelay) {
        ScheduledSource scheduledSource = new ScheduledSource();
        scheduledSource.setFixedDelay(fixedDelay);
        if(initialDelay ! =null && initialDelay.length == 1) {
            scheduledSource.setInitialDelay(initialDelay[0]);
        } else if(initialDelay ! =null && initialDelay.length > 1) {
            throw new SuperScheduledException("Only one argument can be passed in for the delay time of the first execution.");
        }

        addScheduled(name, scheduledSource);
    }

    /** * Start Scheduled with fixedRate **@paramName Scheduled name *@paramHow long has it been since fixedRate last started executing *@paramInitialDelay Delay of the first execution */
    public void addFixedRateScheduled(String name, Long fixedRate, Long... initialDelay) {
        ScheduledSource scheduledSource = new ScheduledSource();
        scheduledSource.setFixedRate(fixedRate);
        if(initialDelay ! =null && initialDelay.length == 1) {
            scheduledSource.setInitialDelay(initialDelay[0]);
        } else if(initialDelay ! =null && initialDelay.length > 1) {
            throw new SuperScheduledException("Only one argument can be passed in for the delay time of the first execution.");
        }

        addScheduled(name, scheduledSource);
    }

    /** * Manually execute a task **@paramName Scheduled name */
    public void runScheduled(String name) { Runnable runnable = superScheduledConfig.getRunnable(name); runnable.run(); }}Copy the code

2. Enhance interface implementation

The overall idea of the enhancer implementation is the same as SpringAop, and the implementation is less complex than Aop

(1) Enhance the interface

@Order(Ordered.HIGHEST_PRECEDENCE)
public interface BaseStrengthen {
    /** ** Preconditioning method **@paramBean Bean instance (or proxied bean) *@paramMethod Executes the method object *@paramArgs method argument */
    void before(Object bean, Method method, Object[] args);

    /** * The afterFinally method does not execute after an exception occurs@paramBean Bean instance (or proxied bean) *@paramMethod Executes the method object *@paramArgs method argument */
    void after(Object bean, Method method, Object[] args);

    /** * Exception enhancement method **@paramBean Bean instance (or proxied bean) *@paramMethod Executes the method object *@paramArgs method argument */
    void exception(Object bean, Method method, Object[] args);

    /** * Finally strengthens the method to execute ** when exceptions occur@paramBean Bean instance (or proxied bean) *@paramMethod Executes the method object *@paramArgs method argument */
    void afterFinally(Object bean, Method method, Object[] args);
}
Copy the code

(2) Proxy abstract classes

public abstract class Point {
    /** * Specifies the scheduled task name */
    private String superScheduledName;

    /** * abstract execution method, using proxy implementation *@paramRunnable Scheduled task executor */
    public abstract Object invoke(SuperScheduledRunnable runnable);
    
    /* Common get/sets omit */
}
Copy the code

(3) Call the chain class

public class Chain {
    private List<Point> list;
    private int index = -1;
    /** * The index increases by 1 */
    public int incIndex(a) {
        return ++index;
    }

    /** * index restore */
    public void resetIndex(a) {
        this.index = -1; }}Copy the code

(4) Cglib dynamic proxy implementation

Use the Cglib proxy enhancer to proxy all the enhancers as call chain node points

public class RunnableBaseInterceptor implements MethodInterceptor {
    /** * Scheduled task executor */
    private SuperScheduledRunnable runnable;
    /** * Scheduled task enhancement class */
    private BaseStrengthen strengthen;

    @Override
    public Object intercept(Object obj, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
        Object result;
        // If the invoke() method is invoked
        if ("invoke".equals(method.getName())) {
        	// Prereinforcement method
            strengthen.before(obj, method, args);
            try {
            	// Invoke the invoke() method in the executor
                result = runnable.invoke();
            } catch (Exception e) {
            	// Exception reinforcement method
                strengthen.exception(obj, method, args);
                throw new SuperScheduledException(strengthen.getClass() + "Error occurred during reinforcement execution", e);
            } finally {
            	//Finally strengthens the method to execute exceptions
                strengthen.afterFinally(obj, method, args);
            }
            // Post reinforcement method
            strengthen.after(obj, method, args);

        } else {
        	// Execute the method directly
            result = methodProxy.invokeSuper(obj, args);
        }
        return result;
    }

    public RunnableBaseInterceptor(Object object, SuperScheduledRunnable runnable) {
        this.runnable = runnable;
        if (BaseStrengthen.class.isAssignableFrom(object.getClass())) {
            this.strengthen = (BaseStrengthen) object;
        } else {
            throw new SuperScheduledException(object.getClass() + "Object not of type BaseStrengthen"); }}public RunnableBaseInterceptor(a) {}}Copy the code

(5) Implementation of scheduled task executor

public class SuperScheduledRunnable {
    /** * the original method */
    private Method method;
    /** * the bean in which the method is located */
    private Object bean;
    /** * intensifier call chain */
    private Chain chain;


    public Object invoke(a) {
        Object result;
        // Index is incremented by 1
        if (chain.incIndex() == chain.getList().size()) {
            // The enhanced methods in the call chain have all been executed
            try {
                // Call chain initialization
                chain.resetIndex();
                // When the enhancer is complete, execute the original method
                result = method.invoke(bean);
            } catch (IllegalAccessException | InvocationTargetException e) {
                throw newSuperScheduledException(e.getLocalizedMessage()); }}else {
            // Get the proxied method enhancer
            Point point = chain.getList().get(chain.getIndex());
            // Execute the enhancer agent
            // In the enhancer agent, the method executor is called back to form a call chain that runs the enhancers in the call chain one by one
            result = point.invoke(this);
        }
        return result;
    }
    
    /* Common get/sets omit */
}
Copy the code

(6) Enhancer agent logic

Com. Gyx. Superscheduled. Core. The code snippet in SuperScheduledApplicationRunner class

// Create an execution controller
SuperScheduledRunnable runnable = new SuperScheduledRunnable();
runnable.setMethod(scheduledSource.getMethod());
runnable.setBean(scheduledSource.getBean());
// The proxy object used to store the enhancer
List<Point> points = new ArrayList<>(baseStrengthenBeanNames.length);
// Loop through all enhancers beanName
for (String baseStrengthenBeanName : baseStrengthenBeanNames) {
	// Get the enhancer's bean object
    Object baseStrengthenBean = applicationContext.getBean(baseStrengthenBeanName);
    // Proxy the enhancer as a Point node
    Point proxy = ProxyUtils.getInstance(Point.class, new RunnableBaseInterceptor(baseStrengthenBean, runnable));
    proxy.setSuperScheduledName(name);
    // The enhancer's proxy object is cached in the list
    points.add(proxy);
}
// Generate the call chain for the collection of enhancer proxy instances
// Set the call chain in the execution controller
runnable.setChain(new Chain(points));
Copy the code