An asynchronous job execution system based on Redis
Installation
Get the package
$ go get github.com/wang502/gores/gores
Copy the code
Import the package
import "github.com/wang502/gores/gores"Copy the code
Configuration
Add a config.json in your project folder
{
"REDISURL": "127.0.0.1:6379"."REDIS_PW": "mypassword"."BLPOP_MAX_BLOCK_TIME" : 1."MAX_WORKERS": 2."Queues": ["queue1"."queue2"]}Copy the code
- REDISURL: Redis server address. If you run in a local Redis, the dafault host is
127.0.0.1:6379
- REDIS_PW: Redis password. If the password is not set, then password can be any string.
- BLPOP_MAX_BLOCK_TIME: Blocking time when calling BLPOP command in Redis.
- MAX_WORKERS: Maximum number of concurrent workers, each worker is a separate goroutine that execute specific task on the fetched item.
- Queues: Array of queue names on Redis message broker
An item is a Go map. It is required to have several keys:
- Name, name of the item to enqueue, items with different names are mapped to different tasks.
- Queue, name of the queue you want to put the item in.
- Args, the required arguments that you need in order for the workers to execute those tasks.
- Enqueue_timestamp, the timestamp of when the item is enqueued, which is a Unix timestamp.
configPath : = flag.String("c"."config.json"."path to configuration file")
flag.Parse(a)config.err : = gores.InitConfig(*configPath)
resq : = gores.NewResQ(config)
item : = map[string]interface{} {"Name": "Rectangle"."Queue": "TestJob"."Args": map[string]interface{} {"Length": 10."Width": 10,},"Enqueue_timestamp": time.Now().Unix(),
}
err = resq.Enqueue(item)
iferr ! =nil {
log.Fatalf("ERROR Enqueue item to ResQ")}Copy the code
$ go run main.go -c ./config.json -o produce
Copy the code
Define tasks
package tasks
// task for item with 'Name' = 'Rectangle'
// calculating the area of an rectangle by multiplying Length with Width
func CalculateArea(args map[string]interface{}) error {
var err error
length : = args["Length"]
width : = args["Width"]
if length == nil || width == nil {
err = errors.New("Map has no required attributes")
return err
}
fmt.Printf("The area is %d\n".int(length.(float64)) * int(width.(float64)))
return err
}Copy the code
flag.Parse(a)config.err : = gores.InitConfig(*configPath)
tasks : = map[string]interface{} {"Item": tasks.PrintItem."Rectangle": tasks.CalculateArea,
}
gores.Launch(config, &tasks)Copy the code
$ go run main.go -c ./config.json -o consume
Copy the code
The output will be:
The rectangle area is 100
Copy the code
Info about processed/failed job
resq : = gores.NewResQ(config)
if resq == nil {
log.Fatalf("resq is nil")}info : = resq.Info(a)for k.v : = range info {
switch v.(type) {
case string:
fmt.Printf("%s : %s\n", k, v)
case int:
fmt.Printf("%s : %d\n", k, v)
case int64:
fmt.Printf("%s : %d\n", k, v)
}
}Copy the code
The output will be:
Queue: queues: 2 workers: 0 Failed: 0 host: 127.0.0.1:6379 Pending: 0 Processed: 1
Copy the code