I. Function description
SpringBoot Scheduled task enhancement tool to implement dynamic management of SpringBoot native Scheduled tasks. Fully compatible with native @scheduled annotations, you do not need to modify the original Scheduled tasks
Two, quick use
Specific functions have been packaged as Springboot-starter plug and play
<dependency>
<groupId>com.github.guoyixing</groupId>
<artifactId>spring-boot-starter-super-scheduled</artifactId>
<version>0.3.1</version>
</dependency>
Copy the code
Use method and source code:
Yards cloud: gitee.com/qiaodaimade…
Github:github.com/guoyixing/s…
Three, the implementation principle
1. Dynamic management
(1) Configuration management introduction
@Component("superScheduledConfig")
public class SuperScheduledConfig {
/** * The thread pool that executes scheduled tasks */
private ThreadPoolTaskScheduler taskScheduler;
/** * The container associated with the scheduled task name and the scheduled task callback hook */
private Map<String, ScheduledFuture> nameToScheduledFuture = new ConcurrentHashMap<>();
/** * The relationship between the scheduled task name and the logic to be executed by the scheduled task container */
private Map<String, Runnable> nameToRunnable = new ConcurrentHashMap<>();
/** * Indicates the association between the scheduled task name and the source information of the scheduled task
private Map<String, ScheduledSource> nameToScheduledSource = new ConcurrentHashMap<>();
/* Common get/sets omit */
}
Copy the code
(2) Use the post-processor to intercept SpringBoot’s original scheduled task
1, Implement the ApplicationContextAware interface to get the SpringBoot context. 2, implement the BeanPostProcessor interface to mark this class as a post-processor. After each bean is instantiated, the post-processor implements a @dependson annotation to enforce a SuperScheduledConfig class. Let SpringBoot instantiation SuperScheduledPostProcessor class instantiation SuperScheduledConfig before class 4, main implementation logic in postProcessAfterInitialization () method
@DependsOn({"superScheduledConfig"})
@Component
@Order
public class SuperScheduledPostProcessor implements BeanPostProcessor.ApplicationContextAware {
protected final Log logger = LogFactory.getLog(getClass());
private ApplicationContext applicationContext;
/** * The operation before instantiating the bean *@paramBean Bean instance *@paramBeanName beanName */
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
/** * The operation after instantiating the bean *@paramBean Bean instance *@paramBeanName beanName */
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
1. Obtain the configuration manager
SuperScheduledConfig superScheduledConfig = applicationContext.getBean(SuperScheduledConfig.class);
//2. Get all the methods of the currently instantiated bean
Method[] methods = bean.getClass().getDeclaredMethods();
// Process each method one by one
if (methods.length > 0) {
for (Method method : methods) {
//3. Try to get @scheduled annotations on this method (SpringBoot timed task annotations)
Scheduled annotation = method.getAnnotation(Scheduled.class);
// Skip this method if you can't get the @scheduled annotation
if (annotation == null) {
continue;
}
//4. Create a source attribute for the scheduled task
// Create the source attribute of the scheduled task (used to record the configuration of the scheduled task, initialized with the original attribute in the annotation)
ScheduledSource scheduledSource = new ScheduledSource(annotation, method, bean);
// Check the attributes in the source attributes obtained on the annotation
if(! scheduledSource.check()) {throw new SuperScheduledException("In" + beanName + "Bean" + method.getName() + "Method annotation parameter error");
}
// To generate the name (ID) of the scheduled task, run beanName+ ". + method name
String name = beanName + "." + method.getName();
// The source data will be saved to the configuration manager in the form of key-value. Key: indicates the name of the scheduled task. Value: indicates the source data
superScheduledConfig.addScheduledSource(name, scheduledSource);
try {
//5. Cancel the scheduled SpringBoot task
clearOriginalScheduled(annotation);
} catch (Exception e) {
throw new SuperScheduledException("Closing the original method" + beanName + method.getName() + "Time error"); }}}// Finally, the bean returns unchanged
return bean;
}
/** * Modifies the original attribute of the annotation *@paramAnnotation Instance object *@throws Exception
*/
private void clearOriginalScheduled(Scheduled annotation) throws Exception {
changeAnnotationValue(annotation, "cron", Scheduled.CRON_DISABLED);
changeAnnotationValue(annotation, "fixedDelay", -1L);
changeAnnotationValue(annotation, "fixedDelayString"."");
changeAnnotationValue(annotation, "fixedRate", -1L);
changeAnnotationValue(annotation, "fixedRateString"."");
changeAnnotationValue(annotation, "initialDelay", -1L);
changeAnnotationValue(annotation, "initialDelayString"."");
}
/** * Get SpringBoot context *@paramApplicationContext SpringBoot context */
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext; }}Copy the code
(3) Use ApplicationRunner to initialize the custom scheduled task runner
DependsOn threadPoolTaskScheduler class DependsOn threadPoolTaskScheduler class DependsOn threadPoolTaskScheduler class DependsOn After all beans are initialized, run the custom logic. 4. The main implementation logic is in the run() method
@DependsOn("threadPoolTaskScheduler")
@Component
public class SuperScheduledApplicationRunner implements ApplicationRunner.ApplicationContextAware {
protected final Log logger = LogFactory.getLog(getClass());
private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private ApplicationContext applicationContext;
/** * Scheduled task configuration manager */
@Autowired
private SuperScheduledConfig superScheduledConfig;
/** * Scheduled task execution thread */
@Autowired
private ThreadPoolTaskScheduler threadPoolTaskScheduler;
@Override
public void run(ApplicationArguments args) {
//1. The scheduled task execution thread is cached in the scheduled task configuration manager
superScheduledConfig.setTaskScheduler(threadPoolTaskScheduler);
//2. Obtain the source data of all scheduled tasks
Map<String, ScheduledSource> nameToScheduledSource = superScheduledConfig.getNameToScheduledSource();
// Process scheduled tasks one by one
for (String name : nameToScheduledSource.keySet()) {
//3. Obtain the scheduled task source data
ScheduledSource scheduledSource = nameToScheduledSource.get(name);
//4. Get all the enhanced classes
String[] baseStrengthenBeanNames = applicationContext.getBeanNamesForType(BaseStrengthen.class);
//5. Create an execution controller
SuperScheduledRunnable runnable = new SuperScheduledRunnable();
// Configure the execution controller
runnable.setMethod(scheduledSource.getMethod());
runnable.setBean(scheduledSource.getBean());
//6. Processing enhancement classes one by one (detailed analysis of the implementation principle of the enhancer)
List<Point> points = new ArrayList<>(baseStrengthenBeanNames.length);
for (String baseStrengthenBeanName : baseStrengthenBeanNames) {
//7. Proxy the enhancer as point
Object baseStrengthenBean = applicationContext.getBean(baseStrengthenBeanName);
// Create the proxy
Point proxy = ProxyUtils.getInstance(Point.class, new RunnableBaseInterceptor(baseStrengthenBean, runnable));
proxy.setSuperScheduledName(name);
//8. Connect all points
points.add(proxy);
}
// Form the point into a call chain
runnable.setChain(new Chain(points));
// Encapsulate the execution logic and cache it in the scheduled task configuration manager
superScheduledConfig.addRunnable(name, runnable::invoke);
try {
//8. Start the scheduled taskScheduledFuture<? > schedule = ScheduledFutureFactory.create(threadPoolTaskScheduler , scheduledSource, runnable::invoke);// Store the thread callback hook in the task configuration manager
superScheduledConfig.addScheduledFuture(name, schedule);
logger.info(df.format(LocalDateTime.now()) + "Task" + name + "Already started...");
} catch (Exception e) {
throw new SuperScheduledException("Task" + name + "Failed to start, error message:"+ e.getLocalizedMessage()); }}}@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext; }}Copy the code
(4) Dynamic management
@Component
public class SuperScheduledManager {
protected final Log logger = LogFactory.getLog(getClass());
private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@Autowired
private SuperScheduledConfig superScheduledConfig;
/** * Changes the Scheduled execution cycle **@paramName Scheduled name *@paramCron Cron expression */
public void setScheduledCron(String name, String cron) {
// Terminate the original task
cancelScheduled(name);
// Create new tasks
ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name);
scheduledSource.clear();
scheduledSource.setCron(cron);
addScheduled(name, scheduledSource);
}
/** ** Changes the Scheduled fixedDelay **@paramName Scheduled name *@paramFixedDelay How long it takes to execute */ after the last execution point
public void setScheduledFixedDelay(String name, Long fixedDelay) {
// Terminate the original task
cancelScheduled(name);
// Create new tasks
ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name);
scheduledSource.clear();
scheduledSource.setFixedDelay(fixedDelay);
addScheduled(name, scheduledSource);
}
/** ** Changes the Scheduled fixedRate **@paramName Scheduled name *@paramHow long has it been since fixedRate last started executing */
public void setScheduledFixedRate(String name, Long fixedRate) {
// Terminate the original task
cancelScheduled(name);
// Create new tasks
ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name);
scheduledSource.clear();
scheduledSource.setFixedRate(fixedRate);
addScheduled(name, scheduledSource);
}
/** * All Scheduled applications have been started
public List<String> getRunScheduledName(a) {
Set<String> names = superScheduledConfig.getNameToScheduledFuture().keySet();
return new ArrayList<>(names);
}
/** * All Scheduled */
public List<String> getAllSuperScheduledName(a) {
Set<String> names = superScheduledConfig.getNameToRunnable().keySet();
return new ArrayList<>(names);
}
/** ** Scheduled **@paramName Scheduled name */
public void cancelScheduled(String name) {
ScheduledFuture scheduledFuture = superScheduledConfig.getScheduledFuture(name);
scheduledFuture.cancel(true);
superScheduledConfig.removeScheduledFuture(name);
logger.info(df.format(LocalDateTime.now()) + "Task" + name + "Has been terminated...");
}
/** ** Scheduled **@paramName Scheduled name *@paramScheduledSource Indicates the source information of the scheduled task */
public void addScheduled(String name, ScheduledSource scheduledSource) {
if (getRunScheduledName().contains(name)) {
throw new SuperScheduledException("Scheduled task" + name + "It's already been activated.");
}
if(! scheduledSource.check()) {throw new SuperScheduledException("Scheduled task" + name + "Error in source data content"); } scheduledSource.refreshType(); Runnable runnable = superScheduledConfig.getRunnable(name); ThreadPoolTaskScheduler taskScheduler = superScheduledConfig.getTaskScheduler(); ScheduledFuture<? > schedule = ScheduledFutureFactory.create(taskScheduler, scheduledSource, runnable); logger.info(df.format(LocalDateTime.now()) +"Task" + name + "Already started...");
superScheduledConfig.addScheduledSource(name, scheduledSource);
superScheduledConfig.addScheduledFuture(name, schedule);
}
/** ** Scheduled starts with a cron type **@paramName Scheduled name *@paramCron Cron expression */
public void addCronScheduled(String name, String cron) {
ScheduledSource scheduledSource = new ScheduledSource();
scheduledSource.setCron(cron);
addScheduled(name, scheduledSource);
}
/** * Start Scheduled with fixedDelay type **@paramName Scheduled name *@paramFixedDelay How long it takes to execute * after the last execution point@paramInitialDelay Delay of the first execution */
public void addFixedDelayScheduled(String name, Long fixedDelay, Long... initialDelay) {
ScheduledSource scheduledSource = new ScheduledSource();
scheduledSource.setFixedDelay(fixedDelay);
if(initialDelay ! =null && initialDelay.length == 1) {
scheduledSource.setInitialDelay(initialDelay[0]);
} else if(initialDelay ! =null && initialDelay.length > 1) {
throw new SuperScheduledException("Only one argument can be passed in for the delay time of the first execution.");
}
addScheduled(name, scheduledSource);
}
/** * Start Scheduled with fixedRate **@paramName Scheduled name *@paramHow long has it been since fixedRate last started executing *@paramInitialDelay Delay of the first execution */
public void addFixedRateScheduled(String name, Long fixedRate, Long... initialDelay) {
ScheduledSource scheduledSource = new ScheduledSource();
scheduledSource.setFixedRate(fixedRate);
if(initialDelay ! =null && initialDelay.length == 1) {
scheduledSource.setInitialDelay(initialDelay[0]);
} else if(initialDelay ! =null && initialDelay.length > 1) {
throw new SuperScheduledException("Only one argument can be passed in for the delay time of the first execution.");
}
addScheduled(name, scheduledSource);
}
/** * Manually execute a task **@paramName Scheduled name */
public void runScheduled(String name) { Runnable runnable = superScheduledConfig.getRunnable(name); runnable.run(); }}Copy the code
2. Enhance interface implementation
The overall idea of the enhancer implementation is the same as SpringAop, and the implementation is less complex than Aop
(1) Enhance the interface
@Order(Ordered.HIGHEST_PRECEDENCE)
public interface BaseStrengthen {
/** ** Preconditioning method **@paramBean Bean instance (or proxied bean) *@paramMethod Executes the method object *@paramArgs method argument */
void before(Object bean, Method method, Object[] args);
/** * The afterFinally method does not execute after an exception occurs@paramBean Bean instance (or proxied bean) *@paramMethod Executes the method object *@paramArgs method argument */
void after(Object bean, Method method, Object[] args);
/** * Exception enhancement method **@paramBean Bean instance (or proxied bean) *@paramMethod Executes the method object *@paramArgs method argument */
void exception(Object bean, Method method, Object[] args);
/** * Finally strengthens the method to execute ** when exceptions occur@paramBean Bean instance (or proxied bean) *@paramMethod Executes the method object *@paramArgs method argument */
void afterFinally(Object bean, Method method, Object[] args);
}
Copy the code
(2) Proxy abstract classes
public abstract class Point {
/** * Specifies the scheduled task name */
private String superScheduledName;
/** * abstract execution method, using proxy implementation *@paramRunnable Scheduled task executor */
public abstract Object invoke(SuperScheduledRunnable runnable);
/* Common get/sets omit */
}
Copy the code
(3) Call the chain class
public class Chain {
private List<Point> list;
private int index = -1;
/** * The index increases by 1 */
public int incIndex(a) {
return ++index;
}
/** * index restore */
public void resetIndex(a) {
this.index = -1; }}Copy the code
(4) Cglib dynamic proxy implementation
Use the Cglib proxy enhancer to proxy all the enhancers as call chain node points
public class RunnableBaseInterceptor implements MethodInterceptor {
/** * Scheduled task executor */
private SuperScheduledRunnable runnable;
/** * Scheduled task enhancement class */
private BaseStrengthen strengthen;
@Override
public Object intercept(Object obj, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
Object result;
// If the invoke() method is invoked
if ("invoke".equals(method.getName())) {
// Prereinforcement method
strengthen.before(obj, method, args);
try {
// Invoke the invoke() method in the executor
result = runnable.invoke();
} catch (Exception e) {
// Exception reinforcement method
strengthen.exception(obj, method, args);
throw new SuperScheduledException(strengthen.getClass() + "Error occurred during reinforcement execution", e);
} finally {
//Finally strengthens the method to execute exceptions
strengthen.afterFinally(obj, method, args);
}
// Post reinforcement method
strengthen.after(obj, method, args);
} else {
// Execute the method directly
result = methodProxy.invokeSuper(obj, args);
}
return result;
}
public RunnableBaseInterceptor(Object object, SuperScheduledRunnable runnable) {
this.runnable = runnable;
if (BaseStrengthen.class.isAssignableFrom(object.getClass())) {
this.strengthen = (BaseStrengthen) object;
} else {
throw new SuperScheduledException(object.getClass() + "Object not of type BaseStrengthen"); }}public RunnableBaseInterceptor(a) {}}Copy the code
(5) Implementation of scheduled task executor
public class SuperScheduledRunnable {
/** * the original method */
private Method method;
/** * the bean in which the method is located */
private Object bean;
/** * intensifier call chain */
private Chain chain;
public Object invoke(a) {
Object result;
// Index is incremented by 1
if (chain.incIndex() == chain.getList().size()) {
// The enhanced methods in the call chain have all been executed
try {
// Call chain initialization
chain.resetIndex();
// When the enhancer is complete, execute the original method
result = method.invoke(bean);
} catch (IllegalAccessException | InvocationTargetException e) {
throw newSuperScheduledException(e.getLocalizedMessage()); }}else {
// Get the proxied method enhancer
Point point = chain.getList().get(chain.getIndex());
// Execute the enhancer agent
// In the enhancer agent, the method executor is called back to form a call chain that runs the enhancers in the call chain one by one
result = point.invoke(this);
}
return result;
}
/* Common get/sets omit */
}
Copy the code
(6) Enhancer agent logic
Com. Gyx. Superscheduled. Core. The code snippet in SuperScheduledApplicationRunner class
// Create an execution controller
SuperScheduledRunnable runnable = new SuperScheduledRunnable();
runnable.setMethod(scheduledSource.getMethod());
runnable.setBean(scheduledSource.getBean());
// The proxy object used to store the enhancer
List<Point> points = new ArrayList<>(baseStrengthenBeanNames.length);
// Loop through all enhancers beanName
for (String baseStrengthenBeanName : baseStrengthenBeanNames) {
// Get the enhancer's bean object
Object baseStrengthenBean = applicationContext.getBean(baseStrengthenBeanName);
// Proxy the enhancer as a Point node
Point proxy = ProxyUtils.getInstance(Point.class, new RunnableBaseInterceptor(baseStrengthenBean, runnable));
proxy.setSuperScheduledName(name);
// The enhancer's proxy object is cached in the list
points.add(proxy);
}
// Generate the call chain for the collection of enhancer proxy instances
// Set the call chain in the execution controller
runnable.setChain(new Chain(points));
Copy the code