preface

Suppose we have a scenario where the system pushes a message to the user in a particular situation, perhaps SMS, email, or in-site message, because the message in this scenario may push a large number of messages at one time, and the main process does not want to block. This scenario has low requirements on real-time performance and allows messages to be delivered in a delayed manner. At the beginning of the system construction, it is not very convenient to use the professional message queue middleware Rabbitmq and Kafka to implement the asynchronous message push. At this time, we can consider using Redis to implement the simple message queue. Of course, if we have very high requirements for real-time and reliable messaging, we may need to use MQ or Kafka to achieve this.

The message queue

The realization principle is that producers push data into the List of Redis, and consumers rotate to pop data. If they can get the data, they directly consume the data. If they can’t wait, they continue to cycle pop data.

The red arrow in the diagram above is a simulated scenario where the producer adds data from the left and the consumer gets data from the right. And vice versa.

But here’s the problem: what if the queue is empty? When the list is empty, the consumer rotates to retrieve the data, but each time it fails to retrieve the data, it falls into an endless loop of no data, which not only increases the CPU of the client, but also increases the QPS of Redis, and these accesses are invalid.

At this time, we can delay by 1 second in the way of sleep(1), or we can use the blocking access, BRPP and BLPOP commands provided by Redis. Consumers can specify a blocking timeout time if the data does not exist when they cannot get the data. If they can get the data within this time, they will immediately return. Otherwise null is returned. When this timeout is set to 0, it will block forever, which is not generally recommended. If multiple clients are blocking for messages at the same time, they are sorted in order.

Redis client

Let’s go ahead and take a look at the Redis client.

import (
    "log"
    "github.com/go-redis/redis"
)
// The key of the queue
var queueKey = "queue:message"
var rdb *redis.Client

func NewRedis(a){ rdb = redis.NewClient(& redis.Options{ Addr:"127.0.0.1:6379",
        Password:   "",
    })
    pong, err := rdb.Ping().Result()
    iferr ! =nil {
        log.Fatalln(err)
    }
    log.Println(pong,"redis success.")}Copy the code

Producers Producer

Producer concrete implementation code, simulating a randomly generated message producer, using Lpush to add data to the list.

Func ProducerMessageList(){rate.seed (time.now ().unixnano ())) log.println (" open producer... ) for i := 0; i < 10; I++ {score: = time. Now (). The Unix () the Println (" is producing a message..." , score, i) _,err := rdb.LPush(queueListKey,i).Result() if err ! = nil { log.Println(err) } time.Sleep(time.Duration(rand.Intn(3)) * time.Second) } }Copy the code

Consumers Consumer

Consumers use rpop to pop up a message from the queue to spend, but we talked about before, if the queue is empty, will be constantly training in rotation pop news, can cause a lot of waste of resources, so we used here brpop command to realize the block read, blocking read when no data on the queue immediately enter a dormant state, once have the data, It wakes up immediately and pops up a message, with a negligible delay.

// Use the list format to consume the message func ConsumerMessageList() {for { Error := rdb.brpop (5 * time.second,"queue:list").result () if err == redis.Nil{ continue } if err ! Time. Sleep(1 * time.Second) continue} log.Println(" consume data: ", value, "current time is: ", time.Now().Unix()) time.Sleep(time.Second) } }Copy the code

As you can see from the code, we set a 5-second block read timeout, because the blocking read cannot be blocked all the time. A long block may be actively disconnected by the server, and then an exception will be thrown, so we need to set a not very long block timeout.

Delay queue

The realization principle is to use the zadd command to store the production data into Redis, and set the score of field as the timestamp that needs to be delayed. If it needs to be executed immediately and no delay is needed, set the score as the current timestamp. At the same time, run a separate Goroutine, use zrangebyScore command to intercept the data, the score of the data is the current timestamp, so that the data will be retrieved when it reaches the execution time, then we take the first data, use zrem command to remove the data from the queue, if the removal is successful, the message is allowed to be sent. Otherwise, perform the above process again.

Redis client

Let’s start by looking at the Redis client link implemented using Go-Redis.

Import ("log" "github.com/go-redis/redis") // Queue key var queueKey = "queue:message" var RDB * redis.client func NewRedis() { rdb = redis.NewClient(& Options{Addr: "127.0.0.1:6379", Password: "",}) pong, err := rdb.ping ().result () if err! = nil { log.Fatalln(err) } log.Println(pong,"redis success.") }Copy the code

Note: here in the demo environment, I use a var RDB *redis.Client, the following code use RDB directly to call Redis, the actual project according to the situation of the project.

Producers Producer

Here, I simulated a situation of random message generation. We used a random number to delay a period of time to observe whether consumers could consume data in real time.

Func ProducerMessage() {rand.seed (time.now ().unixnano ()) log.println (" start producer... ) for i := 0; i < 5; I++ {score: = time. Now (). The Unix () the Println (" is producing a message..." , score, I) rdb.ZAdd("queue:message", redis.Z{score: float64(score +1),// i, }) time.Sleep(time.Duration(rand.Intn(3)) * time.Second) } }Copy the code

The generated data will be stored in Redis by using the zadd command, and score will be added as the current timestamp, and a delay time will be added to score. Since the second-level timestamp used in the example is used as score, we can delay the time by 1 second directly by score+1. Member can store the data we want to delay, such as the message to be sent.

Consumers Consumer

If the message is delayed to the current time, it can be directly intercepted and removed by ZREM. The purpose is to prevent the same data from being consumed repeatedly. Only after the message is successfully removed can the subsequent consumption process be carried out.

Func ConsumerMessage() {log.println (" Starting consumer..." ) for { // score := time.Now().UnixNano() values, err := rdb.ZRangeByScore("queue:message", redis.ZRangeBy{ Min: "0", Max: fmt.Sprint(time.Now().Unix()), Offset: 0, Count: 1, }).Result() if err ! Time.sleep (time.second) continue} if len(values) == 0 {// No data, Sleep(time.second) continue} value := values[0] num, err := rdb.ZRem("queue:message", value).Result() if err ! = redis.Nil & & err ! Log.Println(err) time.Sleep(time.Second) continue} if num == 1 {log.Println(" consume data: ", value, "current time is: ", time.now ().unix ()) // Simulate a time-consuming operation time.sleep (2 * time.second)}}}Copy the code

Since we use the second-level timestamp as score, we can use the operation of sleep(1) to reduce repeated requests to Redis when we query no data.

The publisher subscriber model PubSub

Redis has a separate module for one-to-many one-way communication schemes. This is the PubSub module. The PubSub module can be used as a broadcast mode, i.e. one publisher with multiple subscribers.

The Subscriber mode can be understood as follows: the Subscriber will subscribe to a channel on Redis, and the producer will publish a message on the channel, and the Subscriber will receive the message immediately. If we have multiple subscribers, the message published by the publisher will be received by multiple subscribers at the same time with the same message.

We can implement a simple message queue based on this model, with one publisher and one subscriber to publish and subscribe messages.

The Redis client in the actual code is no longer listed as above.

Publisher (Producer)

Publishers use publish commands to publish data to channels, and subscribers can subscribe to messages

func ProducerMessagePubSub() { rand.Seed(time.Now().UnixNano()) for i := 0; i < 10; I++ {log.Println(" producing a message...") , i) r, err := rdb.Publish("queue:pubsub", i).Result() if err ! = nil { log.Println(err,r) } time.Sleep(time.Duration(rand.Intn(3)) * time.Second) } }Copy the code

Subscriber (consumer)

The subscriber subscribes to the specified channel, then uses the pipe to receive and process the message.

Func ConsumerMessagePubSub(node int){Subscribe := Subscribe("queue:pubsub") ch := pubsub.channel () For MSG := range ch {log.Printf(" current node :% d, consume data, channel:%s; message:%s\n", node, msg.Channel, msg.Payload) } }Copy the code

Of course, if we will publish a large number of messages and there will be multiple consumers to consume at the same time, channels can also be divided into multiple channels, each channel has its own subscribers to subscribe, and then the publisher will distribute the messages to each channel according to the node ID or random allocation.

However, it is important to note that in the example above, if two goroutines are enabled at the same time, the publisher will Publish the message immediately, and the subscriber will not subscribe to the message immediately. This is because it is almost instantaneous, as opposed to enabling two Goroutines with go. Therefore, subscribers must first subscribe to the channel, and then publish the operation.

Reference:

Redis Deep Adventures: Core principles and applied practices