Redis 5.0 was recently and rather unexpectedly released by its author, adding many new features. The biggest one is the new Stream data structure, a powerful multicast, persistent message queue. The author says that Redis Stream borrows heavily from Kafka's design.

The structure of Redis Stream is shown in the figure above. It is a linked list of messages chained together, each with a unique ID and its own content. Messages are persistent: their content is still there after Redis restarts.

Each Stream has a unique name, which is its Redis key. The Stream is created automatically the first time we append a message to it with the xadd command.

Each Stream can have multiple consumer groups, and each consumer group has a cursor, last_delivered_id, that moves forward along the Stream to indicate which messages the group has already consumed. Each consumer group has a name that is unique within the Stream. Consumer groups are not created automatically; they require a separate command, xgroup create, which specifies the Stream message ID the group starts consuming from. That ID initializes the last_delivered_id variable.

The state of each consumer group is independent of the others. In other words, every message in a Stream is delivered to each of its consumer groups.

A consumer group can have multiple consumers, which compete with one another for messages; the cursor last_delivered_id moves forward whenever any consumer in the group reads a message. Each consumer has a name that is unique within its group.
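To make the cursor semantics concrete, here is a minimal Python toy model; it is not Redis code. It assumes message IDs are (milliseconds, sequence) tuples held in one shared list, and the class ToyGroup and its read method are hypothetical names. Each group keeps its own last_delivered_id, so two groups both see every message, while consumers inside one group compete for the next message. A group whose cursor starts at the current last ID plays the role of $.

```python
class ToyGroup:
    """Hypothetical model of one consumer group: one cursor shared by its consumers."""

    def __init__(self, stream_ids: list, start_id: tuple) -> None:
        self.stream_ids = stream_ids          # shared, ordered list of (ms, seq) IDs
        self.last_delivered_id = start_id     # set once, like `xgroup create ... <id>`

    def read(self, consumer: str):
        """Give the next undelivered message to `consumer` and advance the cursor."""
        for msg_id in self.stream_ids:
            if msg_id > self.last_delivered_id:
                self.last_delivered_id = msg_id
                return consumer, msg_id
        return None                           # nothing new for this group


ids = [(1, 0), (2, 0), (3, 0)]
g1 = ToyGroup(ids, (0, 0))                    # like `xgroup create ... 0-0`
g2 = ToyGroup(ids, (0, 0))
g3 = ToyGroup(ids, ids[-1])                   # like `xgroup create ... $`

print(g1.read("c1"), g1.read("c2"))           # ('c1', (1, 0)) ('c2', (2, 0))
print(g2.read("c1"))                          # ('c1', (1, 0)): g2 is unaffected by g1
print(g3.read("c1"))                          # None: g3 only sees later messages
```

Because the cursor belongs to the group rather than to a consumer, c1 and c2 in g1 never receive the same message, while g2 replays the Stream from its own starting point.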

Each consumer has a state variable, pending_ids, which records the messages the client has read but not yet acknowledged (acked). As long as the client does not ack, the number of message IDs in this variable keeps growing; each time a message is acked, it shrinks by one. Redis officially calls pending_ids the PEL (Pending Entries List). It is a core data structure that guarantees the client consumes each message at least once, so a message is not lost if it goes missing in network transit.

Message ID

A message ID has the form timestampInMillis-sequence, for example 1527846880572-5. This means the message was generated at millisecond timestamp 1527846880572 and has sequence number 5 within that millisecond (sequence numbers start at 0, so it is the sixth message generated in that millisecond). The ID can be generated automatically by the server or specified by the client, but it must have the form integer-integer, and each new message ID must be greater than the previous one.
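The ID rules can be sketched in a few lines of Python. This is a hedged illustration, not Redis source: parse_id, format_id, next_id and accept_explicit are hypothetical helpers, and the next_id rule (a later millisecond restarts the sequence at 0, otherwise the sequence is incremented) is our reading of how auto-generated IDs stay strictly increasing even when the clock does not advance. Note that IDs must be compared as pairs of numbers, not as strings.

```python
def parse_id(text: str) -> tuple:
    """Split an ID such as '1527846880572-5' into (milliseconds, sequence)."""
    ms, seq = text.split("-")
    return int(ms), int(seq)


def format_id(msg_id: tuple) -> str:
    return f"{msg_id[0]}-{msg_id[1]}"


def next_id(last_id: tuple, now_ms: int) -> tuple:
    """Return an ID strictly greater than last_id (sketch of auto-generation)."""
    last_ms, last_seq = last_id
    if now_ms > last_ms:
        return now_ms, 0              # new millisecond: sequence restarts at 0
    return last_ms, last_seq + 1      # same millisecond (or clock went back): bump sequence


def accept_explicit(last_id: tuple, new_id: tuple) -> bool:
    """A client-supplied ID is only valid if it is greater than the last one."""
    return new_id > last_id           # tuples compare field by field


print(format_id(next_id(parse_id("1527846880572-5"), 1527846880572)))  # 1527846880572-6
print(parse_id("10-0") > parse_id("9-0"), "10-0" > "9-0")             # True False
```

The last line shows why a plain string comparison would order IDs incorrectly.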

The message content

The message content is a set of key-value pairs, like the fields of a hash; nothing special.

Add and delete

  1. xadd appends a message
  2. xdel deletes a message. It only sets a deletion flag on the entry and, in the session below, does not change the total length
  3. xrange returns a range of messages, automatically skipping deleted ones
  4. xlen returns the length of the Stream
  5. del deletes the whole Stream
# * means the server generates the ID automatically; it is followed by a list of key/value pairs
# name laoqian, age 30
127.0.0.1:6379> xadd codehole * name laoqian age 30
1527849609889-0  # ID of the generated message
127.0.0.1:6379> xadd codehole * name xiaoyu age 29
1527849629172-0
127.0.0.1:6379> xadd codehole * name xiaoqian age 1
1527849637634-0
127.0.0.1:6379> xlen codehole
(integer) 3
# - means the minimum ID and + means the maximum ID
127.0.0.1:6379> xrange codehole - +
1) 1) 1527849609889-0
   2) 1) "name"
      2) "laoqian"
      3) "age"
      4) "30"
2) 1) 1527849629172-0
   2) 1) "name"
      2) "xiaoyu"
      3) "age"
      4) "29"
3) 1) 1527849637634-0
   2) 1) "name"
      2) "xiaoqian"
      3) "age"
      4) "1"
# specify the minimum message ID
127.0.0.1:6379> xrange codehole 1527849629172-0 +
1) 1) 1527849629172-0
   2) 1) "name"
      2) "xiaoyu"
      3) "age"
      4) "29"
2) 1) 1527849637634-0
   2) 1) "name"
      2) "xiaoqian"
      3) "age"
      4) "1"
# specify the maximum message ID
127.0.0.1:6379> xrange codehole - 1527849629172-0
1) 1) 1527849609889-0
   2) 1) "name"
      2) "laoqian"
      3) "age"
      4) "30"
2) 1) 1527849629172-0
   2) 1) "name"
      2) "xiaoyu"
      3) "age"
      4) "29"
127.0.0.1:6379> xdel codehole 1527849609889-0
(integer) 1
# the length is not affected
127.0.0.1:6379> xlen codehole
(integer) 3
# the deleted message is gone
127.0.0.1:6379> xrange codehole - +
1) 1) 1527849629172-0
   2) 1) "name"
      2) "xiaoyu"
      3) "age"
      4) "29"
2) 1) 1527849637634-0
   2) 1) "name"
      2) "xiaoqian"
      3) "age"
      4) "1"
# delete the entire Stream
127.0.0.1:6379> del codehole
(integer) 1
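The add/delete behavior in this session can be mimicked with a small in-memory Python model. It is only a sketch under stated assumptions: ToyStream is a hypothetical class, IDs are (ms, seq) tuples, and xdel simply sets a deleted flag so the entry stays in storage while xrange skips it, as the list above describes. "-" and "+" stand for the open lower and upper bounds.

```python
class ToyStream:
    """Hypothetical in-memory model: entries stay in ID order; xdel only flags them."""

    def __init__(self) -> None:
        self.entries = []            # dicts: {"id": (ms, seq), "fields": {...}, "deleted": bool}
        self.last_id = (0, 0)        # new IDs must be greater than this

    def xadd(self, msg_id: tuple, fields: dict) -> tuple:
        if msg_id <= self.last_id:
            raise ValueError("ID must be greater than the last ID")
        self.entries.append({"id": msg_id, "fields": dict(fields), "deleted": False})
        self.last_id = msg_id
        return msg_id

    def xdel(self, msg_id: tuple) -> int:
        for entry in self.entries:
            if entry["id"] == msg_id and not entry["deleted"]:
                entry["deleted"] = True   # flag only; the entry is still stored
                return 1
        return 0

    def xrange(self, start="-", end="+") -> list:
        """Return (id, fields) pairs in [start, end], skipping flagged entries."""
        return [
            (e["id"], e["fields"])
            for e in self.entries
            if not e["deleted"]
            and (start == "-" or e["id"] >= start)
            and (end == "+" or e["id"] <= end)
        ]


s = ToyStream()
s.xadd((1527849609889, 0), {"name": "laoqian", "age": "30"})
s.xadd((1527849629172, 0), {"name": "xiaoyu", "age": "29"})
s.xadd((1527849637634, 0), {"name": "xiaoqian", "age": "1"})
s.xdel((1527849609889, 0))
print([msg_id for msg_id, _ in s.xrange()])   # [(1527849629172, 0), (1527849637634, 0)]
```

Keeping last_id separate from the entry list means a deleted ID can never be reused, which matches the rule that every new ID must exceed the previous one.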

Independent consumption

We can consume Stream messages independently without defining any consumer group, and even block and wait when the Stream has no new messages. Redis provides a separate consumption command, xread, which uses a Stream as an ordinary message queue (list). With xread, we can ignore consumer groups entirely and treat the Stream as a plain list.

# read two messages from the head of the Stream
127.0.0.1:6379> xread count 2 streams codehole 0-0
1) 1) "codehole"
   2) 1) 1) 1527851486781-0
         2) 1) "name"
            2) "laoqian"
            3) "age"
            4) "30"
      2) 1) 1527851493405-0
         2) 1) "name"
            2) "yurui"
            3) "age"
            4) "29"
# read one message from the tail of the Stream; nothing is returned
127.0.0.1:6379> xread count 1 streams codehole $
(nil)
# block at the tail until a new message arrives; the following command waits until then
127.0.0.1:6379> xread block 0 count 1 streams codehole $
# open a new window and add a message to the Stream from it
127.0.0.1:6379> xadd codehole * name youming age 60
1527852774092-0
# switch back to the first window: the command has unblocked and returned the new message
# redis-cli also shows how long we waited, here about 93 s
127.0.0.1:6379> xread block 0 count 1 streams codehole $
1) 1) "codehole"
   2) 1) 1) 1527852774092-0
         2) 1) "name"
            2) "youming"
            3) "age"
            4) "60"
(93.11s)

If a client wants to consume sequentially with xread, it must remember how far it has consumed, that is, the last message ID it received. On the next xread call, it passes that ID as the argument and continues with the messages that follow.
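The bookkeeping described above can be sketched as a toy, assuming the Stream is a plain Python list of (id, fields) pairs with increasing (ms, seq) IDs; toy_xread and consume_all are hypothetical names, not a Redis client API. The point is that the cursor lives on the client: each call passes the last ID seen and gets only strictly newer messages.

```python
def toy_xread(entries: list, last_id: tuple, count: int) -> list:
    """Return up to `count` entries whose ID is strictly greater than last_id."""
    return [e for e in entries if e[0] > last_id][:count]


def consume_all(entries: list, batch: int = 2):
    """Read the whole stream in batches, carrying the last seen ID forward."""
    last_id = (0, 0)                      # like passing 0-0: start at the head
    processed = []
    while True:
        got = toy_xread(entries, last_id, batch)
        if not got:
            break                         # a real consumer would block here instead
        for msg_id, fields in got:
            processed.append((msg_id, fields["name"]))
        last_id = got[-1][0]              # remember where we stopped
    return processed, last_id


stream = [((1527851486781, 0), {"name": "laoqian"}),
          ((1527851493405, 0), {"name": "yurui"}),
          ((1527851498956, 0), {"name": "xiaoqian"})]
print(consume_all(stream))
```

If the client loses last_id (for example on restart), it either re-reads from 0-0 or skips ahead with $; persisting the cursor is the client's responsibility.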

block 0 means block indefinitely until a message arrives; block 1000 means block for 1 s, and if no message arrives within that second, nil is returned.

127.0.0.1:6379> xread block 1000 count 1 streams codehole $
(nil)
(1.07s)
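One way to picture the block option is a reader waiting on a condition variable with a timeout. The sketch below is a hypothetical Python model built on threading.Condition and makes no claim about Redis internals: block 0 maps to waiting with no timeout, block N to a timeout of N milliseconds, and a timeout yields None (the nil reply). Reading from $ is modeled by passing the Stream's current last ID; BlockingToyStream and its methods are invented names.

```python
import threading


class BlockingToyStream:
    """Hypothetical model of `xread block`: wait until a newer entry exists."""

    def __init__(self) -> None:
        self._entries = []                 # (id, fields) pairs with increasing IDs
        self._cond = threading.Condition()

    def last_id(self) -> tuple:
        """What `$` would resolve to right now."""
        with self._cond:
            return self._entries[-1][0] if self._entries else (0, 0)

    def add(self, msg_id: tuple, fields: dict) -> None:
        with self._cond:
            self._entries.append((msg_id, fields))
            self._cond.notify_all()        # wake every blocked reader

    def read(self, last_id: tuple, block_ms: int):
        """Return entries newer than last_id, or None if block_ms elapses first.

        block_ms == 0 means wait forever, as with `block 0`.
        """
        timeout = None if block_ms == 0 else block_ms / 1000

        def newer():
            return [e for e in self._entries if e[0] > last_id]

        with self._cond:
            if self._cond.wait_for(lambda: bool(newer()), timeout=timeout):
                return newer()
            return None


s = BlockingToyStream()
s.add((1, 0), {"name": "laoqian"})
print(s.read(s.last_id(), 100))            # None, after waiting about 0.1 s
```

Resolving $ once, before waiting, is what makes the reader see only messages added after the call started.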

Creating a consumer group

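Use the xgroup create command to create a consumer group. It takes a start message ID, which initializes the group's last_delivered_id variable.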

# 0-0 means start from the very beginning of the Stream
127.0.0.1:6379> xgroup create codehole cg1 0-0
OK
# $ means consume from the tail: accept only new messages and ignore everything currently in the Stream
127.0.0.1:6379> xgroup create codehole cg2 $
OK
# get Stream information
127.0.0.1:6379> xinfo codehole
 1) length
 2) (integer) 3  # 3 messages
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2  # two consumer groups
 9) first-entry  # First message
10) 1) 1527851486781-0
    2) 1) "name"
       2) "laoqian"
       3) "age"
       4) "30"
11) last-entry  # Last message
12) 1) 1527851498956-0
    2) 1) "name"
       2) "xiaoqian"
       3) "age"
       4) "1"
# get the Stream's consumer group information
127.0.0.1:6379> xinfo groups codehole
1) 1) name
   2) "cg1"
   3) consumers
   4) (integer) 0  # this consumer group has no consumers yet
   5) pending
   6) (integer) 0  # This consumer group has no messages being processed
2) 1) name
   2) "cg2"
   3) consumers
   4) (integer) 0  # this consumer group has no consumers yet
   5) pending
   6) (integer) 0  # This consumer group has no messages being processed

Consumption

Stream provides the xreadgroup command for consuming within a consumer group. It takes the consumer group name, the consumer name, and a start message ID. Like xread, it can block and wait for new messages. When a new message is read, its ID is added to the consumer's PEL (messages being processed). After processing, the client tells the server with the xack command that the message has been handled, and the message ID is removed from the PEL.

# > means: read the messages after the current consumer group's last_delivered_id
# every time the consumer reads a message, the last_delivered_id variable advances
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole >
1) 1) "codehole"
   2) 1) 1) 1527851486781-0
         2) 1) "name"
            2) "laoqian"
            3) "age"
            4) "30"
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole >
1) 1) "codehole"
   2) 1) 1) 1527851493405-0
         2) 1) "name"
            2) "yurui"
            3) "age"
            4) "29"
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 2 streams codehole >
1) 1) "codehole"
   2) 1) 1) 1527851498956-0
         2) 1) "name"
            2) "xiaoqian"
            3) "age"
            4) "1"
      2) 1) 1527852774092-0
         2) 1) "name"
            2) "youming"
            3) "age"
            4) "60"
# reading again returns nothing: there are no new messages
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole >
(nil)
# Then block and wait
127.0.0.1:6379> xreadgroup GROUP cg1 c1 block 0 count 1 streams codehole >
# open another window and add a message
127.0.0.1:6379> xadd codehole * name lanying age 61
1527854062442-0
# back in the previous window, the command has unblocked and received the new message
127.0.0.1:6379> xreadgroup GROUP cg1 c1 block 0 count 1 streams codehole >
1) 1) "codehole"
   2) 1) 1) 1527854062442-0
         2) 1) "name"
            2) "lanying"
            3) "age"
            4) "61"
(36.54s)
# observe the consumer group information
127.0.0.1:6379> xinfo groups codehole
1) 1) name
   2) "cg1"
   3) consumers
   4) (integer) 1  # one consumer
   5) pending
   6) (integer) 5  # 5 messages are being processed and have not been acked
2) 1) name
   2) "cg2"
   3) consumers
   4) (integer) 0  # no change in cg2, because we have only been working with cg1
   5) pending
   6) (integer) 0
# if a consumer group has several consumers, we can observe the state of each one with the xinfo consumers command
127.0.0.1:6379> xinfo consumers codehole cg1  # There is currently 1 consumer
1) 1) name
   2) "c1"
   3) pending
   4) (integer) 5  # five pending messages
   5) idle
   6) (integer) 418715  # ms since this consumer last read a message
# Next we ack a message
127.0.0.1:6379> xack codehole cg1 1527851486781-0
(integer) 1
127.0.0.1:6379> xinfo consumers codehole cg1
1) 1) name
   2) "c1"
   3) pending
   4) (integer) 4  # down from 5
   5) idle
   6) (integer) 668504
# ack all the remaining messages
127.0.0.1:6379> xack codehole cg1 1527851493405-0 1527851498956-0 1527852774092-0 1527854062442-0
(integer) 4
127.0.0.1:6379> xinfo consumers codehole cg1
1) 1) name
   2) "c1"
   3) pending
   4) (integer) 0  # the pel is empty
   5) idle
   6) (integer) 745505
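The read, PEL and ack cycle in the session above can be sketched with another toy Python model. Assumptions, stated plainly: ToyConsumerGroup and its methods are hypothetical; the PEL is kept as a per-consumer set of (ms, seq) IDs; and xreadgroup here models only the > form, which hands out never-delivered messages. Running the reads in the same order as the session reproduces its pending count of 5.

```python
class ToyConsumerGroup:
    """Hypothetical model: group cursor plus a PEL per consumer."""

    def __init__(self, start_id: tuple = (0, 0)) -> None:
        self.last_delivered_id = start_id
        self.pel = {}                               # consumer name -> set of unacked IDs

    def xreadgroup(self, stream_ids: list, consumer: str, count: int) -> list:
        """The '>' form: deliver up to `count` new IDs and record them as pending."""
        pending = self.pel.setdefault(consumer, set())
        delivered = []
        for msg_id in stream_ids:
            if len(delivered) == count:
                break
            if msg_id > self.last_delivered_id:
                self.last_delivered_id = msg_id     # cursor advances on every read
                pending.add(msg_id)
                delivered.append(msg_id)
        return delivered

    def xack(self, *msg_ids: tuple) -> int:
        """Remove acknowledged IDs from whichever consumer's PEL holds them."""
        acked = 0
        for msg_id in msg_ids:
            for pending in self.pel.values():
                if msg_id in pending:
                    pending.remove(msg_id)
                    acked += 1
                    break
        return acked


ids = [(1527851486781, 0), (1527851493405, 0), (1527851498956, 0),
       (1527852774092, 0), (1527854062442, 0)]
cg1 = ToyConsumerGroup()
for n in (1, 1, 2, 1):
    cg1.xreadgroup(ids, "c1", n)
print(len(cg1.pel["c1"]))                           # 5
```

Acking one ID then brings the PEL to 4, and acking the remaining four empties it, mirroring the xinfo consumers output.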

What if there are too many Stream messages

As you can imagine, if too many messages accumulate, the Stream's linked list grows very long and memory usage can blow up. Moreover, the xdel command does not really delete a message; it only flags it.

Redis naturally takes this into account and provides fixed-length Streams: xadd accepts a maxlen argument that evicts the oldest messages, so the Stream never grows beyond the specified length.

127.0.0.1:6379> xlen codehole
(integer) 5
127.0.0.1:6379> xadd codehole maxlen 3 * name xiaorui age 1
1527855160273-0
127.0.0.1:6379> xlen codehole
(integer) 3

We can see that the Stream has been trimmed to a length of 3.
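Exact trimming behaves like a bounded queue that evicts from the head. A minimal Python sketch, assuming a collections.deque stands in for the Stream and xadd_maxlen is a hypothetical helper (the five starting names are illustrative): after each append, the oldest entries are dropped until the length is at most maxlen. Redis also accepts an approximate form, maxlen ~ N, which lets the server trim less precisely for efficiency; the sketch models only the exact form.

```python
from collections import deque


def xadd_maxlen(stream: deque, entry: str, maxlen: int) -> None:
    """Append `entry`, then evict the oldest entries until len(stream) <= maxlen."""
    stream.append(entry)
    while len(stream) > maxlen:
        stream.popleft()                 # the oldest message goes first


codehole = deque(["laoqian", "yurui", "xiaoqian", "youming", "lanying"])
xadd_maxlen(codehole, "xiaorui", 3)
print(len(codehole), list(codehole))     # 3 ['youming', 'lanying', 'xiaorui']
```

Trimming happens as a side effect of appending, which is why a single xadd with maxlen 3 shrank a five-message Stream to three.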

What happens if you forget to ACK a message

A Stream keeps a list of in-process message IDs (the PEL) in each consumer structure. If a consumer receives messages but never ACKs them after processing, its PEL keeps growing. With many consumer groups, the memory used by the PELs is multiplied accordingly.

How does PEL avoid message loss

Suppose a consumer reads a Stream message and, while the Redis server is sending the reply, the client suddenly disconnects, so the message is lost in transit. The PEL, however, has already recorded the ID of the delivered message. After reconnecting, the client can fetch the message IDs in its PEL again. To do so, the start ID passed to xreadgroup cannot be ">"; it must be a valid message ID instead. Usually 0-0 is used, which returns all of this consumer's pending messages in the PEL (those with IDs greater than the given one). New messages after last_delivered_id are not included; once the pending messages have been processed and acked, the consumer goes back to reading new messages with ">".
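The recovery path can be sketched with a toy Python model. ToyRecoveringGroup is a hypothetical class, IDs are (ms, seq) tuples, and pending entries are stored as a map from message ID to consumer name. Reading with ">" delivers new messages and records them as pending; reading with any concrete ID (here 0-0) replays only that consumer's own pending entries with a greater ID and does not pick up new messages.

```python
class ToyRecoveringGroup:
    """Hypothetical model of xreadgroup's two start-ID modes."""

    def __init__(self) -> None:
        self.last_delivered_id = (0, 0)
        self.pending = {}                      # message ID -> owning consumer

    def read(self, stream_ids: list, consumer: str, start) -> list:
        if start == ">":
            # New messages only: advance the cursor and record each one in the PEL.
            new = [m for m in stream_ids if m > self.last_delivered_id]
            for m in new:
                self.pending[m] = consumer
            if new:
                self.last_delivered_id = new[-1]
            return new
        # Any concrete ID: replay this consumer's own pending entries after `start`.
        return sorted(m for m, owner in self.pending.items()
                      if owner == consumer and m > start)

    def ack(self, msg_id: tuple) -> int:
        return 0 if self.pending.pop(msg_id, None) is None else 1


ids = [(1, 0), (2, 0)]
group = ToyRecoveringGroup()
group.read(ids, "c1", ">")                     # reply lost: client disconnects here
ids.append((3, 0))                             # a new message arrives meanwhile
print(group.read(ids, "c1", (0, 0)))           # [(1, 0), (2, 0)]: only the PEL
```

After acking the replayed entries, the PEL read returns an empty list, which is the usual signal to switch back to ">".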

Conclusion

Stream's consumption model borrows the consumer group concept from Kafka, and it makes up for Redis Pub/Sub's inability to persist messages. Unlike Kafka, however, a Stream cannot be partitioned. To partition, the client has to create Streams with different names and hash each message to choose which Stream it goes into. Readers who have looked into Disque, another open-source project by the Redis author, may suspect that the author, feeling Disque was not active enough, ported its ideas into Redis. This is only my guess and not necessarily the author's original intention. If you see it differently, feel free to join the discussion in the comments.
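The client-side partitioning mentioned above might look like the following Python sketch. The naming scheme (codehole:0 to codehole:3) and the helper pick_stream are hypothetical. zlib.crc32 is used because, unlike Python's built-in hash() for strings, it returns the same value in every process, so every producer routes a given key to the same Stream. Messages that share a key then stay in order within one Stream, while different keys spread across Streams.

```python
import zlib


def pick_stream(base: str, key: str, partitions: int) -> str:
    """Map a message key to one of `partitions` Streams named '<base>:<n>'."""
    n = zlib.crc32(key.encode("utf-8")) % partitions
    return f"{base}:{n}"


for key in ("user-1", "user-2", "user-42"):
    print(key, "->", pick_stream("codehole", key, 4))
```

Changing the number of partitions remaps most keys, so a real deployment would need to plan for that when resizing.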

To read more advanced articles, follow the public account “code hole”.