sequence
This article focuses on xxl-job-executor-Go
Executor
Type Executor interface {// Initialize Init(... // Log query LogHandler(Handler LogHandler) // Register task RegTask(pattern string, Task TaskFunc) // RunTask(writer http.ResponseWriter, Request *http. request) // KillTask KillTask(writer http.ResponseWriter, TaskLog(writer http.responsewriter, request * http.request) // Run() error}Copy the code
Executor defines Init, LogHandler, RegTask, RunTask, KillTask, TaskLog, and Run methods
executor
Type executor struct {opts Options address string regList *taskList runList *taskList // taskList is being executed mu RWMutex log Logger logHandler logHandler}Copy the code
Executor defines the opts, Address, regList, runList, MU, log, and logHandler attributes
Init
func (e *executor) Init(opts ... Option) { for _, o := range opts { o(&e.opts) } e.log = e.opts.l e.regList = &taskList{ data: make(map[string]*Task), } e.runList = &taskList{ data: make(map[string]*Task), } e.address = e.opts.ExecutorIp + ":" + e.opts.ExecutorPort go e.registry() }Copy the code
The Init method iterates over the opTS application opt, initializes regList, runList, address, and async e.registry().
RegTask
RegTask(pattern string, task TaskFunc) {var t = &task {} t.n = task e.list.set (pattern,) {// Register task func (e *executor) RegTask(pattern string, task TaskFunc) {var t = &task {} t.n = task e.list.set (pattern, t) return }Copy the code
The RegTask method adds a task of the specified pattern to the regList
runTask
func (e *executor) runTask(writer http.ResponseWriter, request *http.Request) { e.mu.Lock() defer e.mu.Unlock() req, _ := ioutil.ReadAll(request.Body) param := &RunReq{} err := json.Unmarshal(req, ¶m) if err ! = nil { _, _ = writer.Write(returnCall(param, 500, Error(" Error :" + string(req)) return} e.log.Info(" task parameter :%v", param) if! e.regList.Exists(param.ExecutorHandler) { _, _ = writer.Write(returnCall(param, 500, "Task not registered")) e.log.Error(" Task [" + Int64ToStr(param.jobid) + "] not registered :" + param.jobid) return} / / blocking strategy to deal with the if "e.r unList. The Exists (Int64ToStr (param. JobID)) {if param. ExecutorBlockStrategy = = coverEarly {/ / cover oldTask before dispatching := e.runList.Get(Int64ToStr(param.JobID)) if oldTask ! = nil {oldtask.cancel () e.runlist.del (Int64ToStr(oldtask.id))}} else { _ = writer.Write(returnCall(param, 500, ExecutorHandler (" ExecutorHandler "+ ExecutorHandler) return}} cxt := context.Background() task := e.regList.Get(param.ExecutorHandler) if param.ExecutorTimeout > 0 { task.Ext, task.Cancel = context.WithTimeout(cxt, time.Duration(param.ExecutorTimeout)*time.Second) } else { task.Ext, task.Cancel = context.WithCancel(cxt) } task.Id = param.JobID task.Name = param.ExecutorHandler task.Param = param task.log = e.log e.runList.Set(Int64ToStr(task.Id), task) go task.Run(func(code int64, msg string) { e.callback(task, Code, MSG)}) e.log.info (" task [" + Int64ToStr(param.jobid) + "] start :" + param.jobid) _, _ = writer.Write(returnGeneral()) }Copy the code
The runTask method determines whether the task is already registered, and performs different tasks according to the ExecutorBlockStrategy. If the task is coverEarly, the existing task is cancelled. Finally, the task is executed asynchronously through task.run
killTask
func (e *executor) killTask(writer http.ResponseWriter, request *http.Request) { e.mu.Lock() defer e.mu.Unlock() req, _ := ioutil.ReadAll(request.Body) param := &killReq{} _ = json.Unmarshal(req, ¶m) if ! e.runList.Exists(Int64ToStr(param.JobID)) { _, _ = writer.Write(returnKill(param, 500)) e.log.Error(" task [" + Int64ToStr(param.jobid) + "] not running ") return} task := e.runlist.get (Int64ToStr(param.jobid)) task := e.runlist.get (Int64ToStr(param.jobid)) task := e.runlist.get (Int64ToStr(param.jobid)) task := e.runlist.get (Int64ToStr(param.jobid)) task.Cancel() e.runList.Del(Int64ToStr(param.JobID)) _, _ = writer.Write(returnGeneral()) }Copy the code
The killTask method executes task.cancel () and removes it from the runList
taskLog
func (e *executor) taskLog(writer http.ResponseWriter, request *http.Request) { var res *LogRes data, err := ioutil.ReadAll(request.Body) req := &LogReq{} if err ! Error(" Log request failed :" + err.error ()) reqErrLogHandler(writer, req, err) return} err = json.Unmarshal(data, &req) if err ! Error(" Log request parsing failed :" + err.error ()) reqErrLogHandler(writer, req, err) return} e.log.info (" log request parameters :%+v", req) if e.logHandler ! = nil { res = e.logHandler(req) } else { res = defaultLogHandler(req) } str, _ := json.Marshal(res) _, _ = writer.Write(str) }Copy the code
The taskLog method retrieves logs using e.LogHandler (req) or defaultLogHandler(reQ)
summary
The executor of xxl-job-executor-go defines Init, LogHandler, RegTask, RunTask, KillTask, TaskLog, and Run methods. Executor implements the Executor interface and provides an HTTP API.
doc
- xxl-job-executor-go