Small knowledge, big challenge! This paper is participating in theEssentials for programmers”Creative activities.

1, the introduction of

Stream makes up the deficiency of Redis as MQ (Message Queue) technology selection; Redis 5.0 Stream supports message persistence compared to Pub/Sub, combining sentinel or Cluster to make it a more reliable message queue. Although I don’t think it will be the technology model for corporate MQ, it is worth exploring the use and characteristics of Stream (consumer group).

Stream benchmarks message queues and thus has almost all the features of MQ. Some of the features of Stream are listed below:

  • Message sequential storage
  • Message ID serialization rule generation
  • Traversal of the message
  • Message blocking/non-blocking fetch
  • Clients consume messages in groups
  • Message acknowledgement mechanism
  • Message exception mechanism
  • Message queue monitoring

These features of Stream will also be covered in this article.

2. Stream internal exploration

2.1 the Stream structure

Before exploring the internal structure of a Stream, take a look at a clear diagram of the Stream structure:Here’s what the above picture means:

  • Message Content: indicates the Message Content
  • Consumer group: A Consumer group created by using the XGROUP CREATE command. A Consumer group can have multiple consumers
  • Last_delivered_id: cursor. Each consumer group has a cursor that moves forward when any consumer reads the message
  • A Consumer in a group
  • Pending_ids: State variables. Each consumer has a state variable that records the id of the message read by the current consumer but not ack

2.2 Four Unique

A linked list of messages is maintained inside the Stream to enable messages to behave like queues. There are four unique things to know about Stream:

  1. Each Stream has a unique name
  2. Each Message has a unique ID, either assigned by the system or specified by the client
  3. The consumer group (Consumer_Group) in each Stream has a unique name
  4. The Consumer in each Consumer_Group has a unique name

2.3 the message ID

The Stream message ID can be automatically generated by the server or passed in by the client. Rules automatically generated by the system

[zipzipgex], [zipzipgex], [zipzipgex], [zipzipgex], [zipzipgex], [zipzipgex], [zipzipgex], [zipzipgex], [zipzipgex], [zipzipgex], [zipzipgex], [zipzipgex], [zipzipgex], [zipzipgex] SequenceNumber is a 64-bit sequenceNumber that increases from 0 under the same millisecondsTime.

The client display rule Redis has a mandatory ID in the format of **-**, the minimum ID is 0-1, and the subsequent ID cannot be smaller than the previous ID

2.4 Message Content

The Message Content of a Stream is a Hash structure that exists as a key-value.

3. The Stream directive

3.1 Instruction Summary

The Stream instructions can be divided into two categories, namely message queue related instructions and consumption group related instructions. Message queue related directives:

Instruction names Directive role
XADD Add the message to the end of the queue
XTRIM Limits the length of the Stream and intercepts it if it is too long
XDEL Delete the message
XLEN Gets the length of the message in Stream
XRANGE Gets a list of messages (ranges can be specified), ignoring deleted messages
XREVRANGE The difference with XRANGE is the reverse fetch, with ids from large to small
XREAD Gets a message (blocking/non-blocking) that returns a message greater than the specified ID

Consumer group related instructions:

Instruction names Directive role
XGROUP CREATE Create consumer Groups
XREADGROUP GROUP Read messages in consumer groups
XACK Ack messages that are marked “processed”
XGROUP SETID Sets the ID of the final message delivered by the consumer group
XGROUP DELCONSUMER Delete consumer Group
XPENDING Prints details of the message to be processed
XCLAIM Transfer the attribution of messages (long-term unprocessed/unprocessed messages are transferred to other consumer groups for processing)
XINFO Print details for Stream Consumer Group
XINFO GROUPS Print details of consumer groups
XINFO STREAM Print the Stream details

3.2 XADD

XADD is used to add messages to the Stream queue. If the specified Stream queue does not exist, a new Stream queue will be created.

XADD instruction syntax:

XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value …]

Here are two ways to define an ID using XADD (see 2.3).

3.2 XTRIM

XTRIM is used to limit the length of the Stream.

XTRIM command syntax:

XTRIM key MAXLEN|MINID [=|~] threshold [LIMIT count]

  • MAXLEN Specifies the maximum length that can be exceeded before the message in the queue is discarded
  • MINID Specifies the minimum number of ids allowed, starting with one id value and discarding the rest

3.3 XDEL

XDEL is used to delete messages.

XDEL’s command syntax:

XDEL key ID [ID …]

3.4 XLEN

XLEN is used to get the length of the message in the Stream queue.

XLEN instruction syntax:

XLEN key

3.5 XRANGE

XRANGE is used to get a list of messages (ranges can be specified), ignoring deleted messages.

XRANGE command syntax:

XRANGE key start end [COUNT count]

  • Start indicates the start value, and – indicates the minimum value
  • End indicates the end value and + indicates the maximum value
  • Count indicates how many values can be fetched

3.6 XREVRANGE

XREVRANGE is used to get a list of messages (ranges can be specified), ignoring deleted messages. The difference from XRANGE is that the elements of the message list are retrieved in the opposite direction, end before start.

XREVRANGE command syntax:

XREVRANGE key end start [COUNT count]

3.7 XREAD

XREAD is used to get messages (blocking/non-blocking) and only returns messages greater than the specified ID.

XREAD instruction syntax:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]

  • COUNT Specifies the maximum number of messages to read
  • BLOCK reads messages as if they are blocked. By default, they are not blocked. If milliseconds is set to 0, it is always blocked

**$** stands for special ID, which means that the current Stream has the largest ID stored as the last ID. There is no message in the current Stream greater than the current maximum ID, so return nil.0-0If count is not specified, all messages in the Stream will be returned. Note that 0 (00/000 is also acceptable…) .

Blocking gets the instruction in Stream, which is shown here blocking to get a message

3.8 XGROUP CREATE

XGROUP CREATE is used to CREATE consumer groups.

XGROUP CREATE command syntax:

XGROUP [CREATE key groupname ID | [MKSTREAM]] [SETIDkeygroupnameID ∣ [MKSTREAM]] [SETID key groupname ID | [MKSTREAM]] [SETIDkeygroupnameID ∣] [DESTROY key groupname] [CREATECONSUMER key groupname consumername] [DELCONSUMER key groupname consumername]

There is nothing complicated about the XGROUP CREATE directive. The arguments in the first parentheses are the most important.

  • $indicates that consumption starts at the end of the Stream, ignoring any data currently in the Stream
  • 0 means consuming from the Stream header

The XGROUP CREATE syntax will report an error if the Stream does not exist, so it is not allowed to CREATE a consumer group on a Stream that does not exist

3.9 XREADGROUP GROUP

XREADGROUP GROUP is used to read messages in consumer groups.

XREADGROUP GROUP instruction syntax:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …]

Note that the special symbol **>** indicates that the message has never been delivered to another consumer. 0 indicates that the message ID is specified, because the ID is greater than 0-0 (generation 0 means 0-0), and therefore represents that the message is fetched from the queue head of the Stream

In the screenshot below, why does mystream 0 return empty for the first time? After executing mystream >, the second mystream 0 successfully fetched the message, but it is clear that two messages were added to mystream and the first time should not fail? This is because when you specify an ID for message retrieval, the command will give us access to our history of pending messages that were retrieved, but not ack. That is, the set of messages passed to the named consumer, identified by the name provided, and so far never confirmed using XACK.

XREADGROUP groups can also block messages like XREAD Block read returns when a message is added to myStream

3.10 XACK

XACK is used to mark “processed.”

XACK instruction syntax:

XACK key group ID [ID …]

Test the XACK directive with the feature that the specified ID in **XREADGROUP GROUP ** can only get unack unprocessed messages. Two conclusions can be drawn from the following test example:

  • If the message is ack successfully for the first time, return 1; if ack fails, return 0
  • The conclusion in 3.9 is correct

3.11 XPENDING

XPENDING is used to print details of pending messages.

The XPENDING directive is useful because it prints information about pending messages. If there are multiple consumers in a consumer group, and if some consumers are permanently broken and unable to process messages, we can use the XPENDING directive to look for unack messages from consumers in the specified consumer group and transfer them to other consumers for processing.

XPENDING instruction syntax:

XPENDING key group [[IDLE min-idle-time] start end count [consumer]]

XPENDING return value resolution:

  1. The first parameter represents the total number of pending messages in the current consumer
  2. The second parameter represents the minimum ID of the message to be processed
  3. The third parameter represents the maximum ID of the message to be processed
  4. The fourth parameter represents the list of consumers and the number of unprocessed messages

3.11 XCLAIM

XCLAIM is used to transfer attribution of a message.

XCLAIM instruction syntax:

XCLAIM key group consumer min-idle-time ID [ID …] [IDLE ms] [TIME ms-unix-time] [RET

Instruction parameter parsing:

  • Key indicates the Stream name
  • Group indicates the name of the consumer group that needs to transfer attribution of the message
  • Consumer represents the name of the consumer that receives the message
  • Min-idle-time indicates the minimum idle time. The message transfer command takes effect only when the idle time of the subsequent message with the specified ID is greater than the specified idle time
  • ID [] Indicates the ID of the message to transfer attribution

In the example, attribution of the unprocessed message with ID 1631719560149-0 in consumer-1 is transferred to consumer-2:

3.13 XINFO

XINFO is used to print Stream Consumer Group details.

XINFO instruction syntax:

XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]

The example prints a detailed message for the specified STREAM

4. The Stream optimized memory thing

There are two things to watch out for when using Stream, which can lead to memory consumption if not used properly.

  1. There are too many pending messages and the messages are not ack in time. Procedure
  2. Stream messages are persisted and deleted using XDEL

On the first point, there are too many pending messages and messages are not ack in time. The reason for the increase in memory is that the Stream maintains a PEL list for each consumer. The PEL list is used to store the ids of messages that have been processed but not ack in time. In the actual use process, we must timely ack the messages after processing, and periodically check whether there is information accumulation caused by unavailability of consumers. XPENDING is able to find pending messages from consumers because of PEL.

On the second point, using XDEL to delete messages from a Stream that are no longer needed increases memory because the Stream’s XDEL delete message instruction does not remove the message from memory, it simply marks the message and ignores it the next time through the XRANGE instruction. XADD uses MAXLEN to specify the length of the Stream queue. When messages exceed the length, the queue header message is cleared. (However, this process must be timely to avoid message loss.)

XADDkey [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value …]