List
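This is the most common way to build a queue: the producer pushes messages onto a list with LPUSH and the consumer pulls them off with RPOP.
Because RPOP returns immediately when the queue is empty, the consumer runs a for loop that sleeps for a while whenever nothing comes back. The pseudo-code is as follows: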
for {
    if msg := redis.rpop(); msg != nil {
        handle(msg)
    } else {
        // queue is empty: sleep for 1000 ms before polling again
        time.sleep(1000)
    }
}
With this approach a message can wait up to one sleep interval before it is processed (although in most scenarios this has little impact).
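The polling loop above might look like this in Python. This is a minimal sketch, not a definitive implementation: `client` is assumed to be a redis-py `redis.Redis` instance (any object with an `rpop(key)` method that returns `None` on an empty list works), and the names `poll_queue`, `idle_sleep`, `max_polls` and `sleep` are hypothetical. `max_polls` exists only so the loop can be stopped in a test; a real consumer would loop forever.

```python
import time


def poll_queue(client, key, handle, idle_sleep=1.0, max_polls=None, sleep=time.sleep):
    """Pull messages with RPOP, sleeping whenever the list is empty.

    Returns the number of messages handled (only reached when max_polls is set).
    """
    handled = 0
    polls = 0
    while max_polls is None or polls < max_polls:
        polls += 1
        msg = client.rpop(key)  # returns None when the list is empty
        if msg is not None:
            handle(msg)
            handled += 1
        else:
            # Empty queue: back off so the loop does not spin on Redis.
            # A message arriving right now waits up to idle_sleep seconds.
            sleep(idle_sleep)
    return handled
```

The sleep interval is the trade-off: a shorter value lowers the worst-case delay but sends more empty RPOP calls to Redis.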
However, Redis also provides blocking operations: BRPOP blocks until a message arrives, so the consumer gets data promptly. The pseudo-code is as follows:
for {
    // a timeout of 0 means wait indefinitely
    if msg := redis.brpop(0); msg != nil {
        handle(msg)
    }
}
However, if the client receives no messages for a long time, the idle connection may be judged invalid and closed. So when messages are sparse, a timeout of 0 still carries some risk. It is recommended to keep a small non-zero timeout (1s). The pseudo-code is as follows:
for {
    // wait at most 1 second (BRPOP takes its timeout in seconds)
    if msg := redis.brpop(1); msg != nil {
        handle(msg)
    }
}
This keeps delivery timely while also avoiding dropped connections.
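A blocking consumer with a short timeout could be sketched in Python as follows. It assumes `client` behaves like a redis-py `redis.Redis` instance, whose `brpop(key, timeout=...)` takes the timeout in seconds and returns a `(key, value)` pair, or `None` on timeout. The names `consume_blocking`, `timeout_s` and `max_rounds` are hypothetical, and `max_rounds` is there only to make the loop stoppable in a test.

```python
def consume_blocking(client, key, handle, timeout_s=1, max_rounds=None):
    """Consume with BRPOP using a short, non-zero timeout (in seconds)."""
    handled = 0
    rounds = 0
    while max_rounds is None or rounds < max_rounds:
        rounds += 1
        # Returns a (key, value) pair, or None when the timeout expires.
        item = client.brpop(key, timeout=timeout_s)
        if item is None:
            continue  # timed out: issue BRPOP again so the connection stays active
        _key, msg = item
        handle(msg)
        handled += 1
    return handled
```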
This is the simplest possible queue, but note that it is not a reliable one. The main problem is message loss: there is no ACK mechanism, so if the consumer goes down (or is restarted for a new release) after pulling a message with RPOP but before finishing it, that message is most likely lost.
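The loss scenario can be reproduced without Redis. The sketch below is only a toy model: a `collections.deque` stands in for the Redis list (`appendleft` playing the role of LPUSH and `pop` the role of RPOP), and `consume_one` and `crashing_handler` are hypothetical names.

```python
from collections import deque


def consume_one(queue, handle):
    """Pop one message and process it; the pop is final, there is no ACK."""
    if not queue:
        return False
    msg = queue.pop()  # plays the role of RPOP: the message leaves the queue here
    handle(msg)        # if this raises (or the process dies), msg is gone for good
    return True


def crashing_handler(msg):
    raise RuntimeError(f"consumer crashed while handling {msg!r}")


queue = deque()
queue.appendleft("order-1")  # plays the role of LPUSH
queue.appendleft("order-2")

try:
    consume_one(queue, crashing_handler)
except RuntimeError:
    pass  # the consumer "restarts" here

remaining = list(queue)  # "order-1" was popped before the crash and is not in here
```

After the simulated restart only `"order-2"` is left in the queue; nothing records that `"order-1"` was taken but never finished.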
Next, consider some features expected of a message queue:
- Support for multiple groups of consumers: since this is a plain list, a message disappears once it is pulled, so each message is consumed exactly once by a single consumer. This does not satisfy consumer-group scenarios, where several different businesses consume the same queue
- Message replay: when messages need to be replayed (tracking down a production bug, historical data, tests, etc.), a Redis list cannot help
Conclusion
- Easy to implement, which is why it is so common
- There is no ACK mechanism and it is unreliable
- Multiple consumer groups are not supported
- Message playback is not supported
Pub/Sub
Redis provides the PUBLISH and SUBSCRIBE commands specifically for the publish/subscribe pattern
#Producer
127.0.0.1:6379> publish queue 1
(integer) 1
#Consumer 1
127.0.0.1:6379> subscribe queue
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "queue"
3) (integer) 1
1) "message"
2) "queue"
3) "1"
#Consumer 2
127.0.0.1:6379> subscribe queue
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "queue"
3) (integer) 1
1) "message"
2) "queue"
3) "1"
It mainly solves the problem of supporting multiple groups of consumers: every consumer subscribed to a channel receives each message published to it.
However, Pub/Sub has no persistence; it only forwards data in real time, which causes the following problems:
- When a consumer goes offline and comes back online, it only receives new messages; the messages sent in between are lost because they were never stored
- When all consumers are offline and nobody is consuming, every message is lost
Therefore, consumers must be online before producers start publishing, or messages will be lost
In addition, each subscription has a buffer inside Redis. The buffer is limited, and once the limit is exceeded Redis forcibly disconnects the consumer, which again loses messages
Here we can see that the list-based queue is a pull model, while Pub/Sub is a push model: messages are pushed into the buffer and wait there for the consumer to fetch them
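The push model and its failure modes can be illustrated with a toy in-memory model. This is a simplified sketch and not how Redis is implemented: the class `TinyPubSub` and its `buffer_limit` are hypothetical, and the limit is counted in messages here only to keep the example short.

```python
from collections import deque


class TinyPubSub:
    """Toy model of push-style delivery: no storage, bounded per-subscriber buffers."""

    def __init__(self, buffer_limit):
        self.buffer_limit = buffer_limit
        self.buffers = {}  # subscriber name -> deque of messages not yet read

    def subscribe(self, name):
        self.buffers[name] = deque()

    def unsubscribe(self, name):
        self.buffers.pop(name, None)  # anything still buffered is discarded too

    def publish(self, msg):
        """Push msg to every current subscriber; return how many received it."""
        delivered = 0
        for name in list(self.buffers):
            buf = self.buffers[name]
            if len(buf) >= self.buffer_limit:
                # Slow consumer: its buffer is full, so it is disconnected.
                self.unsubscribe(name)
                continue
            buf.append(msg)
            delivered += 1
        return delivered  # 0 means nobody was online and msg is simply dropped

    def read(self, name):
        buf = self.buffers.get(name)
        return buf.popleft() if buf else None


bus = TinyPubSub(buffer_limit=2)
lost = bus.publish("m0")    # nobody has subscribed yet, so m0 vanishes
bus.subscribe("c1")
bus.subscribe("c2")
fanout = bus.publish("m1")  # both subscribers get their own copy
bus.read("c2")              # c2 keeps up, c1 never reads
bus.publish("m2")
kicked = bus.publish("m3")  # c1's buffer is full, so c1 is dropped
```

Here `lost` is 0, `fanout` is 2 and `kicked` is 1: the early message reached nobody, the fan-out reached both consumers, and the slow consumer was disconnected along with everything in its buffer.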
Conclusion
- Support publish/subscribe, support multiple groups of consumers
- Consumers lose data when they go offline
- Message pile-up can kick consumers off, causing data loss
- There is no ACK mechanism and it is unreliable
Overall, Pub/Sub is too weak to serve as a message queue, so it is not worth considering for that purpose
Stream
Note that this feature is only available in Redis 5.0 and later
Stream uses XADD and XREADGROUP to produce and consume messages. (The consumer group, g1 below, must already exist; it is created with XGROUP CREATE.)
#Producer
127.0.0.1:6379> xadd queue * k1 v1
1636289446933-0
127.0.0.1:6379> xadd queue * k2 v2
1636295029388-0
127.0.0.1:6379> xadd queue * k3 v3
1636291571597-0
#Consumer 1
127.0.0.1:6379> xreadgroup group g1 c1 COUNT 1 streams queue >
1) 1) "queue"
   2) 1) 1) "1636289446933-0"
         2) 1) "k1"
            2) "v1"
127.0.0.1:6379> xreadgroup group g1 c1 COUNT 1 streams queue >
1) 1) "queue"
   2) 1) 1) "1636295029388-0"
         2) 1) "k2"
            2) "v2"
#Consumer 2
127.0.0.1:6379> xreadgroup group g1 c2 COUNT 1 streams queue >
1) 1) "queue"
   2) 1) 1) "1636291571597-0"
         2) 1) "k3"
            2) "v3"
XACK acknowledges a message, and XREADGROUP with an explicit ID (instead of >) recovers messages that were delivered but not yet acknowledged
#Manual ack
127.0.0.1:6379> xack queue g1 1636289446933-0
(integer) 1
#Query pending messages (delivered but not yet acknowledged)
127.0.0.1:6379> xreadgroup group g1 c1 COUNT 1 streams queue 0
1) 1) "queue"
   2) 1) 1) "1636295029388-0"
         2) 1) "k2"
            2) "v2"
Consumption with recovery is usually implemented along the lines of the following pseudo-code:
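// "0" re-reads this consumer's pending (delivered but un-acked) messages first
lastUnAckID := "0"
for {
    msg := xreadgroup(lastUnAckID)
    if msg == nil {
        // nothing pending any more: switch to ">" to consume messages never delivered before
        lastUnAckID = ">"
        continue
    }
    handle(msg)
    xack(msg)
}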
As long as the set of consumers stays unchanged (their number, their unique IDs, etc.), the above is a very convenient scheme. If consumers do change, however, an additional mechanism is needed, such as periodic polling or coordination through ZooKeeper, to keep the consumers consistent. That is more complicated and will not be expanded on here; a dedicated article on Stream will follow later
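The recover-then-consume pattern could be sketched in Python as follows. The sketch assumes `client` behaves like a redis-py `redis.Redis` instance, using `xreadgroup(groupname, consumername, streams, count=...)` and `xack(name, groupname, *ids)`, and that the group was already created with XGROUP CREATE. The function name `drain_pending_then_new` and the parameter `stop_after_empty_reads` are hypothetical; the latter only exists so the loop terminates in this example, where a real consumer would typically block instead.

```python
def drain_pending_then_new(client, stream, group, consumer, handle,
                           stop_after_empty_reads=1):
    """Re-process this consumer's un-acked messages, then read new ones.

    Reading with ID "0" returns entries already delivered to `consumer` but
    never acknowledged; reading with ">" returns entries never delivered to
    any consumer in the group.
    """
    handled = []
    last_id = "0"
    empty_reads = 0
    while True:
        reply = client.xreadgroup(group, consumer, {stream: last_id}, count=1)
        # Reply shape: [[stream_name, [(entry_id, fields), ...]], ...]
        entries = [entry for _name, batch in (reply or []) for entry in batch]
        if not entries:
            if last_id == "0":
                last_id = ">"  # backlog is empty: switch to new messages
                continue
            empty_reads += 1
            if empty_reads >= stop_after_empty_reads:
                return handled
            continue
        for entry_id, fields in entries:
            handle(entry_id, fields)
            client.xack(stream, group, entry_id)  # ACK only after handling succeeds
            handled.append(entry_id)
```

Because the ACK is sent only after `handle` returns, a crash in between leaves the entry pending, and the next run picks it up again through the `"0"` read.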
Comparison with professional message queues
We compare Redis queues with professional queues such as RabbitMQ and Kafka on two points:
- The message is not lost
- Message backlog costs are low
The message is not lost
This is a broad topic: it takes cooperation among the producer, the consumer, and the middleware
1. The producer loses messages
- The send fails: retry
- The message was sent but the response was lost: the only option is to retry, which may produce duplicate messages downstream
So the only way for a producer to avoid losing messages is to retry, and the consumer must then make its processing idempotent to cope with the duplicates
Seen this way, producer-side loss has nothing to do with the middleware; it is entirely a matter of whether the business code handles the failure cases above
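Retry on the producer side and idempotent handling on the consumer side could be sketched as follows. Everything here is illustrative: `send_with_retry`, `IdempotentConsumer` and the idea of a caller-supplied `msg_id` are assumptions made for the example, and the in-memory set of processed IDs stands in for whatever durable store a real system would use.

```python
def send_with_retry(send, msg_id, payload, max_attempts=3):
    """Call send() until it succeeds; the same message may be delivered twice."""
    for attempt in range(1, max_attempts + 1):
        try:
            send(msg_id, payload)
            return attempt  # number of attempts that were needed
        except ConnectionError:
            if attempt == max_attempts:
                raise  # give up and let the caller decide what to do


class IdempotentConsumer:
    """Skips messages whose ID has already been processed."""

    def __init__(self):
        self.processed_ids = set()  # stand-in for durable storage
        self.applied = []

    def handle(self, msg_id, payload):
        if msg_id in self.processed_ids:
            return False  # duplicate caused by a producer retry: ignore it
        self.applied.append(payload)
        self.processed_ids.add(msg_id)
        return True
```

The producer cannot tell a failed send from a lost response, so it retries in both cases; the stable `msg_id` is what lets the consumer recognise the second copy.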
2. The consumer loses messages
The main cause is that the consumer goes down after taking a message out and never sends a receipt
In this scenario the middleware needs to provide an ACK mechanism that tracks which messages have been consumed, so that messages are not lost
Redis Stream, like Kafka and RabbitMQ, provides an ACK mechanism
3. The middleware loses messages
This depends on how the middleware is implemented
According to @Kaito, Redis has two risks:
- AOF is flushed to disk periodically. The flush is asynchronous, so recent writes can be lost
- Master/replica failover: data on the master that has not yet been synchronized to the replica is lost when the replica is promoted (the author has doubts here: can a replica that is not synchronized still be promoted to master? 🤔️)
With Kafka and RabbitMQ, a write is considered successful only after multiple nodes have acknowledged it, which further improves message reliability
Message backlog
- Redis: memory-based with limited storage space; messages are discarded once the backlog reaches a certain size
- Kafka, RabbitMQ: disk-based, so storage space can be considered practically unlimited
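For a Stream, one way the backlog gets bounded is a length cap on XADD. The sketch below assumes `client` behaves like a redis-py `redis.Redis` instance, whose `xadd` accepts `maxlen` and `approximate` arguments; the wrapper name `publish_capped` and the parameter `max_backlog` are hypothetical.

```python
def publish_capped(client, stream, fields, max_backlog):
    """XADD with a MAXLEN cap: the oldest entries are evicted once the cap is hit."""
    # approximate=False requests exact trimming; redis-py's default,
    # approximate=True, trims lazily and may keep somewhat more entries.
    return client.xadd(stream, fields, maxlen=max_backlog, approximate=False)
```

The cap protects memory, but it is also exactly where a slow consumer loses data: entries trimmed before they were read cannot be recovered.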
Conclusion
- Adds ACK support on top of what Pub/Sub offers
- Support message playback
- When consumers change, additional encoding needs to be introduced to ensure the reliability of messages, which is complicated
- There is still a risk that messages will be lost due to component failures
Reference:
How does Redis do message queues?
Redis Stream features