preface

In day-to-day development, besides building the APIs that the front end consumes, we also need to write scheduled jobs, such as pushing a message to all users at a fixed time every day. A mature task-scheduling center lets you manage every task through an admin system, change a task's execution time dynamically, and trigger a task immediately.

Recently, our business needed a scheduled-task scheduling center. However, after searching the web I could not find such a system built with Node, so I implemented one myself.

Functions that need to be implemented

  • Add, delete, change and check tasks
  • Immediate execution of tasks
  • Start/close a task
  • Automatically reload scheduled tasks when the service restarts

Database design

-- Scheduled-task definitions managed by the scheduling center.
-- One row per task; `jobName` is used by the application as the key of the
-- in-memory job registry, and `jobHandler` names the service method to run.
CREATE TABLE `schedule_job` (
  `job_id` int(11) NOT NULL AUTO_INCREMENT,
  `cron` varchar(50) NOT NULL DEFAULT ' ' COMMENT 'Cron expression',
  `jobName` varchar(100) NOT NULL DEFAULT ' ' COMMENT 'Task name',
  `jobHandler` varchar(100) NOT NULL DEFAULT ' ' COMMENT 'Task handling method',
  `params` varchar(255) NOT NULL COMMENT 'parameters',
  `description` varchar(255) NOT NULL DEFAULT ' ' COMMENT 'description',
  -- Default to "stopped": the only documented states are 0 and -1, and a
  -- newly inserted task is not registered with the scheduler until it is started.
  `status` int(1) NOT NULL DEFAULT '-1' COMMENT 'State 0 enabled -1 stopped',
  `create_by` varchar(100) NOT NULL COMMENT 'Founder',
  `update_by` varchar(100) NOT NULL COMMENT 'Update person',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Update Time',
  PRIMARY KEY (`job_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Scheduled Task List';
Copy the code

Add, delete, change and check tasks

// app/routers/task.js
'use strict';
module.exports = app= > {
  const { router, controller, config, middleware } = app;
  const checkTokenHandler = middleware.checkTokenHandler();
  // List of scheduled tasks
  router.get(`${config.contextPath}/task/schedule/list`, checkTokenHandler, controller.task.scheduleList);
  // Modify/add a scheduled task
  router.post(`${config.contextPath}/task/schedule/edit`, checkTokenHandler, controller.task.editSchedule);
  // Delete a scheduled task
  router.post(`${config.contextPath}/task/schedule/delete`, checkTokenHandler, controller.task.deleteSchedule);
  // Updates the scheduled task status
  router.post(`${config.contextPath}/task/schedule/status/update`, checkTokenHandler, controller.task.updateStatusSchedule);
};

// app/controller/task.js
'use strict';
const Controller = require('egg').Controller;
const { setResult } = require('../utils');

/**
 * HTTP controller for scheduled-task management.
 *
 * Each action delegates to `ctx.service.taskService` and wraps the outcome
 * with `setResult` before writing it to the response body.
 */
class TaskController extends Controller {
  /**
   * Lists scheduled tasks; paging options are taken from the query string.
   */
  async scheduleList() {
    const { ctx } = this;
    const result = await ctx.service.taskService.scheduleList(ctx.request.query);
    ctx.body = setResult({ data: result });
  }

  /**
   * Creates or modifies a scheduled task. The acting user is read from the
   * `username` request header and the task fields from the request body.
   */
  async editSchedule() {
    const { ctx } = this;
    const { username } = ctx.request.headers;
    await ctx.service.taskService.editSchedule(username, ctx.request.body);
    ctx.body = setResult();
  }

  /**
   * Deletes the scheduled task identified by the request body.
   */
  async deleteSchedule() {
    const { ctx } = this;
    await ctx.service.taskService.deleteSchedule(ctx.request.body);
    ctx.body = setResult();
  }

  /**
   * Updates the status of the scheduled task identified by the request body.
   */
  async updateStatusSchedule() {
    const { ctx } = this;
    await ctx.service.taskService.updateStatusSchedule(ctx.request.body);
    ctx.body = setResult();
  }
}

module.exports = TaskController;

// app/service/taskService.js
'use strict';
const { Service } = require('egg');
const { SCHEDULE_STATUS } = require('../constants');

/**
 * Parses a strictly positive integer, falling back when the input is missing,
 * non-numeric, or less than 1. Query-string values arrive as strings, so an
 * explicit radix is used.
 *
 * @param {*} value - Raw value (typically a query-string string).
 * @param {number} fallback - Value returned when `value` is not a positive integer.
 * @return {number} The parsed integer or `fallback`.
 */
function toPositiveInt(value, fallback) {
  const parsed = Number.parseInt(value, 10);
  return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
}

/**
 * Persistence layer for the `schedule_job` table.
 */
class TaskService extends Service {
  /**
   * Returns one page of scheduled tasks, newest first, plus the total count.
   *
   * @param {object} query - Paging options.
   * @param {number|string} [query.page=1] - 1-based page number.
   * @param {number|string} [query.size=20] - Page size.
   * @return {Promise<{list: object[], total: number}>} Page rows and total row count.
   */
  async scheduleList({ page = 1, size = 20 }) {
    const limit = toPositiveInt(size, 20);
    // Clamping the page to >= 1 keeps the offset from ever going negative.
    const offset = (toPositiveInt(page, 1) - 1) * limit;

    const [ list, total ] = await Promise.all([
      this.app.mysql.select('schedule_job', {
        orders: [[ 'create_time', 'desc' ]],
        limit,
        offset,
      }),
      this.app.mysql.count('schedule_job'),
    ]);
    return { list, total };
  }

  /**
   * Inserts a new task when `job_id` is absent, otherwise updates the
   * existing row.
   *
   * @param {string} userName - User recorded in `create_by` / `update_by`.
   * @param {object} task - Task fields.
   * @param {number} [task.job_id] - Present when modifying an existing task.
   * @param {string} task.cron - Cron expression.
   * @param {string} task.jobName - Task name.
   * @param {string} task.jobHandler - Name of the handler method.
   * @param {string} [task.params=' '] - Task parameters.
   * @param {string} [task.description=' '] - Task description.
   */
  async editSchedule(userName, { job_id, cron, jobName, jobHandler, params = ' ', description = ' ' }) {
    if (!job_id) {
      // New task
      await this.app.mysql.insert('schedule_job', {
        cron,
        jobName,
        jobHandler,
        description,
        params,
        create_by: userName,
        update_by: userName,
        create_time: new Date(),
        update_time: new Date(),
      });
      return;
    }
    // Modify an existing task
    await this.app.mysql.update('schedule_job', {
      cron,
      jobName,
      jobHandler,
      description,
      params,
      update_by: userName,
      update_time: new Date(),
    }, { where: { job_id } });
  }

  /**
   * Deletes a task and, if it was running, cancels its in-memory job.
   *
   * @param {object} body - Request payload.
   * @param {number} body.job_id - ID of the task to delete.
   */
  async deleteSchedule({ job_id }) {
    // The row must be read BEFORE it is deleted: afterwards the lookup finds
    // nothing, so the running job could never be identified and cancelled.
    const schedule = await this.app.mysql.get('schedule_job', { job_id });
    const result = await this.app.mysql.delete('schedule_job', { job_id });
    if (result.affectedRows === 1 && schedule && schedule.status === SCHEDULE_STATUS.RUN) {
      // Stop the task
      await this.ctx.helper.cancelSchedule(schedule.jobName);
    }
  }

  /**
   * Persists a new status for a task.
   *
   * @param {object} body - Request payload.
   * @param {number} body.job_id - ID of the task.
   * @param {number} body.status - New status value.
   */
  async updateStatusSchedule({ job_id, status }) {
    await this.app.mysql.update('schedule_job', { status }, { where: { job_id } });
  }
}

module.exports = TaskService;
Copy the code

Implement starting, cancelling, and listing scheduled tasks

node-schedule is a flexible cron-like and not-cron-like job scheduler for Node.js. It lets you schedule jobs to run on specific dates, with optional recurrence rules. It uses only a single timer at any given time (rather than re-evaluating upcoming jobs every second or minute), and it provides methods for managing jobs, such as starting and cancelling them.

// app/extend/helper.js
'use strict';
const schedule = require('node-schedule');
/** * a stack for scheduled tasks */
const scheduleStacks = {};

module.exports = {
  /** * gets all tasks currently executing */
  async getScheduleStacks() {
    return scheduleStacks;
  },
  /** * Create a scheduled task *@param {*} Id Task ID *@param {*} cron Cron
   * @param {*} JobName Task name *@param {*} The jobHandler task method * in daily use, there may be the same handler has different processing logic, so you need to pass in the task ID * for example: In message push, different contents will be pushed to the same users at different times, and the contents are stored in the task information. The business code needs to query the corresponding task information to read the push information and process the next logical */
  async generateSchedule(id, cron, jobName, jobHandler) {
    this.ctx.logger.info('create scheduled task', task ID: %s, cron: %s, task name: %s, task method: %s', id, cron, jobName, jobHandler);
    scheduleStacks[jobName] = schedule.scheduleJob(cron, () = > {
      this.service.scheduleService[jobHandler](id);
    });
  },
  /** * Cancel/stop the scheduled task *@param {*} JobName Task name */
  async cancelSchedule(jobName) {
    this.ctx.logger.info('[Cancel scheduled task], task name: %s', jobName); scheduleStacks[jobName] && scheduleStacks[jobName].cancel(); }};Copy the code

The specific handler of a task

ScheduleService holds all the task handlers. At the moment it only manages a small number of tasks; if the number of tasks grows large, the scheduler can instead call methods on different services according to the task type.

For now each run is a single attempt: task failures, exceptions, and similar conditions are not handled yet. This will be improved later when time permits.

// app/service/scheduleService.js
'use strict';
const { Service } = require('egg');

/**
 * Holds the handler methods that scheduled tasks invoke by name.
 */
class ScheduleService extends Service {
  /**
   * Test handler: logs the task row it was invoked for.
   *
   * @param {*} job_id - ID of the task being executed.
   * @return {Promise<boolean|undefined>} `false` when the lock could not be
   *   acquired; otherwise resolves with no value.
   */
  async testHandler(job_id) {
    // Take a lock so that only one process executes the task at a time.
    const lockKey = `sendAllUserBroadcast:${job_id}`;
    const locked = await this.app.redlock.lock(lockKey, 'sendAllUserBroadcast', 180);
    if (!locked) return false;

    try {
      const schedule = await this.app.mysql.get('schedule_job', { job_id });
      // Replace this with the specific business code
      await this.logger.info('I am a test task, task info: %j', schedule);
    } finally {
      // Release the lock even when the task body throws, so a failure does not
      // leave the lock held.
      await this.app.redlock.unlock(lockKey);
    }
  }
}

module.exports = ScheduleService;
Copy the code

Automatically reload scheduled tasks when the service restarts

// app.js
'use strict';
const { SCHEDULE_STATUS } = require('./app/constants');

/**
 * Application lifecycle hook that restores scheduled tasks on boot and
 * cancels them on shutdown.
 */
class AppBootHook {
  /**
   * @param {object} app - Egg application instance.
   */
  constructor(app) {
    this.app = app;
    // Helpers are reached through a context; no request exists at boot time.
    this.ctx = app.createAnonymousContext();
  }

  /**
   * Loads every task whose status is RUN from the database and registers it
   * with the scheduler.
   */
  async willReady() {
    await this.app.logger.info('[Initializing scheduled tasks] Start... ');
    // Query the scheduled tasks
    const schedules = await this.app.mysql.select('schedule_job', { where: { status: SCHEDULE_STATUS.RUN } });
    // Register the tasks one by one. A for...of loop is used because
    // Array#forEach does not wait for async callbacks, which would let the
    // "end" log line run before registration has finished.
    let registered = 0;
    for (const schedule of schedules) {
      try {
        await this.ctx.helper.generateSchedule(schedule.job_id, schedule.cron, schedule.jobName, schedule.jobHandler);
        registered += 1;
      } catch (err) {
        // One broken task must not prevent the remaining tasks from loading.
        this.app.logger.error('[Initializing scheduled tasks] Failed to register task %s: %s', schedule.jobName, err);
      }
    }
    await this.app.logger.info('[Initializing a scheduled task] Initialize a scheduled task: %d, end... ', registered);
  }

  /**
   * Cancels every job registered in this process before the app closes.
   */
  async beforeClose() {
    await this.app.logger.info('[Destroy scheduled Task] Start... ');
    const scheduleStacks = await this.ctx.helper.getScheduleStacks();
    // Snapshot the names first so the reported count does not depend on
    // whether cancelling removes entries from the registry.
    const jobNames = Reflect.ownKeys(scheduleStacks);
    for (const jobName of jobNames) {
      await this.ctx.helper.cancelSchedule(jobName);
    }
    await this.app.logger.info('[Destroy Scheduled Task] Number of destroyed scheduled tasks: %d, end... ', jobNames.length);
  }
}

module.exports = AppBootHook;
Copy the code

Improve the management of tasks

// app/routers/task.js.// Execute the task
router.post(`${config.contextPath}/task/schedule/run`, checkTokenHandler, controller.task.runSchedule); .// app/controller/task.js
/** * Perform the task */
async runSchedule() {
  const { ctx } = this;
  await ctx.service.taskService.runSchedule(ctx.request.body);
  ctx.body = setResult();
}

// app/service/taskService.js
// Modify/add a scheduled task
async editSchedule(userName, { job_id, cron, jobName, jobHandler, params = ' ', description = ' ' }) {
  if (result.affectedRows === 1) {
    const schedule = await this.app.mysql.get('schedule_job', { job_id });
    // Optional chain operator '? 'can be used here if version permits. `
    if (schedule && schedule.status === SCHEDULE_STATUS.RUN) {
      // Reset the task in start state
      await this.ctx.helper.cancelSchedule(jobName);
      await this.ctx.helper.generateSchedule(job_id, cron, jobName, jobHandler); }}}// Updates the scheduled task status
async updateStatusSchedule({ job_id, status }) {
  const result = await this.app.mysql.update('schedule_job', { status }, { where: { job_id } });
  // Check whether the update is successful
  if (result.affectedRows === 1) {
    const schedule = await this.app.mysql.get('schedule_job', { job_id });
    if (status === SCHEDULE_STATUS.RUN) {
      // Start the task
      await this.ctx.helper.generateSchedule(job_id, schedule.cron, schedule.jobName, schedule.jobHandler);
    } else {
      // Stop the task
      await this.ctx.helper.cancelSchedule(schedule.jobName); }}}// Execute the task
async runSchedule({ job_id }) {
  const schedule = await this.app.mysql.get('schedule_job', { job_id });
  if (schedule === null) throw new VideoError(RESULT_FAIL, 'Mission does not exist');
  // Execute the task
  this.service.scheduleService[schedule.jobHandler]();
}
Copy the code

Management system page implementation

UI implementation is relatively simple and won’t be explained

// src/api/task.js
import request from '@/utils/request'
/**
 * Fetches the scheduled task list.
 * @param {*} params - Query parameters sent with the GET request.
 * @returns {*} Whatever `request` returns for this call.
 */
export function scheduleList(params) {
  return request({
    url: '/task/schedule/list',
    method: 'GET',
    params
  })
}
/**
 * Creates or modifies a scheduled task.
 * @param {*} data - Request body sent with the POST request.
 * @returns {*} Whatever `request` returns for this call.
 */
export function editSchedule(data) {
  return request({
    url: '/task/schedule/edit',
    method: 'post',
    data
  })
}
/**
 * Deletes a scheduled task.
 * @param {*} data - Request body sent with the POST request.
 * @returns {*} Whatever `request` returns for this call.
 */
export function deleteSchedule(data) {
  return request({
    url: '/task/schedule/delete',
    method: 'post',
    data
  })
}
/**
 * Updates the status of a scheduled task.
 * @param {*} data - Request body sent with the POST request.
 * @returns {*} Whatever `request` returns for this call.
 */
export function updateStatusSchedule(data) {
  return request({
    url: '/task/schedule/status/update',
    method: 'post',
    data
  })
}
/**
 * Executes a task immediately.
 * @param {*} data - Request body sent with the POST request.
 * @returns {*} Whatever `request` returns for this call.
 */
export function runSchedule(data) {
  return request({
    url: '/task/schedule/run',
    method: 'post',
    data
  })
}
Copy the code
<!-- src/views/task/schedule.vue -->
<!--
  Admin page for scheduled tasks: paged list, create/edit dialog, delete,
  start/stop, and "run now".
  NOTE(review): the published snippet was truncated in several places
  (attribute values, button labels, some messages); those parts are
  reconstructed here and should be confirmed against the real project.
-->
<template>
  <div class="app-container">
    <div class="filter-container">
      <el-button v-waves class="filter-item" type="primary" icon="el-icon-plus" @click="handleEdit(null)">New</el-button>
    </div>

    <el-table v-loading="listLoading" :data="list" border fit highlight-current-row style="width: 100%">
      <el-table-column align="center" prop="job_id" label="Task ID" />
      <el-table-column align="center" prop="jobName" label="Task name" />
      <el-table-column align="center" prop="cron" label="Cron" />
      <el-table-column align="center" prop="jobHandler" label="jobHandler" />
      <el-table-column align="center" prop="params" label="Parameters" />
      <!-- The backend column is `description`, so bind to it rather than `remark`. -->
      <el-table-column align="center" prop="description" label="Task description" />
      <el-table-column align="center" prop="status" label="Status">
        <template slot-scope="{row}">
          <el-tag v-if="row.status==0" type="success">run</el-tag>
          <el-tag v-else type="info">stop</el-tag>
        </template>
      </el-table-column>
      <el-table-column align="center" label="Operation">
        <template slot-scope="{row}">
          <!-- Status values: 0 = enabled, -1 = stopped. -->
          <el-button v-if="row.status==-1" type="text" @click="updateStatus(row.job_id, 0)">Start</el-button>
          <el-button v-else type="text" @click="updateStatus(row.job_id, -1)">Stop</el-button>
          <el-button type="text" @click="run(row.job_id)">Run</el-button>
          <el-button type="text" @click="handleEdit(row)">Edit</el-button>
          <el-button type="text" @click="del(row)">Delete</el-button>
        </template>
      </el-table-column>
    </el-table>

    <pagination v-show="total>0" :total="total" :page.sync="listQuery.page" :limit.sync="listQuery.size" @pagination="getList" />

    <el-dialog :visible.sync="dialogVisible" :title="dialogType==='edit' ? 'Edit task' : 'New task'">
      <el-form ref="editForm" :rules="rules" :model="fromData" label-width="100px">
        <el-form-item label="Cron" prop="cron">
          <el-input v-model="fromData.cron" placeholder="Please enter cron" />
        </el-form-item>
        <el-form-item label="Task name" prop="jobName">
          <el-input v-model="fromData.jobName" placeholder="Please enter the task name" />
        </el-form-item>
        <el-form-item label="jobHandler" prop="jobHandler">
          <el-input v-model="fromData.jobHandler" placeholder="Please enter jobHandler" />
        </el-form-item>
        <el-form-item label="Parameters" prop="params">
          <el-input v-model="fromData.params" type="textarea" placeholder="Please enter parameters" />
        </el-form-item>
        <el-form-item label="Task description" prop="description">
          <el-input v-model="fromData.description" type="textarea" placeholder="Please enter a task description" />
        </el-form-item>
      </el-form>
      <div style="text-align:right;">
        <el-button @click="dialogVisible = false">Cancel</el-button>
        <el-button type="primary" @click="confirm">Commit</el-button>
      </div>
    </el-dialog>
  </div>
</template>

<script>
import Pagination from '@/components/Pagination'
import waves from '@/directive/waves'
import { scheduleList, editSchedule, deleteSchedule, updateStatusSchedule, runSchedule } from '@/api/task'

export default {
  components: { Pagination },
  directives: { waves },
  data() {
    return {
      listLoading: false,
      list: [],
      total: 0,
      listQuery: { page: 1, size: 20 },
      dialogVisible: false,
      dialogType: 'new',
      fromData: {},
      rules: {
        cron: { required: true, message: 'Please enter cron', trigger: 'blur' },
        jobName: { required: true, message: 'Please enter the task name', trigger: 'blur' },
        jobHandler: { required: true, message: 'Please enter jobHandler', trigger: 'blur' }
      }
    }
  },
  created() {
    this.getList()
  },
  methods: {
    // Loads the current page of tasks.
    async getList() {
      this.listLoading = true
      try {
        const { code, data } = await scheduleList(this.listQuery)
        if (code === 0) {
          this.list = data.list
          this.total = data.total
        }
      } finally {
        // Clear the spinner even when the request fails.
        this.listLoading = false
      }
    },
    // Opens the dialog for a new task (row === null) or a copy of an existing row.
    handleEdit(row) {
      this.fromData = {}
      if (row) {
        this.fromData = JSON.parse(JSON.stringify(row))
        this.dialogType = 'edit'
      } else {
        this.dialogType = 'new'
      }
      this.dialogVisible = true
    },
    // Validates the form and submits it.
    async confirm() {
      this.$refs.editForm.validate(async valid => {
        if (!valid) return false
        const { code } = await editSchedule(this.fromData)
        if (code === 0) {
          this.$message({
            message: this.dialogType === 'edit' ? 'Edit succeeded' : 'Add succeeded',
            type: 'success'
          })
          this.dialogVisible = false
          this.getList()
        }
      })
    },
    // Asks for confirmation, then deletes the task.
    del(row) {
      this.$confirm('Do you want to delete this task?', 'Confirm', {
        confirmButtonText: 'Confirm',
        cancelButtonText: 'Cancel',
        type: 'warning'
      }).then(async() => {
        const { code } = await deleteSchedule({ job_id: row.job_id })
        if (code === 0) {
          this.$message({ message: 'delete successfully', type: 'success' })
          this.getList()
        }
      }).catch(() => {
        // Dismissing the confirmation dialog rejects the promise; nothing to do.
      })
    },
    // Starts (0) or stops (-1) a task.
    async updateStatus(job_id, status) {
      const { code } = await updateStatusSchedule({ job_id, status })
      if (code === 0) {
        this.$message({ message: 'Edit succeeded', type: 'success' })
        this.getList()
      }
    },
    // Triggers an immediate run of the task.
    async run(job_id) {
      const { code } = await runSchedule({ job_id })
      if (code === 0) {
        this.$message({ message: 'execute successfully', type: 'success' })
      }
    }
  }
}
</script>

The project address

Front-end source: admin-web

Server source: admin-server

Preview address: admin-demo