preface
The introduction of Streams data structures in Redis 5 is arguably the biggest feature in this iteration. It gives Redis better and more powerful native support for its use as a message queue in the 5.x iteration, especially for persistent message queues. At the same time, Stream borrows the concept and design of Kafka’s consumer group model to make consumer message processing more efficient and fast. This paper analyzes the commonly used apis in Streams data structures.
To prepare
Redis version 5.0.5 is used for this article. With earlier 5.x versions, some of the apis work slightly differently than described in this article.
Add a message
The Streams add data is added using the XADD directive, and the data in the message is manipulated as k-V key-value pairs. A message can have multiple key-value pairs, add command format:
XADD key ID field string [field string ...]
Copy the code
Key is the Streams name, ID is the unique identifier of the message, and field String is the key-value pair. Let’s add the flow named Person to do this.
XADD person * name ytao des https://ytao.top
Copy the code
In the example above, the ID is copied with *, which means that the server automatically generates the ID and returns the data “1578238486193-0” after adding the ID.
[zipzipgex] [zipzipgex] [zipzipgex] [zipzipgex] [zipzipgex] [zipzipgex]
- MillisecondsTime is the millisecondsTime stamp of the current server time.
- SequenceNumber Indicates the sequence in which messages are generated within the current millisecond. By default, the sequence starts from 0 and increases by 1.
For example: 1578238486193-3 indicates the fourth message added at the time stamp of 1578238486193 milliseconds.
In addition to the automatic Id generation mode on the server, the specified Id can also be generated. However, the specified Id has the following restrictions:
- The front and back parts of the Id must be digits.
- The minimum Id is 0-1 and cannot be 0-0, but 2-0, 3-0…. It’s allowed.
- For added messages, the first half of the Id cannot be smaller than the value that has the maximum Id, and the second half of the Id cannot be smaller than the second half of the value that has the same maximum Id.
Otherwise, an exception will be thrown if the above conditions are not met:
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item
Copy the code
In fact, when you add a message, you do two things. First, determine if Streams does not exist, create a name for Streams, and then add a message to Streams. Even when messages are added, the current Streams name can exist in Redis due to Id exceptions. The Id in Streams can also be used as a pointer because it is an ordered tag.
In production, if you use add messages this way, the problem is that when the number of messages is too large, the service will go down. The design of Streams had this in mind when it was possible to specify Streams’ capacity. If the capacity operates on this set value, the old message will be swapped. When adding messages, set the MAXLEN parameter.
XADD person MAXLEN 5 * name ytao des https://ytao.top
Copy the code
This specifies a capacity of five messages in the Streams. XTRIM can also be used to truncate messages from small to large to remove unwanted messages:
XTRIM person MAXLEN 8
Copy the code
Number of messages
View the number of messages using the XLEN command.
XLEN key
Copy the code
Example: Check the number of messages in the Person stream:
> XLEN person
(integer5)Copy the code
Query message
Query messages in Streams using the XRANGE and XREVRANGE directives.
XRANGE
When querying data, you can query data according to the specified Id range. XRANGE query command format:
XRANGE key start end [COUNT count]
Copy the code
Parameter Description:
- Key is the name of Streams
- Start indicates the start Id of the range query, including the current Id.
- Start indicates the end RANGE query Id, including the current Id.
- Count is the maximum number of messages returned by the query. This parameter is optional.
Here start and end have two unspecified values – and +, which represent infinitesimal and infinity respectively, so when used, all messages are queried.
> XRANGE person - +
1) 1) "0-1"
2) 1) "name"
2) "ytao"
3) "des"
4) "https://ytao.top"
2) 1) "0-2"
2) 1) "name"
2) "luffy"
3) "des"
4) "valiant!"
3) 1) 2-0 ""
2) 1) "name"
2) "gaga"
3) "des"
4) "fishion!"
Copy the code
The above query message data, as you can see, is queried in first-in, first-out order.
Use COUNT to specify the number returned by the query:
# query all messages and return one data
> XRANGE person - + COUNT 1
1) 1) "0-1"
2) 1) "name"
2) "ytao"
3) "des"
4) "https://ytao.top"
Copy the code
In range queries, the second half of the Id can be omitted and all data in the second half will be queried.
XREVRANGE
XREVRANGE queries are similar to those used in the XRANGE directive, but the order of the start and end arguments is reversed:
XREVRANGE key end start [COUNT count]
Copy the code
Use cases:
> XREVRANGE person + -
1) 1) 2-0 ""
2) 1) "name"
2) "gaga"
3) "des"
4) "fishion!"
2) 1) "0-2"
2) 1) "name"
2) "luffy"
3) "des"
4) "valiant!"
3) 1) "0-1"
2) 1) "name"
2) "ytao"
3) "des"
4) "https://ytao.top"
Copy the code
The results of the query are in reverse order from XRANGE, and all else being equal, the two instructions return messages in ascending and descending order.
Delete the message
Deleting messages is done using the XDEL directive, which simply specifies the Streams name and Id to be deleted, allowing multiple messages to be deleted at once.
XDEL key ID [ID ...]
Copy the code
Deleted Cases:
# query all messages
> XRANGE person - +
1) 1) "0-1"
2) 1) "name"
2) "ytao"
3) "des"
4) "https://ytao.top"
2) 1) "0-2"
2) 1) "name"
2) "luffy"
3) "des"
4) "valiant!"
3) 1) 2-0 ""
2) 1) "name"
2) "gaga"
3) "des"
4) "fishion!"
# delete message
> XDEL person 2-0
(integer1)Delete all messages from the database
> XRANGE person - +
1) 1) "0-1"
2) 1) "name"
2) "ytao"
3) "des"
4) "https://ytao.top"
2) 1) "0-2"
2) 1) "name"
2) "luffy"
3) "des"
4) "valiant!"
Select * from * where ()
> XLEN person
(integer2)Copy the code
As you can see above, the length of the message is reduced by a corresponding amount when deleted.
News consumption
In Redis PUB/SUB, we consume messages through subscriptions, and in Streams data structures, we can block and wait when there are no new messages. Not only individual consumption is supported, but also group consumption is supported.
Individual consumption
Individual consumption uses the XREAD directive. As you can see, STREAMS, key, and ID are mandatory in the following command. ID indicates that messages greater than this ID will be read. When the ID value is given with $, it represents the maximum ID value of an existing message.
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]Copy the code
The COUNT argument above is used to specify the maximum number of reads, as is the case with XRANGE.
> XREAD COUNT 1 STREAMS person 0
1) 1) "person"
2) 1) 1) "0-1"
2) 1) "name"
2) "ytao"
3) "des"
4) "https://ytao.top"
> XREAD COUNT 2 STREAMS person 0
1) 1) "person"
2) 1) 1) "0-1"
2) 1) "name"
2) "ytao"
3) "des"
4) "https://ytao.top"
2) 1) "0-2"
2) 1) "name"
2) "luffy"
3) "des"
4) "valiant!"
Copy the code
There is also a BLOCK argument inside XREAD that blocks subscription messages. The BLOCK takes the BLOCK time in milliseconds and releases the BLOCK if no new messages are consumed within that time. When the time here is set to 0, it blocks until new messages are consumed.
Window 1 blocks and waits for a new message
> XREAD BLOCK 0 STREAMS person $
Add a new message to connect window 2
> XADD person 2-2 name tao des coder
"2-2"
# window 1, gets a new message to consume with a blocking time
> XREAD BLOCK 0 STREAMS person $
1) 1) "person"
2) 1) 1) "2-2"
2) 1) "name"
2) "tao"
3) "des"
4) "coder"(60.81 s)Copy the code
When sequential consumption is performed using XREAD, an additional Id of the read location is recorded for further consumption.
Consumer group
The main purpose of group consumption is to distribute messages to different clients and process messages at a more efficient rate. To meet this liver function requirement, we need to do three things: create a group, the group reads the message, and acknowledge the message to the server for processing.
Group operating
Operation group using XGROUP instruction:
XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
Copy the code
In the command above, the operations include:
- CREATE Creates a consumer group.
- SETID Changes the Id of the next message to be processed.
- DESTROY Destroys the consumer group.
- DELCONSUMER Deletes the specified consumer in the consumer group.
We currently need to create a consumer group:
# start consumption with the current maximum Id
> XGROUP CREATE person group1 $
OK
Copy the code
Group read message
Group reading uses the XREADGROUP directive. COUNT and BLOCK use operations similar to XREAD, but with additional group and consumer specifications:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]Copy the code
Since group consumption is similar to individual consumption, there is only a blocking analysis, and there is also a special value > for Id, indicating that the message has not been consumed:
# Window 1, Taotao consumer establishes blocking listen in consumer group
XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >
# Window 2, in the consumer group, yangyang consumer establishes blocking monitor
XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >
# Window 3, add the consumption message
> XADD person 3-1 name tony des 666
"3-1"
# window 1 reads a new message, and window 2 does not respond
> XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >
1) 1) "person"
2) 1) 1) "3-1"
2) 1) "name"
2) "tony"
3) "des"
4) "666"(77.54 s)# Window 3, add the consumption message again
> XADD person 3-2 name james des abc!
"3-2"
# window 2 reads a new message, and window 1 does not respond
> XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >
1) 1) "person"
2) 1) 1) "3-2"
2) 1) "name"
2) "james"
3) "des"
4) "abc!"(76.36 s)Copy the code
In the above execution process, there are two consumers in the group1 group. When two messages are added, the two consumers take turns to consume.
An ACK message
After the message has been consumed, this is the flag that needs to be sent to the server to ensure that the message has been consumed in order to avoid repeated consumption. For example, we have consumed the last two messages, but when we read the messages again, they are still read:
> XREADGROUP GROUP group1 yangyang STREAMS person 0
1) 1) "person"
2) 1) 1) "3-2"
2) 1) "name"
2) "james"
3) "des"
4) "abc!"
Copy the code
At this point, we use the XACK directive to tell the server that we have processed the message:
XACK key group ID [ID ...] 0Copy the code
Have the server mark 3-2 processed:
> XACK person group1 3-2
(integer1)Copy the code
Get group read message again:
> XREADGROUP GROUP group1 yangyang STREAMS person 0
1) 1) "person"
2) (empty list or set)
Copy the code
There are no readable messages in the queue. In addition to the API explained above, viewing consumer group information can be viewed using the XINFO command, which is not analyzed in this article.
conclusion
In our analysis of Streams’ commonly used APIS, we can see that Redis is getting stronger and stronger in support of message queues. If you’ve ever used its PUB/SUB feature, you’ll know that the 5.x iteration is just a refinement of some of your pain points.
Personal blog: Ytao.top
Pay attention to the public number [Ytao], more original good articles