1. Foreword

The publish/subscribe pattern is a common feature in everyday development. Simply put, a publisher publishes messages, and subscribers receive those messages and process them accordingly, as shown in the figure below.

Publish/subscribe

Redis provides PubSub, a publish/subscribe module that can be used for messaging.

Publishers and subscribers are both Redis clients, while channels are maintained on the Redis server.

A publisher sends a message to a channel, and every subscriber subscribed to that channel receives it.

2.1 Common Commands

2.1.1 Subscribing to Channels

127.0.0.1:6379> subscribe test1 test2
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "test1"
3) (integer) 1
1) "subscribe"
2) "test2"
3) (integer) 2

Publish

127.0.0.1:6379> publish test1 hello
(integer) 1
127.0.0.1:6379> publish test2 world
(integer) 1

Clients subscribed to test1 and test2 receive the messages:

1) "message"
2) "test1"
3) "hello"
1) "message"
2) "test2"
3) "world"

2.1.2 Pattern Subscription (PSUBSCRIBE)

With the approach above, a subscriber that wants several channels has to name every one of them explicitly. To avoid this, Redis provides PSUBSCRIBE, a pattern-matching subscription that selects channels with wildcards.

127.0.0.1:6379> psubscribe ch*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "ch*"
3) (integer) 1

Publish

127.0.0.1:6379> publish cha hello
(integer) 1
127.0.0.1:6379> publish china world
(integer) 1

Clients that previously subscribed to ch* receive the messages from both the cha and china channels, so a single pattern subscription covers multiple channels:

1) "pmessage"
2) "ch*"
3) "cha"
4) "hello"
1) "pmessage"
2) "ch*"
3) "china"
4) "world"

2.2 Implementation Principle

The Redis server keeps track of which clients subscribe to which channels and patterns:

struct redisServer {
    ...
    dict *pubsub_channels;  // Subscribed channels maintained by the server: key is the channel, value is the list of clients
    list *pubsub_patterns;  // Subscribed patterns maintained by the server
    ...
};

So when a client subscribes to a channel, the server adds an entry to pubsub_channels. pubsub_channels is a dictionary: the key is the channel and the value is the list of clients subscribed to it, so given a channel you can find all of its subscribers.

Likewise, when a client subscribes to a pattern, a new entry recording that subscription is appended to pubsub_patterns. Each entry in pubsub_patterns has its own structure that holds both the client and the pattern.

typedef struct pubsubPattern {
    client *client;  // The subscribing client
    robj *pattern;   // The subscribed pattern
} pubsubPattern;

When a publisher publishes a message to a channel, the server looks the channel up in pubsub_channels to get the list of clients subscribed to it, and sends the message to each of those clients in turn.

It then iterates through pubsub_patterns, finds every pattern that matches the channel, and sends the message to the client associated with each matching pattern.
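The two-step dispatch described above can be sketched in a few lines of Python. This is a toy model, not Redis's C implementation: the class name PubSubServer and the inbox attribute are hypothetical, and Python's fnmatchcase is used only as an approximation of Redis's glob-style pattern matching.

```python
from collections import defaultdict
from fnmatch import fnmatchcase


class PubSubServer:
    """Toy model of the two server-side structures (hypothetical class)."""

    def __init__(self):
        self.pubsub_channels = defaultdict(list)  # channel -> [client, ...]
        self.pubsub_patterns = []                 # [(client, pattern), ...]
        self.inbox = defaultdict(list)            # client -> replies received

    def subscribe(self, client, channel):
        if client not in self.pubsub_channels[channel]:
            self.pubsub_channels[channel].append(client)

    def psubscribe(self, client, pattern):
        if (client, pattern) not in self.pubsub_patterns:
            self.pubsub_patterns.append((client, pattern))

    def publish(self, channel, message):
        receivers = 0
        # Step 1: direct lookup of the channel's subscriber list.
        for client in self.pubsub_channels.get(channel, []):
            self.inbox[client].append(("message", channel, message))
            receivers += 1
        # Step 2: scan every pattern and test it against the channel name.
        for client, pattern in self.pubsub_patterns:
            if fnmatchcase(channel, pattern):
                self.inbox[client].append(("pmessage", pattern, channel, message))
                receivers += 1
        return receivers  # like the integer reply of PUBLISH


server = PubSubServer()
server.subscribe("client-1", "test1")
server.psubscribe("client-2", "ch*")
```

Note the asymmetry: the channel lookup is a single dictionary access, while the pattern step has to walk every subscribed pattern, so its cost grows with the number of pattern subscriptions.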

3. Stream

Although Redis provides publish/subscribe functionality, it has several shortcomings, so the scenarios where it fits are limited.

PubSub drawbacks:

  • If a subscriber service is deployed on multiple nodes, every node receives every message, so the same message is processed more than once.
  • There is no ACK mechanism, so messages are easily lost. If a consumer goes down, it will not receive the messages published during the outage, even after it reconnects.
  • Messages are not persisted. Once the Redis server goes down, all messages are lost.
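The first two drawbacks follow directly from the fire-and-forget delivery model. The sketch below is a hypothetical in-memory stand-in (FireAndForgetChannel is not a Redis API) that delivers each message only to the clients connected at publish time.

```python
class FireAndForgetChannel:
    """Hypothetical stand-in for a PubSub channel: no storage, no ACK."""

    def __init__(self):
        self.subscribers = {}  # connected client name -> messages received

    def connect(self, name):
        self.subscribers[name] = []

    def disconnect(self, name):
        return self.subscribers.pop(name)

    def publish(self, message):
        # Every connected subscriber gets its own copy; nothing is kept
        # for clients that are offline at this moment.
        for inbox in self.subscribers.values():
            inbox.append(message)
        return len(self.subscribers)


channel = FireAndForgetChannel()
channel.connect("node-a")   # two nodes of the same service
channel.connect("node-b")
channel.publish("order-1")  # both nodes receive it: duplicate processing

received_a = channel.disconnect("node-a")  # node-a goes down
channel.publish("order-2")                 # only node-b sees this
channel.connect("node-a")                  # node-a comes back with an empty inbox
```

After node-a reconnects it has no way to ask for order-2, which is the gap that Stream's stored messages and acknowledgements are meant to close.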

Redis 5.0 introduced the Stream data structure, which finally rounded out Redis's messaging mechanism.

A Stream is actually a list of messages, but it implements almost all of the functionality required for a message queue, including:

  • Sequential generation of message IDs
  • Message traversal
  • Blocking and non-blocking reads of messages
  • Group consumption of messages
  • Processing of unfinished messages
  • Message queue monitoring

Note that a Stream is only a data structure: it does not actively push messages to consumers, so consumers have to fetch the data themselves.

Each Stream has a unique name, which is its Redis key. The Stream is created automatically the first time a message is appended with the xadd command.

The following table lists the common commands:

Command | Format | Description
xadd | xadd key id field1 value1 [field2 value2 ...] | Append a message to the specified queue (key). Passing * as the id generates the ID automatically (current time + sequence number)
xread | xread [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...] | Read messages from one or more queues. COUNT: maximum number of messages to read; BLOCK: block for up to the given number of milliseconds (non-blocking by default); key: queue name; ID: start ID, only messages with a greater ID are returned
xrange | xrange key start end [COUNT count] | Read the messages in a given ID range, in ascending ID order. COUNT: maximum number of messages returned
xrevrange | xrevrange key end start [COUNT count] | Read the messages in a given ID range, in descending ID order. COUNT: maximum number of messages returned
xdel | xdel key id | Delete a message from the queue
xgroup create | xgroup create key groupname id | Create a new consumer group
xgroup destroy | xgroup destroy key groupname | Delete the specified consumer group
xgroup delconsumer | xgroup delconsumer key groupname cname | Delete the specified consumer from the specified consumer group
xgroup setid | xgroup setid key groupname id | Set the last delivered ID of the specified consumer group
xreadgroup | xreadgroup GROUP groupname consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...] | Read data through a consumer group (the consumer is created if it does not exist)
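The automatic IDs mentioned for xadd (current time + sequence number) have the form milliseconds-sequence. The sketch below shows one way such IDs can be kept strictly increasing; StreamIdGenerator and its injectable clock parameter are hypothetical names used for illustration, not part of Redis.

```python
import time


class StreamIdGenerator:
    """Hypothetical generator of '<milliseconds>-<sequence>' IDs."""

    def __init__(self, clock=None):
        # The clock is injectable so the behaviour can be checked deterministically.
        self._clock = clock or (lambda: int(time.time() * 1000))
        self._last_ms = 0
        self._last_seq = 0

    def next_id(self):
        now_ms = self._clock()
        if now_ms > self._last_ms:
            # New millisecond: restart the sequence at 0.
            self._last_ms, self._last_seq = now_ms, 0
        else:
            # Same millisecond, or the clock moved backwards: keep the
            # previous timestamp and bump the sequence so IDs never decrease.
            self._last_seq += 1
        return f"{self._last_ms}-{self._last_seq}"


ticks = iter([1000, 1000, 999, 1001])
generator = StreamIdGenerator(clock=lambda: next(ticks))
ids = [generator.next_id() for _ in range(4)]
```

Because every ID is greater than the one before it, a consumer can use the last ID it saw as a resume position.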

3.1 Example

3.1.1 Adding and Reading Messages

// Add messages. Queue name: mq, data: score=100
127.0.0.1:6379> xadd mq * score 100
"1627225715999-0"
127.0.0.1:6379> xadd mq * score 80
"1627225761166-0"
// Read one message, starting from the beginning
127.0.0.1:6379> xread COUNT 1 STREAMS mq 0
1) 1) "mq"
   2) 1) 1) "1627225715999-0"
         2) 1) "score"
            2) "100"
// Read the whole queue
127.0.0.1:6379> xrange mq - +
1) 1) "1627225715999-0"
   2) 1) "score"
      2) "100"
2) 1) "1627225761166-0"
   2) 1) "score"
      2) "80"
// Read a single message by ID
127.0.0.1:6379> xrange mq 1627225761166-0 1627225761166-0
1) 1) "1627225761166-0"
   2) 1) "score"
      2) "80"

To know how far it has consumed, a client must record the ID of the last message it processed. On the next read it passes that ID to xread, which returns only messages with a greater ID, so consumption resumes right after the last processed message.
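The bookkeeping a plain xread consumer has to do can be sketched as follows. The stream is modelled as an in-memory list using the IDs from the example above; parse_id and read_after are hypothetical helpers, not client-library functions.

```python
def parse_id(message_id):
    """Split '<ms>-<seq>' into a tuple so IDs compare numerically."""
    ms, seq = message_id.split("-")
    return int(ms), int(seq)


def read_after(stream, last_id, count=None):
    """Return entries whose ID is strictly greater than last_id."""
    newer = [(mid, fields) for mid, fields in stream
             if parse_id(mid) > parse_id(last_id)]
    return newer if count is None else newer[:count]


stream = [
    ("1627225715999-0", {"score": "100"}),
    ("1627225761166-0", {"score": "80"}),
]

last_id = "0-0"  # nothing consumed yet
consumed = []
while True:
    batch = read_after(stream, last_id, count=1)
    if not batch:
        break
    for mid, fields in batch:
        consumed.append(fields["score"])
        last_id = mid  # the client, not the server, remembers the position
```

If the client loses last_id (for example after a crash), it has to start again from the beginning or skip ahead, which is the problem consumer groups address by storing the cursor on the server.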

3.2 Consumer Groups

A consumer group maintains a last_delivered_id cursor that records the last message delivered to the group. Every message that has been delivered but not yet acknowledged is kept in the pending entries list (PEL). A message is removed from the PEL only after the consumer finishes processing it and notifies the Redis server with the xack command. An acknowledged message cannot be read through the group again.

127.0.0.1:6379> xrange mq - +
1) 1) "1627225715999-0"
   2) 1) "score"
      2) "100"
2) 1) "1627226969615-0"
   2) 1) "score"
      2) "70"
// Create the consumer group mqGroup
127.0.0.1:6379> xgroup create mq mqGroup 0
OK
// Read the messages after last_delivered_id
127.0.0.1:6379> xreadgroup GROUP mqGroup consumer1 COUNT 1 STREAMS mq >
1) 1) "mq"
   2) 1) 1) "1627225715999-0"
         2) 1) "score"
            2) "100"
127.0.0.1:6379> xreadgroup GROUP mqGroup consumer1 COUNT 1 STREAMS mq >
1) 1) "mq"
   2) 1) 1) "1627226969615-0"
         2) 1) "score"
            2) "70"
127.0.0.1:6379> xreadgroup GROUP mqGroup consumer1 COUNT 1 STREAMS ...
// Check the group status
127.0.0.1:6379> xinfo groups mq
1) 1) "name"
   2) "mqGroup"              // Consumer group name
   3) "consumers"
   4) (integer) 1            // Number of consumers
   5) "pending"
   6) (integer) 2            // Number of pending messages: read but not yet acknowledged to Redis
   7) "last-delivered-id"
   8) "1627226969615-0"
// Acknowledge a message
127.0.0.1:6379> xack mq mqGroup 1627226969615-0
(integer) 1
// Check the status again
127.0.0.1:6379> xinfo groups mq
1) 1) "name"
   2) "mqGroup"
   3) "consumers"
   4) (integer) 1
   5) "pending"
   6) (integer) 1            // Number of pending messages changed from 2 to 1
   7) "last-delivered-id"
   8) "1627226969615-0"
// The acknowledged message can no longer be read
127.0.0.1:6379> xreadgroup GROUP mqGroup consumer1 COUNT 1 STREAMS mq 1627225715999-0
1) 1) "mq"
   2) (empty list or set)
127.0.0.1:6379> xreadgroup GROUP mqGroup consumer1 COUNT 1 STREAMS mq 0
1) 1) "mq"
   2) 1) 1) "1627225715999-0"
         2) 1) "score"
            2) "100"
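The interaction between last_delivered_id, the PEL, and xack in the session above can be replayed against a small in-memory model. This is a simplified sketch under stated assumptions: ConsumerGroup and its methods are hypothetical names, IDs are stored as (milliseconds, sequence) tuples, and details such as idle times and delivery counters are left out.

```python
class ConsumerGroup:
    """Simplified, hypothetical model of a consumer group."""

    def __init__(self, stream, start_id=(0, 0)):
        self.stream = stream               # [(id_tuple, fields), ...] sorted by ID
        self.last_delivered_id = start_id  # group-wide cursor
        self.pel = {}                      # id_tuple -> consumer that holds it

    def read_new(self, consumer, count=1):
        """Like reading with the > ID: deliver entries never delivered before."""
        batch = [e for e in self.stream if e[0] > self.last_delivered_id][:count]
        for entry_id, _ in batch:
            self.pel[entry_id] = consumer      # pending until acknowledged
            self.last_delivered_id = entry_id  # advance the cursor
        return batch

    def read_pending(self, consumer, after_id=(0, 0), count=1):
        """Like reading with an explicit ID: re-read this consumer's pending entries."""
        return [e for e in self.stream
                if e[0] > after_id and self.pel.get(e[0]) == consumer][:count]

    def ack(self, entry_id):
        """Like xack: returns 1 if the entry was pending, otherwise 0."""
        return 1 if self.pel.pop(entry_id, None) is not None else 0


first, second = (1627225715999, 0), (1627226969615, 0)
group = ConsumerGroup([(first, {"score": "100"}), (second, {"score": "70"})])

group.read_new("consumer1")       # delivers the score=100 entry
group.read_new("consumer1")       # delivers the score=70 entry
pending_before = len(group.pel)   # both entries are pending
acked = group.ack(second)         # acknowledge the second entry
pending_after = len(group.pel)    # only the first entry is still pending
```

Keeping the cursor and the PEL on the server side is what lets a restarted consumer pick up its unacknowledged messages by reading from ID 0, as the last command of the session does.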

3.3 Message Queue Is Too Long

If a large number of messages accumulate, you can cap the Stream at a maximum length to keep it from growing without bound. Once the maximum length is reached, the oldest messages are evicted first, so the Stream always holds the most recent messages.
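In Redis the cap is typically given through the MAXLEN option of xadd, for example xadd mq MAXLEN 1000 * score 100. The eviction behaviour itself can be sketched with a bounded deque; CappedStream and its simplified counter-based IDs are hypothetical and only illustrate the oldest-first eviction.

```python
from collections import deque


class CappedStream:
    """Hypothetical capped stream: keeps only the newest maxlen entries."""

    def __init__(self, maxlen):
        # A deque with maxlen drops the oldest item when a new one overflows it.
        self.entries = deque(maxlen=maxlen)
        self._counter = 0

    def add(self, fields):
        self._counter += 1
        entry_id = f"{self._counter}-0"  # simplified ID, not a real timestamp
        self.entries.append((entry_id, fields))
        return entry_id


stream = CappedStream(maxlen=3)
for score in (100, 80, 70, 60, 50):
    stream.add({"score": score})

remaining_ids = [entry_id for entry_id, _ in stream.entries]
```

A consumer that falls further behind than the cap will never see the evicted entries, so the maximum length should be chosen with the slowest consumer in mind.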