Preface
While learning Redis, I found that a sorted set (zset) can also be used to build a lightweight delayed message queue. Its reliability leaves room for improvement, but it is perfectly adequate for features that do not need strong guarantees against data loss. This small demo relies mainly on three sorted-set commands: ZADD, ZRANGEBYSCORE, and ZREM.
Preparation: install Redis and the Go client redigo in advance.
I am on macOS, so I install Redis directly with Homebrew:
$ brew install redis
$ go get github.com/garyburd/redigo/redis
Because I am lazy, I simply use ObjectId from the bson package to generate a unique ID for each task.
$ go get gopkg.in/mgo.v2/bson
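A unique ID is not strictly required, but it makes it easy to look up the corresponding task later if you need to track it. Keep in mind that sorted-set members are unique, so two tasks with byte-for-byte identical payloads would be stored as a single member; the ID keeps every payload distinct.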
Producer
A for loop generates 100,000 tasks, each scheduled at a random time between 0 and 4 seconds from now.
func producer() {
	count := 0
	// Generate 100000 tasks
	for count < 100000 {
		count++
		// Schedule the task 0-4 seconds from now
		dealTime := int64(rand.Intn(5)) + time.Now().Unix()
		uuid := bson.NewObjectId().Hex()
		redis.Client.AddJob(&job.JobMessage{
			Id:       uuid,
			DealTime: dealTime,
		}, dealTime)
	}
}
The AddJob function lives in a separate package. It uses the randomly generated time from the producer as the score, that is, the timestamp at which the task should be processed.
// Add a task
func (client *RedisClient) AddJob(msg *job.JobMessage, dealTime int64) {
	conn := client.Get()
	defer conn.Close()
	key := "JOB_MESSAGE_QUEUE"
	// The score is the processing timestamp; the member is the JSON-encoded task
	conn.Do("zadd", key, dealTime, util.JsonEncode(msg))
}
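The job and util packages are not shown in this post. As a rough sketch of what they might look like, the following assumes a JobMessage struct with just the two fields used above and thin wrappers around encoding/json. The JSON tags and the exact signatures of JsonEncode and JsonDecode are assumptions made for illustration, not the original helpers.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// JobMessage mirrors the two fields used in the post.
// The JSON tag names are an assumption.
type JobMessage struct {
	Id       string `json:"id"`
	DealTime int64  `json:"deal_time"`
}

// JsonEncode is a hypothetical stand-in for util.JsonEncode.
// It returns an empty string if marshalling fails.
func JsonEncode(v interface{}) string {
	b, err := json.Marshal(v)
	if err != nil {
		return ""
	}
	return string(b)
}

// JsonDecode is a hypothetical stand-in for util.JsonDecode.
func JsonDecode(s string, v interface{}) error {
	return json.Unmarshal([]byte(s), v)
}

func main() {
	// The encoded string is what would be stored as the sorted-set member.
	member := JsonEncode(&JobMessage{Id: "5a1f0c", DealTime: 1500000000})
	fmt.Println(member)

	var decoded JobMessage
	if err := JsonDecode(member, &decoded); err != nil {
		panic(err)
	}
	fmt.Println(decoded.Id, decoded.DealTime)
}
```

Because the whole JSON string is the member, the consumer can later pass exactly the same string back to Redis to delete the task.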
Consumer
The consumer works in two steps:
- Fetch a task whose timestamp is less than or equal to the current timestamp
- Delete the task to decide which consumer owns it
When fetching tasks whose timestamp is less than or equal to the current one, several goroutines may read the same task at the same moment, yet only one of them may process it. We therefore need a way to decide who handles the task (with a single consumer this problem does not arise). Redis's delete operation provides this: ZREM returns a non-zero value only to the caller that actually removed the member. So the goroutine that successfully deletes the task from the queue is the one that owns it.
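To illustrate the idea without a running Redis server, here is a minimal in-memory sketch. The memSet type and the raceForJob helper are hypothetical names invented for this example; a mutex-protected map stands in for the sorted set and models only the fact that a removal reports how many members it deleted.

```go
package main

import (
	"fmt"
	"sync"
)

// memSet is a hypothetical in-memory stand-in for the Redis sorted set.
type memSet struct {
	mu      sync.Mutex
	members map[string]int64 // member -> score
}

// rem mimics the return value of ZREM for a single member:
// 1 if the member was removed, 0 if it was already gone.
func (s *memSet) rem(member string) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.members[member]; !ok {
		return 0
	}
	delete(s.members, member)
	return 1
}

// raceForJob lets n goroutines that all read the same member try to
// claim it, and returns how many of them succeeded.
func raceForJob(s *memSet, member string, n int) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	winners := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if s.rem(member) > 0 {
				mu.Lock()
				winners++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return winners
}

func main() {
	s := &memSet{members: map[string]int64{"job-1": 100}}
	fmt.Println("winners:", raceForJob(s, "job-1", 10))
}
```

However the goroutines are scheduled, exactly one of them sees a non-zero result, and that is the property the consumer relies on.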
Here’s the code:
// Consumer
func consumer() {
	// Start 10 goroutines
	count := 0
	for count < 10 {
		go func() {
			for {
				jobs := redis.Client.GetJob()
				if len(jobs) <= 0 {
					// No task is due yet; wait before polling again
					time.Sleep(time.Second * 1)
					continue
				}
				currentJob := jobs[0]
				// If deleting from the Redis queue succeeds, this goroutine owns the task
				if redis.Client.DelJob(currentJob) > 0 {
					var jobMessage job.JobMessage
					util.JsonDecode(currentJob, &jobMessage) // A custom JSON parsing function
					handleMessage(&jobMessage)
				}
			}
		}()
		count++
	}
}

// Process a task
func handleMessage(msg *job.JobMessage) {
	fmt.Printf("deal job: %s, require time: %d \n", msg.Id, msg.DealTime)
	go func() {
		// countChan is declared elsewhere in the demo (not shown here)
		countChan <- true
	}()
}
The Redis part of the code, which fetches a task and deletes a task:
// Get the task
func (client *RedisClient) GetJob() []string {
	conn := client.Get()
	defer conn.Close()
	key := "JOB_MESSAGE_QUEUE"
	timeNow := time.Now().Unix()
	// Fetch at most one task whose score (processing time) is <= now
	ret, err := redis.Strings(conn.Do("zrangebyscore", key, 0, timeNow, "limit", 0, 1))
	if err != nil {
		panic(err)
	}
	return ret
}

// Delete the current task to determine whether another consumer has already taken it
func (client *RedisClient) DelJob(value string) int {
	conn := client.Get()
	defer conn.Close()
	key := "JOB_MESSAGE_QUEUE"
	// ZREM returns the number of members actually removed
	ret, err := redis.Int(conn.Do("zrem", key, value))
	if err != nil {
		panic(err)
	}
	return ret
}
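The query in GetJob can be read as "give me the earliest task that is already due". The sketch below models that behaviour in memory, assuming a hypothetical entry type and dueJobs helper that do not exist in the original code: entries are ordered by score (ties broken by member, as Redis does), and at most limit members with a score between 0 and now are returned. A non-negative limit is assumed.

```go
package main

import (
	"fmt"
	"sort"
)

// entry is a hypothetical (member, score) pair of the sorted set.
type entry struct {
	member string
	score  int64
}

// dueJobs models "ZRANGEBYSCORE key 0 now LIMIT 0 limit" in memory.
func dueJobs(entries []entry, now int64, limit int) []string {
	// Work on a copy so the caller's slice keeps its order.
	sorted := append([]entry(nil), entries...)
	sort.Slice(sorted, func(i, j int) bool {
		if sorted[i].score != sorted[j].score {
			return sorted[i].score < sorted[j].score
		}
		return sorted[i].member < sorted[j].member
	})

	var out []string
	for _, e := range sorted {
		// Stop once enough members are collected or the rest are not due yet.
		if len(out) == limit || e.score > now {
			break
		}
		if e.score >= 0 {
			out = append(out, e.member)
		}
	}
	return out
}

func main() {
	queue := []entry{{"job-b", 105}, {"job-a", 101}, {"job-c", 110}}
	fmt.Println(dueJobs(queue, 100, 1)) // nothing is due at t=100
	fmt.Println(dueJobs(queue, 106, 1)) // job-a is the earliest due task
}
```

A task whose score is still in the future is never returned, which is what turns the sorted set into a delay queue.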
That is basically all the code. In my final run it handled about 10,000 tasks every 3-4 seconds, which is really fast…