Springboot integrates Quartz for dynamic loading of scheduled tasks

Springboot integrates Quartz to load dynamic scheduled tasks. You can add, stop, delete, and restart scheduled tasks without restarting the program, and configure scheduled tasks through the mysql database.

The directory structure

1. The database

Create a database configuration table to store the configuration of scheduled tasks

CREATE TABLE `task_config`  (
  `id` int UNSIGNED NOT NULL AUTO_INCREMENT,
  `task_id` varchar(8) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'task id'.`cron` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Cron expression'.`class_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Job reference address'.`description` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'description'.`status` tinyint NOT NULL COMMENT 'Scheduled task status 0 Disabled,1 enabled',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;
Copy the code

2. The package

<dependencies>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.18</version>
        <scope>provided</scope>
    </dependency>
    <! Quartz and Springboot integration -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-quartz</artifactId>
        <version>2.0.4. RELEASE</version>
    </dependency>
    <! --mybatisPlus allows us to operate mybatis more easily -->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>3.4.0</version>
    </dependency>
    <! Database connection pool Framework -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>1.1.9</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.17</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>
Copy the code

3.Springboot starts the configuration

1. Configure the data source

Application-datasource. Yml configuration file

spring:
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: JDBC: mysql: / / 127.0.0.1:3306 / test? characterEncoding=utf8&useSSL=true&useSSL=false&allowMultiQueries=true&serverTimezone=UTC
    username: root
    password: 123456
    type: com.alibaba.druid.pool.DruidDataSource
Copy the code

Note: The driver-class-name command is used for different mysql versions, and the time zone serverTimezone must be configured for mysql8

2. MybatisPlus configuration

Application-mybatis. Yml configuration file

mybatis-plus:
	# Scan the location of XML
  mapper-locations: classpath*:/dao/*.xml
  configuration:
    This configuration prints out the executed SQL for use during development or testing
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
Copy the code

3. The Spring configuration

Application. Yml configuration file

spring:
  application:
    name: schedule-server
  profiles:
  	Configure the activation profile, application-mybatis, application-datasource
    active: datasource,mybatis
server:
  port: 9000
Copy the code

4. Mybatis layer

1. Create entities corresponding to database tables

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.experimental.Accessors;

import java.io.Serializable;

@Data
@Accessors(chain = true)
@TableName("task_config")
public class TaskConfigEntity implements Serializable {

    @TableId(value = "id",type = IdType.AUTO)
    private Integer id;

    @TableField("task_id")
    private String taskId;

    @TableField("cron")
    private String cron;

    @TableField("class_name")
    private String className;

    @TableField("description")
    private String description;

    @TableField("status")
    private Integer status;
}
Copy the code

2. The dao interface

Create the DAO interface as well

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.demo.entity.TaskConfigEntity;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface TaskConfigDao extends BaseMapper<TaskConfigEntity> {}Copy the code

3. The XML file

Create dao\ TaskConfigdao.xml under the Resource package


      
<! DOCTYPEmapper PUBLIC "- / / mybatis.org//DTD Mapper / 3.0 / EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.example.demo.dao.TaskConfigDao">

</mapper>
Copy the code

4. Scan the DAO interface

package com.example.demo;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
// Scan the path
@MapperScan("com.example.demo.dao")
public class DemoApplication {

    public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); }}Copy the code

5. Create a thread pool

Create a thread pool to run scheduled tasks

package com.example.demo.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Configuration
public class ExecutorConfig {
    @Bean(name = "taskExecutor")
    public Executor getAsyncExecutor(a) throws InterruptedException{
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(1024);
        executor.setKeepAliveSeconds(4);
        executor.setQueueCapacity(0);
        executor.setRejectedExecutionHandler((Runnable r, ThreadPoolExecutor exe) -> {

            // Take advantage of the BlockingQueue feature to wait for tasks to be added when the queue is full
            try {
                if(! exe.getQueue().offer(r,30, TimeUnit.SECONDS)) {
                    throw new Exception("Task offer failed after 30 sec"); }}catch(Exception e) { e.printStackTrace(); }});returnexecutor; }}Copy the code

6. Quartz configuration

1. Automatic injection factory creation

Note: Quartz can use Autowired only with this configuration, otherwise Quartz cannot use Autowired

package com.example.demo.factory;

import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.quartz.SpringBeanJobFactory;
import org.springframework.stereotype.Component;

/** * Quartz can inject */ using autowired only after this config is configured
@Component
public class ScheduleAutoBeanFactory extends SpringBeanJobFactory implements ApplicationContextAware {
    private transient AutowireCapableBeanFactory beanFactory;
    @Override
    public void setApplicationContext(final ApplicationContext context) {
        beanFactory = context.getAutowireCapableBeanFactory();
    }
    @Override
    protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception {
        final Object job = super.createJobInstance(bundle);
        beanFactory.autowireBean(job);
        returnjob; }}Copy the code

2. The quartz configuration

package com.example.demo.config;

import org.quartz.Scheduler;
import org.quartz.spi.JobFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

import javax.annotation.Resource;
import java.util.concurrent.Executor;

@Configuration
public class ScheduleJobConfig {
	
    @Resource(name = "taskExecutor")
    private Executor taskExecutor;

    @Bean("schedulerFactoryBean")
    public SchedulerFactoryBean createFactoryBean(JobFactory jobFactory){
        SchedulerFactoryBean factoryBean = new SchedulerFactoryBean();
        factoryBean.setJobFactory(jobFactory);
        factoryBean.setTaskExecutor(taskExecutor);
        factoryBean.setOverwriteExistingJobs(true);
        return factoryBean;
    }
    // Use this class to operate on scheduled tasks
    @Bean
    public Scheduler scheduler(@Qualifier("schedulerFactoryBean") SchedulerFactoryBean factoryBean) {
        returnfactoryBean.getScheduler(); }}Copy the code

7. Quartz implementation

1. Basic functions

package com.example.demo.service;

import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.demo.dao.TaskConfigDao;
import com.example.demo.entity.TaskConfigEntity;
import lombok.extern.log4j.Log4j2;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
@Log4j2
public class ScheduleJobService {

    @Autowired
    private TaskConfigDao taskConfigDao;

    @Autowired
    private Scheduler scheduler;

    /** * The program starts to load scheduled tasks */
    public void startJob(a){
        List<TaskConfigEntity> taskConfigEntities = taskConfigDao.selectList(
                Wrappers.<TaskConfigEntity>lambdaQuery()
                        .eq(TaskConfigEntity::getStatus, 1));
        if (taskConfigEntities == null || taskConfigEntities.size() == 0){
            log.error("Scheduled task load data is empty");
            return;
        }
        for (TaskConfigEntity configEntity : taskConfigEntities) {
            CronTrigger cronTrigger = null;
            JobDetail jobDetail = null;
            try {
                cronTrigger = getCronTrigger(configEntity);
                jobDetail = getJobDetail(configEntity);
                scheduler.scheduleJob(jobDetail,cronTrigger);
                log.info("No. : {} Scheduled task loaded successfully",configEntity.getTaskId());
            }catch (Exception e){
                log.error("No. : {} Failed to load scheduled task",configEntity.getTaskId()); }}try {
            scheduler.start();
        } catch (SchedulerException e) {
            log.error("Failed to start scheduled task",e); }}/** * Stop the task *@param taskId
     */
    public void stopJob(String taskId) throws SchedulerException {
        scheduler.pauseJob(JobKey.jobKey(taskId));
    }

    /** * Restore task *@param taskId
     * @throws SchedulerException
     */
    public void resumeJob(String taskId) throws SchedulerException {
        scheduler.resumeJob(JobKey.jobKey(taskId));
    }

    /** * Add new job *@param taskId
     * @throws SchedulerConfigException
     */
    public void loadJob(String taskId) throws SchedulerConfigException {
        TaskConfigEntity taskConfigEntity = taskConfigDao.selectOne(
                Wrappers.<TaskConfigEntity>lambdaQuery()
                        .eq(TaskConfigEntity::getTaskId, taskId)
                        .eq(TaskConfigEntity::getStatus, 1));
        if (taskConfigEntity == null) {throw new SchedulerConfigException("Related Job configuration not found");
        }
        try {
            JobDetail jobDetail = getJobDetail(taskConfigEntity);
            CronTrigger cronTrigger = getCronTrigger(taskConfigEntity);
            scheduler.scheduleJob(jobDetail, cronTrigger);
        } catch (Exception e) {
            log.error("Abnormal loading scheduled task",e);
            throw new SchedulerConfigException("Abnormal loading scheduled task", e); }}public void unloadJob(String taskId) throws SchedulerException {
        // Stop the trigger
        scheduler.pauseTrigger(TriggerKey.triggerKey(taskId));
        // Unmount the scheduled task
        scheduler.unscheduleJob(TriggerKey.triggerKey(taskId));
        // Delete the original job
        scheduler.deleteJob(JobKey.jobKey(taskId));
    }

    /** * Reload the execution plan *@throws Exception
     */
    public void reload(String taskId) throws Exception {
        TaskConfigEntity taskConfigEntity = taskConfigDao.selectOne(
                Wrappers.<TaskConfigEntity>lambdaQuery()
                        .eq(TaskConfigEntity::getTaskId, taskId)
                        .eq(TaskConfigEntity::getStatus, 1));

        String jobCode = taskConfigEntity.getTaskId();
        // Get the previous trigger
        TriggerKey triggerKey = TriggerKey.triggerKey(jobCode);
        // Stop the trigger
        scheduler.pauseTrigger(triggerKey);
        // Delete trigger
        scheduler.unscheduleJob(triggerKey);
        // Delete the original job
        scheduler.deleteJob(JobKey.jobKey(jobCode));

        JobDetail jobDetail = getJobDetail(taskConfigEntity);
        CronTrigger cronTrigger = getCronTrigger(taskConfigEntity);
        // Reload the job
        scheduler.scheduleJob(jobDetail, cronTrigger);
    }
	/ / assembly JobDetail
    private JobDetail getJobDetail(TaskConfigEntity configEntity) throws ClassNotFoundException {

        Class<? extends Job> aClass = Class.forName(configEntity.getClassName()).asSubclass(Job.class);

        return JobBuilder.newJob()
                .withIdentity(JobKey.jobKey(configEntity.getTaskId()))
                .withDescription(configEntity.getDescription())
                .ofType(aClass).build();
    }
    / / assembly CronTrigger
    private CronTrigger getCronTrigger(TaskConfigEntity configEntity){
        CronTrigger cronTrigger = null;
        CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(configEntity.getCron());
        cronTrigger = TriggerBuilder.newTrigger()
                .withIdentity(TriggerKey.triggerKey(configEntity.getTaskId()))
                .withSchedule(cronScheduleBuilder)
                .build();
        returncronTrigger; }}Copy the code

2. The scheduled task is automatically loaded

package com.example.demo.listener;

import com.example.demo.service.ScheduleJobService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

/** * listen for the container to start and start adding from the database to the scheduled task */
@Component
public class ScheduleJobInitListener implements CommandLineRunner {

    @Autowired
    private ScheduleJobService jobService;

    @Override
    public void run(String... strings) throws Exception { jobService.startJob(); }}Copy the code

3. Load scheduled tasks using remote requests

package com.example.demo.controller;

import com.example.demo.service.ScheduleJobService;
import org.quartz.SchedulerConfigException;
import org.quartz.SchedulerException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ScheduleJobController {
    @Autowired
    private ScheduleJobService jobService;

    @GetMapping("/load/{taskId}")
    public String loadJob(@PathVariable("taskId") String taskId){
        try {
            jobService.loadJob(taskId);
        } catch (SchedulerConfigException e) {
            return Failed to import a scheduled task;
        }
        return "Success";
    }
    @GetMapping("/resume/{taskId}")
    public String resumeJob(@PathVariable("taskId") String taskId){
        try {
            jobService.resumeJob(taskId);
        }catch (SchedulerException e) {
            return "Failed to restore scheduled task";
        }
        return "Success";
    }
    @GetMapping("/stop/{taskId}")
    public String stopJob(@PathVariable("taskId") String taskId){
        try {
            jobService.stopJob(taskId);
        }catch (SchedulerException e) {
            return "Failed to suspend a scheduled task";
        }
        return "Success";
    }
    @GetMapping("/unload/{taskId}")
    public String unloadJob(@PathVariable("taskId") String taskId){
        try {
            jobService.unloadJob(taskId);
        }catch (SchedulerException e) {
            return "Failed to uninstall the scheduled task";
        }
        return "Success"; }}Copy the code

8. Scheduled tasks

package com.example.demo.job;

import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class MyJob implements Job {
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        LocalDateTime now = LocalDateTime.now();
        System.out.println("Scheduled task begins"+now.format(DateTimeFormatter.ofPattern("HH:mm:ss"))); }}Copy the code

Dynamic loading can be implemented by implementing the Job interface and then configuring it to the database.

9. The test

1. Insert a scheduled task configuration to the database.

Where class_name is the reference path of our Job

INSERT INTO `test`.`task_config` (`id`, `task_id`, `cron`, `class_name`, `description`, `status`) VALUES ('1'.'TB00001'.'0 * * * * ?'.'com.example.demo.job.MyJob'.'Triggered every minute.'.'1');

Copy the code

2. Start the framework

I configured it to run once a minute

3. Make a request through the network

You can use a browser to access the address and perform scheduled tasks.