Preface

This article examines the TaskProcessor interface in machinery and how Worker implements it.

TaskProcessor

// TaskProcessor - can process a delivered task
// This will probably always be a worker instance
type TaskProcessor interface {
	Process(signature *tasks.Signature) error
	CustomQueue() string
	PreConsumeHandler() bool
}

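The TaskProcessor interface defines three methods: Process, which takes a *tasks.Signature and returns an error; CustomQueue, which returns a queue name as a string; and PreConsumeHandler, which returns a bool.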
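To make the shape of the interface concrete, here is a minimal sketch of a type that satisfies it. It is self-contained rather than importing machinery: Signature is a cut-down stand-in for tasks.Signature, and recordingProcessor and deliver are hypothetical names invented for this example. The sketch also assumes that a false result from PreConsumeHandler means "do not consume yet"; check the broker you use for the exact behaviour.

```go
package main

import "fmt"

// Signature is a local stand-in for tasks.Signature with only two fields.
type Signature struct {
	UUID string
	Name string
}

// TaskProcessor mirrors the interface quoted above, using the local Signature.
type TaskProcessor interface {
	Process(signature *Signature) error
	CustomQueue() string
	PreConsumeHandler() bool
}

// recordingProcessor is a hypothetical processor that only records task names.
type recordingProcessor struct {
	queue string
	seen  []string
}

// Compile-time check that recordingProcessor implements TaskProcessor.
var _ TaskProcessor = (*recordingProcessor)(nil)

// Process records the task name, or fails on a nil signature.
func (p *recordingProcessor) Process(s *Signature) error {
	if s == nil {
		return fmt.Errorf("nil signature")
	}
	p.seen = append(p.seen, s.Name)
	return nil
}

// CustomQueue returns the queue this processor listens on.
func (p *recordingProcessor) CustomQueue() string { return p.queue }

// PreConsumeHandler always allows consumption in this sketch.
func (p *recordingProcessor) PreConsumeHandler() bool { return true }

// deliver is a hypothetical stand-in for a broker handing over one task.
// Assumption: the task is skipped when PreConsumeHandler returns false.
func deliver(p TaskProcessor, s *Signature) error {
	if !p.PreConsumeHandler() {
		return nil
	}
	return p.Process(s)
}

func main() {
	p := &recordingProcessor{queue: "emails"}
	err := deliver(p, &Signature{UUID: "task_1", Name: "send_email"})
	fmt.Println(p.CustomQueue(), p.seen, err)
}
```

Because the caller depends only on the interface, any type with these three methods can be handed to it, which is why the source comment says the processor "will probably always be a worker instance" rather than requiring one.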

Worker

// Worker represents a single worker process
type Worker struct {
	server            *Server
	ConsumerTag       string
	Concurrency       int
	Queue             string
	errorHandler      func(err error)
	preTaskHandler    func(*tasks.Signature)
	postTaskHandler   func(*tasks.Signature)
	preConsumeHandler func(*Worker) bool
}

// CustomQueue returns Custom Queue of the running worker process
func (worker *Worker) CustomQueue() string {
	return worker.Queue
}

// Process handles received tasks and triggers success/error callbacks
func (worker *Worker) Process(signature *tasks.Signature) error {
	// If the task is not registered with this worker, do not continue
	// but only return nil as we do not want to restart the worker process
	if !worker.server.IsTaskRegistered(signature.Name) {
		return nil
	}

	taskFunc, err := worker.server.GetRegisteredTask(signature.Name)
	if err != nil {
		return nil
	}

	// Update task state to RECEIVED
	if err = worker.server.GetBackend().SetStateReceived(signature); err != nil {
		return fmt.Errorf("Set state to 'received' for task %s returned error: %s", signature.UUID, err)
	}

	// Prepare task for processing
	task, err := tasks.NewWithSignature(taskFunc, signature)
	// if this failed, it means the task is malformed, probably has invalid
	// signature, go directly to task failed without checking whether to retry
	if err != nil {
		worker.taskFailed(signature, err)
		return err
	}

	// try to extract trace span from headers and add it to the function context
	// so it can be used inside the function if it has context.Context as the first
	// argument. Start a new span if it isn't found.
	taskSpan := tracing.StartSpanFromHeaders(signature.Headers, signature.Name)
	tracing.AnnotateSpanWithSignatureInfo(taskSpan, signature)
	task.Context = opentracing.ContextWithSpan(task.Context, taskSpan)

	// Update task state to STARTED
	if err = worker.server.GetBackend().SetStateStarted(signature); err != nil {
		return fmt.Errorf("Set state to 'started' for task %s returned error: %s", signature.UUID, err)
	}

	//Run handler before the task is called
	if worker.preTaskHandler != nil {
		worker.preTaskHandler(signature)
	}

	//Defer run handler for the end of the task
	if worker.postTaskHandler != nil {
		defer worker.postTaskHandler(signature)
	}

	// Call the task
	results, err := task.Call()
	if err != nil {
		// If a tasks.ErrRetryTaskLater was returned from the task,
		// retry the task after specified duration
		retriableErr, ok := interface{}(err).(tasks.ErrRetryTaskLater)
		if ok {
			return worker.retryTaskIn(signature, retriableErr.RetryIn())
		}

		// Otherwise, execute default retry logic based on signature.RetryCount
		// and signature.RetryTimeout values
		if signature.RetryCount > 0 {
			return worker.taskRetry(signature)
		}

		return worker.taskFailed(signature, err)
	}

	return worker.taskSucceeded(signature, results)
}

//SetPreConsumeHandler sets a custom handler for the end of a job
func (worker *Worker) SetPreConsumeHandler(handler func(*Worker) bool) {
	worker.preConsumeHandler = handler
}

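Worker implements the TaskProcessor interface. Its Process method first checks that the task is registered and fetches taskFunc through worker.server.GetRegisteredTask; if either step fails it returns nil so that the worker process is not restarted. It then updates the task state to RECEIVED, builds the task with tasks.NewWithSignature (a malformed task goes straight to taskFailed without a retry check), updates the state to STARTED, runs the optional pre-task handler, defers the optional post-task handler, and executes task.Call(). On success it calls taskSucceeded. On error it calls retryTaskIn if the error is a tasks.ErrRetryTaskLater, taskRetry if signature.RetryCount is greater than 0, and taskFailed otherwise.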

Summary

Machinery's TaskProcessor interface defines the Process, CustomQueue, and PreConsumeHandler methods, and Worker implements it. Worker.Process fetches taskFunc through worker.server.GetRegisteredTask, updates the task state to RECEIVED and then to STARTED, executes task.Call(), and finally marks the task as succeeded, schedules a retry, or marks it as failed, depending on the result.

Doc

  • machinery