Springboot integrates Quartz for dynamic loading of scheduled tasks
Springboot integrates Quartz to load dynamic scheduled tasks. You can add, stop, delete, and restart scheduled tasks without restarting the program, and configure scheduled tasks through the mysql database.
The directory structure
1. The database
Create a database configuration table to store the configuration of scheduled tasks
CREATE TABLE `task_config` (
`id` int UNSIGNED NOT NULL AUTO_INCREMENT,
`task_id` varchar(8) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'task id'.`cron` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Cron expression'.`class_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Job reference address'.`description` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'description'.`status` tinyint NOT NULL COMMENT 'Scheduled task status 0 Disabled,1 enabled',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;
Copy the code
2. The package
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.18</version>
<scope>provided</scope>
</dependency>
<! Quartz and Springboot integration -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
<version>2.0.4. RELEASE</version>
</dependency>
<! --mybatisPlus allows us to operate mybatis more easily -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.4.0</version>
</dependency>
<! Database connection pool Framework -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.9</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.17</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
Copy the code
3.Springboot starts the configuration
1. Configure the data source
Application-datasource. Yml configuration file
spring:
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: JDBC: mysql: / / 127.0.0.1:3306 / test? characterEncoding=utf8&useSSL=true&useSSL=false&allowMultiQueries=true&serverTimezone=UTC
username: root
password: 123456
type: com.alibaba.druid.pool.DruidDataSource
Copy the code
Note: The driver-class-name command is used for different mysql versions, and the time zone serverTimezone must be configured for mysql8
2. MybatisPlus configuration
Application-mybatis. Yml configuration file
mybatis-plus:
# Scan the location of XML
mapper-locations: classpath*:/dao/*.xml
configuration:
This configuration prints out the executed SQL for use during development or testing
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
Copy the code
3. The Spring configuration
Application. Yml configuration file
spring:
application:
name: schedule-server
profiles:
Configure the activation profile, application-mybatis, application-datasource
active: datasource,mybatis
server:
port: 9000
Copy the code
4. Mybatis layer
1. Create entities corresponding to database tables
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.experimental.Accessors;
import java.io.Serializable;
@Data
@Accessors(chain = true)
@TableName("task_config")
public class TaskConfigEntity implements Serializable {
@TableId(value = "id",type = IdType.AUTO)
private Integer id;
@TableField("task_id")
private String taskId;
@TableField("cron")
private String cron;
@TableField("class_name")
private String className;
@TableField("description")
private String description;
@TableField("status")
private Integer status;
}
Copy the code
2. The dao interface
Create the DAO interface as well
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.demo.entity.TaskConfigEntity;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface TaskConfigDao extends BaseMapper<TaskConfigEntity> {}Copy the code
3. The XML file
Create dao\ TaskConfigdao.xml under the Resource package
<! DOCTYPEmapper PUBLIC "- / / mybatis.org//DTD Mapper / 3.0 / EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.example.demo.dao.TaskConfigDao">
</mapper>
Copy the code
4. Scan the DAO interface
package com.example.demo;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
// Scan the path
@MapperScan("com.example.demo.dao")
public class DemoApplication {
public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); }}Copy the code
5. Create a thread pool
Create a thread pool to run scheduled tasks
package com.example.demo.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Configuration
public class ExecutorConfig {
@Bean(name = "taskExecutor")
public Executor getAsyncExecutor(a) throws InterruptedException{
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(1024);
executor.setKeepAliveSeconds(4);
executor.setQueueCapacity(0);
executor.setRejectedExecutionHandler((Runnable r, ThreadPoolExecutor exe) -> {
// Take advantage of the BlockingQueue feature to wait for tasks to be added when the queue is full
try {
if(! exe.getQueue().offer(r,30, TimeUnit.SECONDS)) {
throw new Exception("Task offer failed after 30 sec"); }}catch(Exception e) { e.printStackTrace(); }});returnexecutor; }}Copy the code
6. Quartz configuration
1. Automatic injection factory creation
Note: Quartz can use Autowired only with this configuration, otherwise Quartz cannot use Autowired
package com.example.demo.factory;
import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.quartz.SpringBeanJobFactory;
import org.springframework.stereotype.Component;
/** * Quartz can inject */ using autowired only after this config is configured
@Component
public class ScheduleAutoBeanFactory extends SpringBeanJobFactory implements ApplicationContextAware {
private transient AutowireCapableBeanFactory beanFactory;
@Override
public void setApplicationContext(final ApplicationContext context) {
beanFactory = context.getAutowireCapableBeanFactory();
}
@Override
protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception {
final Object job = super.createJobInstance(bundle);
beanFactory.autowireBean(job);
returnjob; }}Copy the code
2. The quartz configuration
package com.example.demo.config;
import org.quartz.Scheduler;
import org.quartz.spi.JobFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import javax.annotation.Resource;
import java.util.concurrent.Executor;
@Configuration
public class ScheduleJobConfig {
@Resource(name = "taskExecutor")
private Executor taskExecutor;
@Bean("schedulerFactoryBean")
public SchedulerFactoryBean createFactoryBean(JobFactory jobFactory){
SchedulerFactoryBean factoryBean = new SchedulerFactoryBean();
factoryBean.setJobFactory(jobFactory);
factoryBean.setTaskExecutor(taskExecutor);
factoryBean.setOverwriteExistingJobs(true);
return factoryBean;
}
// Use this class to operate on scheduled tasks
@Bean
public Scheduler scheduler(@Qualifier("schedulerFactoryBean") SchedulerFactoryBean factoryBean) {
returnfactoryBean.getScheduler(); }}Copy the code
7. Quartz implementation
1. Basic functions
package com.example.demo.service;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.demo.dao.TaskConfigDao;
import com.example.demo.entity.TaskConfigEntity;
import lombok.extern.log4j.Log4j2;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@Log4j2
public class ScheduleJobService {
@Autowired
private TaskConfigDao taskConfigDao;
@Autowired
private Scheduler scheduler;
/** * The program starts to load scheduled tasks */
public void startJob(a){
List<TaskConfigEntity> taskConfigEntities = taskConfigDao.selectList(
Wrappers.<TaskConfigEntity>lambdaQuery()
.eq(TaskConfigEntity::getStatus, 1));
if (taskConfigEntities == null || taskConfigEntities.size() == 0){
log.error("Scheduled task load data is empty");
return;
}
for (TaskConfigEntity configEntity : taskConfigEntities) {
CronTrigger cronTrigger = null;
JobDetail jobDetail = null;
try {
cronTrigger = getCronTrigger(configEntity);
jobDetail = getJobDetail(configEntity);
scheduler.scheduleJob(jobDetail,cronTrigger);
log.info("No. : {} Scheduled task loaded successfully",configEntity.getTaskId());
}catch (Exception e){
log.error("No. : {} Failed to load scheduled task",configEntity.getTaskId()); }}try {
scheduler.start();
} catch (SchedulerException e) {
log.error("Failed to start scheduled task",e); }}/** * Stop the task *@param taskId
*/
public void stopJob(String taskId) throws SchedulerException {
scheduler.pauseJob(JobKey.jobKey(taskId));
}
/** * Restore task *@param taskId
* @throws SchedulerException
*/
public void resumeJob(String taskId) throws SchedulerException {
scheduler.resumeJob(JobKey.jobKey(taskId));
}
/** * Add new job *@param taskId
* @throws SchedulerConfigException
*/
public void loadJob(String taskId) throws SchedulerConfigException {
TaskConfigEntity taskConfigEntity = taskConfigDao.selectOne(
Wrappers.<TaskConfigEntity>lambdaQuery()
.eq(TaskConfigEntity::getTaskId, taskId)
.eq(TaskConfigEntity::getStatus, 1));
if (taskConfigEntity == null) {throw new SchedulerConfigException("Related Job configuration not found");
}
try {
JobDetail jobDetail = getJobDetail(taskConfigEntity);
CronTrigger cronTrigger = getCronTrigger(taskConfigEntity);
scheduler.scheduleJob(jobDetail, cronTrigger);
} catch (Exception e) {
log.error("Abnormal loading scheduled task",e);
throw new SchedulerConfigException("Abnormal loading scheduled task", e); }}public void unloadJob(String taskId) throws SchedulerException {
// Stop the trigger
scheduler.pauseTrigger(TriggerKey.triggerKey(taskId));
// Unmount the scheduled task
scheduler.unscheduleJob(TriggerKey.triggerKey(taskId));
// Delete the original job
scheduler.deleteJob(JobKey.jobKey(taskId));
}
/** * Reload the execution plan *@throws Exception
*/
public void reload(String taskId) throws Exception {
TaskConfigEntity taskConfigEntity = taskConfigDao.selectOne(
Wrappers.<TaskConfigEntity>lambdaQuery()
.eq(TaskConfigEntity::getTaskId, taskId)
.eq(TaskConfigEntity::getStatus, 1));
String jobCode = taskConfigEntity.getTaskId();
// Get the previous trigger
TriggerKey triggerKey = TriggerKey.triggerKey(jobCode);
// Stop the trigger
scheduler.pauseTrigger(triggerKey);
// Delete trigger
scheduler.unscheduleJob(triggerKey);
// Delete the original job
scheduler.deleteJob(JobKey.jobKey(jobCode));
JobDetail jobDetail = getJobDetail(taskConfigEntity);
CronTrigger cronTrigger = getCronTrigger(taskConfigEntity);
// Reload the job
scheduler.scheduleJob(jobDetail, cronTrigger);
}
/ / assembly JobDetail
private JobDetail getJobDetail(TaskConfigEntity configEntity) throws ClassNotFoundException {
Class<? extends Job> aClass = Class.forName(configEntity.getClassName()).asSubclass(Job.class);
return JobBuilder.newJob()
.withIdentity(JobKey.jobKey(configEntity.getTaskId()))
.withDescription(configEntity.getDescription())
.ofType(aClass).build();
}
/ / assembly CronTrigger
private CronTrigger getCronTrigger(TaskConfigEntity configEntity){
CronTrigger cronTrigger = null;
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(configEntity.getCron());
cronTrigger = TriggerBuilder.newTrigger()
.withIdentity(TriggerKey.triggerKey(configEntity.getTaskId()))
.withSchedule(cronScheduleBuilder)
.build();
returncronTrigger; }}Copy the code
2. The scheduled task is automatically loaded
package com.example.demo.listener;
import com.example.demo.service.ScheduleJobService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
/** * listen for the container to start and start adding from the database to the scheduled task */
@Component
public class ScheduleJobInitListener implements CommandLineRunner {
@Autowired
private ScheduleJobService jobService;
@Override
public void run(String... strings) throws Exception { jobService.startJob(); }}Copy the code
3. Load scheduled tasks using remote requests
package com.example.demo.controller;
import com.example.demo.service.ScheduleJobService;
import org.quartz.SchedulerConfigException;
import org.quartz.SchedulerException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ScheduleJobController {
@Autowired
private ScheduleJobService jobService;
@GetMapping("/load/{taskId}")
public String loadJob(@PathVariable("taskId") String taskId){
try {
jobService.loadJob(taskId);
} catch (SchedulerConfigException e) {
return Failed to import a scheduled task;
}
return "Success";
}
@GetMapping("/resume/{taskId}")
public String resumeJob(@PathVariable("taskId") String taskId){
try {
jobService.resumeJob(taskId);
}catch (SchedulerException e) {
return "Failed to restore scheduled task";
}
return "Success";
}
@GetMapping("/stop/{taskId}")
public String stopJob(@PathVariable("taskId") String taskId){
try {
jobService.stopJob(taskId);
}catch (SchedulerException e) {
return "Failed to suspend a scheduled task";
}
return "Success";
}
@GetMapping("/unload/{taskId}")
public String unloadJob(@PathVariable("taskId") String taskId){
try {
jobService.unloadJob(taskId);
}catch (SchedulerException e) {
return "Failed to uninstall the scheduled task";
}
return "Success"; }}Copy the code
8. Scheduled tasks
package com.example.demo.job;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
public class MyJob implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
LocalDateTime now = LocalDateTime.now();
System.out.println("Scheduled task begins"+now.format(DateTimeFormatter.ofPattern("HH:mm:ss"))); }}Copy the code
Dynamic loading can be implemented by implementing the Job interface and then configuring it to the database.
9. The test
1. Insert a scheduled task configuration to the database.
Where class_name is the reference path of our Job
INSERT INTO `test`.`task_config` (`id`, `task_id`, `cron`, `class_name`, `description`, `status`) VALUES ('1'.'TB00001'.'0 * * * * ?'.'com.example.demo.job.MyJob'.'Triggered every minute.'.'1');
Copy the code
2. Start the framework
I configured it to run once a minute
3. Make a request through the network
You can use a browser to access the address and perform scheduled tasks.