preface
In day-to-day development, besides building the interfaces consumed by the front end, we also need to write scheduled (timed) tasks — for example, pushing a message to all users at a fixed time every day. A mature scheduled-task scheduling center lets you manage the information of all tasks through a management system, change a task's execution time dynamically, and trigger a task immediately.
Recently, the company's business needed a scheduled-task scheduling center. However, after searching the web I could not find such a system built with Node, so I implemented one myself.
Functions that need to be implemented
- Create, delete, update and query tasks
- Immediate execution of tasks
- Start/close a task
- Automatically reload scheduled tasks when the service restarts
Database design
-- Scheduled task list: one row per task managed by the scheduling center.
-- `status` uses the two states named in its comment: 0 = enabled (running), -1 = stopped.
-- New rows default to -1 (stopped): inserting a task does not register a timer,
-- so a task only becomes "enabled" once it is explicitly started.
CREATE TABLE `schedule_job` (
  `job_id` int(11) NOT NULL AUTO_INCREMENT,
  `cron` varchar(50) NOT NULL DEFAULT ' ' COMMENT 'Cron expression',
  `jobName` varchar(100) NOT NULL DEFAULT ' ' COMMENT 'Task name',
  `jobHandler` varchar(100) NOT NULL DEFAULT ' ' COMMENT 'Task handling method',
  `params` varchar(255) NOT NULL COMMENT 'parameters',
  `description` varchar(255) NOT NULL DEFAULT ' ' COMMENT 'description',
  `status` int(1) NOT NULL DEFAULT '-1' COMMENT 'State 0 enabled -1 stopped',
  `create_by` varchar(100) NOT NULL COMMENT 'Founder',
  `update_by` varchar(100) NOT NULL COMMENT 'Update person',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Update Time',
  PRIMARY KEY (`job_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Scheduled Task List';
Copy the code
Create, delete, update and query tasks
// app/routers/task.js
'use strict';
module.exports = app= > {
const { router, controller, config, middleware } = app;
const checkTokenHandler = middleware.checkTokenHandler();
// List of scheduled tasks
router.get(`${config.contextPath}/task/schedule/list`, checkTokenHandler, controller.task.scheduleList);
// Modify/add a scheduled task
router.post(`${config.contextPath}/task/schedule/edit`, checkTokenHandler, controller.task.editSchedule);
// Delete a scheduled task
router.post(`${config.contextPath}/task/schedule/delete`, checkTokenHandler, controller.task.deleteSchedule);
// Updates the scheduled task status
router.post(`${config.contextPath}/task/schedule/status/update`, checkTokenHandler, controller.task.updateStatusSchedule);
};
// app/controller/task.js
'use strict';
const Controller = require('egg').Controller;
// Response-envelope helper shared by the controllers (relative path must not contain spaces).
const { setResult } = require('../utils');
/**
 * HTTP controller for scheduled-task management.
 * Each action delegates to `ctx.service.taskService` and wraps the outcome
 * with `setResult` before writing it to the response body.
 */
class TaskController extends Controller {
  /**
   * List scheduled tasks; paging options are taken from the query string.
   */
  async scheduleList() {
    const { ctx } = this;
    const result = await ctx.service.taskService.scheduleList(ctx.request.query);
    ctx.body = setResult({ data: result });
  }

  /**
   * Modify/add a scheduled task.
   * The acting user name is read from the `username` request header.
   */
  async editSchedule() {
    const { ctx } = this;
    const { username } = ctx.request.headers;
    await ctx.service.taskService.editSchedule(username, ctx.request.body);
    ctx.body = setResult();
  }

  /**
   * Delete the scheduled task identified in the request body.
   */
  async deleteSchedule() {
    const { ctx } = this;
    await ctx.service.taskService.deleteSchedule(ctx.request.body);
    ctx.body = setResult();
  }

  /**
   * Update the status of the scheduled task identified in the request body.
   */
  async updateStatusSchedule() {
    const { ctx } = this;
    await ctx.service.taskService.updateStatusSchedule(ctx.request.body);
    ctx.body = setResult();
  }
}

module.exports = TaskController;
// app/service/taskService.js
'use strict';
const { Service } = require('egg');
// Task status constants (relative path must not contain spaces).
const { SCHEDULE_STATUS } = require('../constants');
/**
 * Parse a paging value into a positive integer.
 * @param {*} value - Raw value (query-string values arrive as strings).
 * @param {number} fallback - Used when `value` is not a positive integer.
 * @returns {number} A positive integer.
 */
function toPositiveInt(value, fallback) {
  const parsed = Number.parseInt(value, 10);
  return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
}

/**
 * Persistence and lifecycle operations for rows of the `schedule_job` table.
 */
class TaskService extends Service {
  /**
   * Page through the scheduled tasks, newest first.
   * @param {object} [query]
   * @param {number|string} [query.page=1] - 1-based page number.
   * @param {number|string} [query.size=20] - Page size.
   * @returns {Promise<{list: object[], total: number}>} The page rows and the total row count.
   */
  async scheduleList({ page = 1, size = 20 } = {}) {
    // Invalid or non-positive values fall back to the defaults instead of
    // producing NaN / negative LIMIT and OFFSET values.
    const limit = toPositiveInt(size, 20);
    const offset = (toPositiveInt(page, 1) - 1) * limit;
    const [ list, total ] = await Promise.all([
      this.app.mysql.select('schedule_job', {
        orders: [[ 'create_time', 'desc' ]],
        limit,
        offset,
      }),
      this.app.mysql.count('schedule_job'),
    ]);
    return { list, total };
  }

  /**
   * Insert a new task (no `job_id`) or update an existing one.
   * @param {string} userName - User performing the change; stored in create_by / update_by.
   * @param {object} job - Task fields: job_id, cron, jobName, jobHandler, params, description.
   */
  async editSchedule(userName, { job_id, cron, jobName, jobHandler, params = ' ', description = ' ' }) {
    if (!job_id) {
      // New task
      await this.app.mysql.insert('schedule_job', {
        cron,
        jobName,
        jobHandler,
        description,
        params,
        create_by: userName,
        update_by: userName,
        create_time: new Date(),
        update_time: new Date(),
      });
      return;
    }
    // Modify an existing task
    await this.app.mysql.update('schedule_job', {
      cron,
      jobName,
      jobHandler,
      description,
      params,
      update_by: userName,
      update_time: new Date(),
    }, { where: { job_id } });
  }

  /**
   * Delete a task and, if it was running, cancel its timer.
   * @param {object} body
   * @param {number} body.job_id - Primary key of the task to delete.
   */
  async deleteSchedule({ job_id }) {
    // The row must be read BEFORE it is deleted; afterwards its status and
    // name are gone and the running timer could no longer be cancelled.
    const schedule = await this.app.mysql.get('schedule_job', { job_id });
    const result = await this.app.mysql.delete('schedule_job', { job_id });
    if (result.affectedRows === 1 && schedule && schedule.status === SCHEDULE_STATUS.RUN) {
      // Stop the task
      await this.ctx.helper.cancelSchedule(schedule.jobName);
    }
  }

  /**
   * Persist a new status for a task.
   * @param {object} body
   * @param {number} body.job_id - Primary key of the task.
   * @param {number} body.status - New status value.
   */
  async updateStatusSchedule({ job_id, status }) {
    await this.app.mysql.update('schedule_job', { status }, { where: { job_id } });
  }
}

module.exports = TaskService;
Copy the code
Implement starting, cancelling and listing scheduled tasks
node-schedule is a flexible cron-like and not-cron-like job scheduler for Node.js. It allows you to schedule jobs for execution at specific dates, with optional recurrence rules. It only uses a single timer at any given time (rather than re-evaluating upcoming jobs every second/minute), and it provides ways to manage jobs, such as starting and cancelling them.
// app/extend/helper.js
'use strict';
const schedule = require('node-schedule');
/** * a stack for scheduled tasks */
const scheduleStacks = {};
module.exports = {
/** * gets all tasks currently executing */
async getScheduleStacks() {
return scheduleStacks;
},
/** * Create a scheduled task *@param {*} Id Task ID *@param {*} cron Cron
* @param {*} JobName Task name *@param {*} The jobHandler task method * in daily use, there may be the same handler has different processing logic, so you need to pass in the task ID * for example: In message push, different contents will be pushed to the same users at different times, and the contents are stored in the task information. The business code needs to query the corresponding task information to read the push information and process the next logical */
async generateSchedule(id, cron, jobName, jobHandler) {
this.ctx.logger.info('create scheduled task', task ID: %s, cron: %s, task name: %s, task method: %s', id, cron, jobName, jobHandler);
scheduleStacks[jobName] = schedule.scheduleJob(cron, () = > {
this.service.scheduleService[jobHandler](id);
});
},
/** * Cancel/stop the scheduled task *@param {*} JobName Task name */
async cancelSchedule(jobName) {
this.ctx.logger.info('[Cancel scheduled task], task name: %s', jobName); scheduleStacks[jobName] && scheduleStacks[jobName].cancel(); }};Copy the code
The specific handler of a task
ScheduleService holds all task handlers. It currently manages only a small number of tasks; if the number of tasks grows large, methods of different services can be called according to the task type.
At present only a single execution attempt is implemented; task failures, exceptions and similar situations are not handled yet and will be improved later when time permits.
// app/service/scheduleService.js
'use strict';
const { Service } = require("egg");
/**
 * Holds the concrete handlers that scheduled tasks invoke by name.
 */
class ScheduleService extends Service {
  /**
   * Test handler: loads the task row and logs it.
   * @param {*} job_id - ID of the task being executed.
   * @returns {Promise<false|undefined>} `false` when the lock could not be acquired, otherwise nothing.
   */
  async testHandler(job_id) {
    const lockKey = 'sendAllUserBroadcast:' + job_id;
    // Take a lock so that only one process executes the task at a time
    const locked = await this.app.redlock.lock(lockKey, 'sendAllUserBroadcast', 180);
    if (!locked) return false;
    try {
      const schedule = await this.app.mysql.get('schedule_job', { job_id });
      // Replace this with the specific business code
      this.logger.info('I am a test task, task info: %j', schedule);
    } finally {
      // Release the lock even when the task body throws, so a failure does not
      // leave the lock held.
      await this.app.redlock.unlock(lockKey);
    }
  }
}

module.exports = ScheduleService;
Copy the code
Automatically reload scheduled tasks when the service restarts
// app.js
'use strict';
const { SCHEDULE_STATUS } = require('./app/constants');
/**
 * Application lifecycle hook: registers every task stored with the RUN status
 * when the app is about to become ready, and cancels all registered jobs
 * before the app closes.
 */
class AppBootHook {
  /**
   * @param {object} app - Egg application instance.
   */
  constructor(app) {
    this.app = app;
    // Anonymous context: gives access to ctx.helper outside of a request.
    this.ctx = app.createAnonymousContext();
  }

  /**
   * Load the tasks whose status is RUN from the database and register them.
   */
  async willReady() {
    this.app.logger.info('[Initializing scheduled tasks] Start... ');
    // Query the scheduled tasks
    const schedules = await this.app.mysql.select('schedule_job', { where: { status: SCHEDULE_STATUS.RUN } });
    // Register them one by one. `forEach` with an async callback would not be
    // awaited, so failures would escape as unhandled rejections.
    for (const schedule of schedules) {
      try {
        await this.ctx.helper.generateSchedule(schedule.job_id, schedule.cron, schedule.jobName, schedule.jobHandler);
      } catch (err) {
        // One broken task must not prevent the remaining ones from loading.
        this.app.logger.error('[Initializing scheduled tasks] failed to register task ID: %s, error: %s', schedule.job_id, err && err.stack ? err.stack : err);
      }
    }
    this.app.logger.info('[Initializing a scheduled task] Initialize a scheduled task: %d, end... ', schedules.length);
  }

  /**
   * Cancel every job currently held in the helper's registry.
   */
  async beforeClose() {
    this.app.logger.info('[Destroy scheduled Task] Start... ');
    const scheduleStacks = await this.ctx.helper.getScheduleStacks();
    // Snapshot the names first so the reported count is the number of jobs
    // that existed before cancelling began.
    const jobNames = Reflect.ownKeys(scheduleStacks);
    for (const jobName of jobNames) {
      await this.ctx.helper.cancelSchedule(jobName);
    }
    this.app.logger.info('[Destroy Scheduled Task] Number of destroyed scheduled tasks: %d, end... ', jobNames.length);
  }
}

module.exports = AppBootHook;
Copy the code
Improve the management of tasks
// app/routers/task.js
// ... (existing routes unchanged)
// Execute the task immediately
router.post(`${config.contextPath}/task/schedule/run`, checkTokenHandler, controller.task.runSchedule);
// ...
// app/controller/task.js
/** * Perform the task */
async runSchedule() {
const { ctx } = this;
await ctx.service.taskService.runSchedule(ctx.request.body);
ctx.body = setResult();
}
// app/service/taskService.js
// Modify/add a scheduled task
async editSchedule(userName, { job_id, cron, jobName, jobHandler, params = ' ', description = ' ' }) {
if (result.affectedRows === 1) {
const schedule = await this.app.mysql.get('schedule_job', { job_id });
// Optional chain operator '? 'can be used here if version permits. `
if (schedule && schedule.status === SCHEDULE_STATUS.RUN) {
// Reset the task in start state
await this.ctx.helper.cancelSchedule(jobName);
await this.ctx.helper.generateSchedule(job_id, cron, jobName, jobHandler); }}}// Updates the scheduled task status
async updateStatusSchedule({ job_id, status }) {
const result = await this.app.mysql.update('schedule_job', { status }, { where: { job_id } });
// Check whether the update is successful
if (result.affectedRows === 1) {
const schedule = await this.app.mysql.get('schedule_job', { job_id });
if (status === SCHEDULE_STATUS.RUN) {
// Start the task
await this.ctx.helper.generateSchedule(job_id, schedule.cron, schedule.jobName, schedule.jobHandler);
} else {
// Stop the task
await this.ctx.helper.cancelSchedule(schedule.jobName); }}}// Execute the task
async runSchedule({ job_id }) {
const schedule = await this.app.mysql.get('schedule_job', { job_id });
if (schedule === null) throw new VideoError(RESULT_FAIL, 'Mission does not exist');
// Execute the task
this.service.scheduleService[schedule.jobHandler]();
}
Copy the code
Management system page implementation
UI implementation is relatively simple and won’t be explained
// src/api/task.js
import request from '@/utils/request'
/**
 * Scheduled task list.
 * @param {object} params - Query parameters sent with the request (e.g. page, size).
 * @returns {*} Whatever `request` returns for this call.
 */
export function scheduleList(params) {
  return request({
    url: '/task/schedule/list',
    method: 'GET',
    params
  })
}
/**
 * Modify/add a scheduled task.
 * @param {object} data - Task fields sent as the request body.
 * @returns {*} Whatever `request` returns for this call.
 */
export function editSchedule(data) {
  return request({
    url: '/task/schedule/edit',
    method: 'post',
    data
  })
}
/**
 * Delete a scheduled task.
 * @param {object} data - Request body identifying the task (job_id).
 * @returns {*} Whatever `request` returns for this call.
 */
export function deleteSchedule(data) {
  return request({
    url: '/task/schedule/delete',
    method: 'post',
    data
  })
}
/**
 * Update the status of a scheduled task.
 * @param {object} data - Request body (job_id, status).
 * @returns {*} Whatever `request` returns for this call.
 */
export function updateStatusSchedule(data) {
  return request({
    url: '/task/schedule/status/update',
    method: 'post',
    data
  })
}
/**
 * Execute a task immediately.
 * @param {object} data - Request body identifying the task (job_id).
 * @returns {*} Whatever `request` returns for this call.
 */
export function runSchedule(data) {
  return request({
    url: '/task/schedule/run',
    method: 'post',
    data
  })
}
Copy the code
<!-- src/views/task/schedule.vue -->
<!--
  Scheduled-task management page: paged task list, create/edit dialog,
  start/stop, run-now and delete actions.
  NOTE(review): parts of this markup (labels, placeholders, the start/stop
  arguments) were reconstructed from a truncated copy; confirm against the
  original project. Status values follow the table comment: 0 = running, -1 = stopped.
-->
<template>
  <div class="app-container">
    <div class="filter-container">
      <el-button v-waves class="filter-item" type="primary" icon="el-icon-plus" @click="handleEdit(null)">New</el-button>
    </div>

    <el-table v-loading="listLoading" :data="list" border fit highlight-current-row style="width: 100%">
      <el-table-column align="center" prop="job_id" label="Task ID" />
      <el-table-column align="center" prop="jobName" label="Task name" />
      <el-table-column align="center" prop="cron" label="Cron" />
      <el-table-column align="center" prop="jobHandler" label="jobHandler" />
      <el-table-column align="center" prop="params" label="Parameters" />
      <!-- The backend field is `description` (there is no `remark` column). -->
      <el-table-column align="center" prop="description" label="Task description" />
      <el-table-column align="center" prop="status" label="Status">
        <template slot-scope="{row}">
          <el-tag v-if="row.status === 0" type="success">run</el-tag>
          <el-tag v-else type="info">stop</el-tag>
        </template>
      </el-table-column>
      <el-table-column align="center" label="Operation">
        <template slot-scope="{row}">
          <el-button v-if="row.status === -1" type="text" @click="updateStatus(row.job_id, 0)">Start</el-button>
          <el-button v-else type="text" @click="updateStatus(row.job_id, -1)">Stop</el-button>
          <el-button type="text" @click="run(row.job_id)">Run</el-button>
          <el-button type="text" @click="handleEdit(row)">Edit</el-button>
          <el-button type="text" @click="del(row)">Delete</el-button>
        </template>
      </el-table-column>
    </el-table>

    <pagination v-show="total>0" :total="total" :page.sync="listQuery.page" :limit.sync="listQuery.size" @pagination="getList" />

    <el-dialog :visible.sync="dialogVisible" :title="dialogType === 'edit' ? 'Edit task' : 'New task'">
      <el-form ref="editForm" :rules="rules" :model="fromData" label-width="100px">
        <el-form-item label="Cron" prop="cron">
          <el-input v-model="fromData.cron" placeholder="Please enter cron" />
        </el-form-item>
        <el-form-item label="Task name" prop="jobName">
          <el-input v-model="fromData.jobName" placeholder="Please enter the task name" />
        </el-form-item>
        <el-form-item label="jobHandler" prop="jobHandler">
          <el-input v-model="fromData.jobHandler" placeholder="Please enter jobHandler" />
        </el-form-item>
        <el-form-item label="Parameters" prop="params">
          <el-input v-model="fromData.params" type="textarea" placeholder="Please enter the parameters" />
        </el-form-item>
        <el-form-item label="Task description" prop="description">
          <el-input v-model="fromData.description" type="textarea" placeholder="Please enter the task description" />
        </el-form-item>
      </el-form>
      <div style="text-align:right;">
        <el-button @click="dialogVisible = false">Cancel</el-button>
        <el-button type="primary" @click="confirm">Commit</el-button>
      </div>
    </el-dialog>
  </div>
</template>

<script>
import Pagination from '@/components/Pagination'
import waves from '@/directive/waves'
import { scheduleList, editSchedule, deleteSchedule, updateStatusSchedule, runSchedule } from '@/api/task'

export default {
  components: { Pagination },
  directives: { waves },
  data() {
    return {
      listLoading: false,
      list: [],
      total: 0,
      listQuery: { page: 1, size: 20 },
      dialogVisible: false,
      dialogType: 'new',
      fromData: {},
      rules: {
        cron: { required: true, message: 'Please enter cron', trigger: 'blur' },
        jobName: { required: true, message: 'Please enter the task name', trigger: 'blur' },
        jobHandler: { required: true, message: 'Please enter jobHandler', trigger: 'blur' }
      }
    }
  },
  created() {
    this.getList()
  },
  methods: {
    // Load the current page of tasks.
    async getList() {
      this.listLoading = true
      try {
        const { code, data } = await scheduleList(this.listQuery)
        if (code === 0) {
          this.list = data.list
          this.total = data.total
        }
      } finally {
        // Clear the spinner even when the request fails.
        this.listLoading = false
      }
    },
    // Open the dialog; `row` is null for a new task.
    handleEdit(row) {
      this.fromData = {}
      if (row) {
        // Edit a copy so that cancelling the dialog leaves the table row untouched.
        this.fromData = JSON.parse(JSON.stringify(row))
        this.dialogType = 'edit'
      } else {
        this.dialogType = 'new'
      }
      this.dialogVisible = true
    },
    // Validate the form and submit it.
    async confirm() {
      this.$refs.editForm.validate(async valid => {
        if (!valid) return false
        const { code } = await editSchedule(this.fromData)
        if (code === 0) {
          this.$message({
            message: this.dialogType === 'edit' ? 'Edit succeeded' : 'Added successfully',
            type: 'success'
          })
          this.dialogVisible = false
          this.getList()
        }
      })
    },
    // Ask for confirmation, then delete the task.
    del(row) {
      this.$confirm('Do you want to delete this task?', 'Prompt', {
        confirmButtonText: 'confirm',
        cancelButtonText: 'cancel',
        type: 'warning'
      }).then(async() => {
        const { code } = await deleteSchedule({ job_id: row.job_id })
        if (code === 0) {
          this.$message({ message: 'delete successfully', type: 'success' })
          this.getList()
        }
      }).catch(() => {
        // Dismissing the confirm dialog rejects the promise; nothing to do.
      })
    },
    // Start (status 0) or stop (status -1) a task.
    async updateStatus(job_id, status) {
      const { code } = await updateStatusSchedule({ job_id, status })
      if (code === 0) {
        this.$message({ message: 'Edit succeeded', type: 'success' })
        this.getList()
      }
    },
    // Run a task once, immediately.
    async run(job_id) {
      const { code } = await runSchedule({ job_id })
      if (code === 0) {
        this.$message({ message: 'execute successfully', type: 'success' })
      }
    }
  }
}
</script>
The project address
Front-end source: admin-web
Server source: admin-server
Preview address: admin-demo