sequence

This article focuses on xxl-job-executor-Go

Executor

Type Executor interface {// Initialize Init(... // Log query LogHandler(Handler LogHandler) // Register task RegTask(pattern string, Task TaskFunc) // RunTask(writer http.ResponseWriter, Request *http. request) // KillTask KillTask(writer http.ResponseWriter, TaskLog(writer http.responsewriter, request * http.request) // Run() error}Copy the code

Executor defines Init, LogHandler, RegTask, RunTask, KillTask, TaskLog, and Run methods

executor

Type executor struct {opts Options address string regList *taskList runList *taskList // taskList is being executed mu RWMutex log Logger logHandler logHandler}Copy the code

Executor defines the opts, Address, regList, runList, MU, log, and logHandler attributes

Init

func (e *executor) Init(opts ... Option) { for _, o := range opts { o(&e.opts) } e.log = e.opts.l e.regList = &taskList{ data: make(map[string]*Task), } e.runList = &taskList{ data: make(map[string]*Task), } e.address = e.opts.ExecutorIp + ":" + e.opts.ExecutorPort go e.registry() }Copy the code

The Init method iterates over the opTS application opt, initializes regList, runList, address, and async e.registry().

RegTask

RegTask(pattern string, task TaskFunc) {var t = &task {} t.n = task e.list.set (pattern,) {// Register task func (e *executor) RegTask(pattern string, task TaskFunc) {var t = &task {} t.n = task e.list.set (pattern, t) return }Copy the code

The RegTask method adds a task of the specified pattern to the regList

runTask

func (e *executor) runTask(writer http.ResponseWriter, request *http.Request) { e.mu.Lock() defer e.mu.Unlock() req, _ := ioutil.ReadAll(request.Body) param := &RunReq{} err := json.Unmarshal(req, &param) if err ! = nil { _, _ = writer.Write(returnCall(param, 500, Error(" Error :" + string(req)) return} e.log.Info(" task parameter :%v", param) if! e.regList.Exists(param.ExecutorHandler) { _, _ = writer.Write(returnCall(param, 500, "Task not registered")) e.log.Error(" Task [" + Int64ToStr(param.jobid) + "] not registered :" + param.jobid) return} / / blocking strategy to deal with the if "e.r unList. The Exists (Int64ToStr (param. JobID)) {if param. ExecutorBlockStrategy = = coverEarly {/ / cover oldTask before dispatching := e.runList.Get(Int64ToStr(param.JobID)) if oldTask ! = nil {oldtask.cancel () e.runlist.del (Int64ToStr(oldtask.id))}} else { _ = writer.Write(returnCall(param, 500, ExecutorHandler (" ExecutorHandler "+ ExecutorHandler) return}}  cxt := context.Background() task := e.regList.Get(param.ExecutorHandler) if param.ExecutorTimeout > 0 { task.Ext, task.Cancel = context.WithTimeout(cxt, time.Duration(param.ExecutorTimeout)*time.Second) } else { task.Ext, task.Cancel = context.WithCancel(cxt) } task.Id = param.JobID task.Name = param.ExecutorHandler task.Param = param task.log = e.log e.runList.Set(Int64ToStr(task.Id), task) go task.Run(func(code int64, msg string) { e.callback(task, Code, MSG)}) e.log.info (" task [" + Int64ToStr(param.jobid) + "] start :" + param.jobid) _, _ = writer.Write(returnGeneral()) }Copy the code

The runTask method determines whether the task is already registered, and performs different tasks according to the ExecutorBlockStrategy. If the task is coverEarly, the existing task is cancelled. Finally, the task is executed asynchronously through task.run

killTask

func (e *executor) killTask(writer http.ResponseWriter, request *http.Request) { e.mu.Lock() defer e.mu.Unlock() req, _ := ioutil.ReadAll(request.Body) param := &killReq{} _ = json.Unmarshal(req, &param) if ! e.runList.Exists(Int64ToStr(param.JobID)) { _, _ = writer.Write(returnKill(param, 500)) e.log.Error(" task [" + Int64ToStr(param.jobid) + "] not running ") return} task := e.runlist.get (Int64ToStr(param.jobid)) task := e.runlist.get (Int64ToStr(param.jobid)) task := e.runlist.get (Int64ToStr(param.jobid)) task := e.runlist.get (Int64ToStr(param.jobid)) task.Cancel() e.runList.Del(Int64ToStr(param.JobID)) _, _ = writer.Write(returnGeneral()) }Copy the code

The killTask method executes task.cancel () and removes it from the runList

taskLog

func (e *executor) taskLog(writer http.ResponseWriter, request *http.Request) { var res *LogRes data, err := ioutil.ReadAll(request.Body) req := &LogReq{} if err ! Error(" Log request failed :" + err.error ()) reqErrLogHandler(writer, req, err) return} err = json.Unmarshal(data, &req) if err ! Error(" Log request parsing failed :" + err.error ()) reqErrLogHandler(writer, req, err) return} e.log.info (" log request parameters :%+v", req) if e.logHandler ! = nil { res = e.logHandler(req) } else { res = defaultLogHandler(req) } str, _ := json.Marshal(res) _, _ = writer.Write(str) }Copy the code

The taskLog method retrieves logs using e.LogHandler (req) or defaultLogHandler(reQ)

summary

The executor of xxl-job-executor-go defines Init, LogHandler, RegTask, RunTask, KillTask, TaskLog, and Run methods. Executor implements the Executor interface and provides an HTTP API.

doc

  • xxl-job-executor-go