The background,

A new data type, Stream, has been created in Redis 5. It has a similar design to Kafka and can be used as a simple message queue.

The characteristics of Stream type in Redis

  1. It is persistent and ensures that data is not lost.

  2. Support message multicast, group consumption.

  3. Support message ordering.

3. Structure of Stream

Explanation:

  1. Consumer Group:Consumer GroupXGROUP CREATECommand to create, a consumer group can have more than one consumer, among these consumers iscompetitionRelationship.
    1. The same message can only be retrieved by one consumer in the consumer group.
    2. Many consumers are independent of each other and do not interfere with each other.
  2. Consumer:Consumer news.
  3. Last_delivered_id:This ID ensures that a message can only be retrieved by one consumer within the same consumer group. Each time a consumer in the consumer group reads the message, the value of last_delivered_id moves one step back to ensure that the consumer does not read the same message.
  4. pending_ids: records a list of message ids read by the consumer, but these messages may not have been processed and need to be called if a message is considered processedackCommand. This ensures that a message must be executed once.
  5. Message content:Is aKey/value pairThe format.
  6. ID of message in Stream:By default, ID is used*, Redis can automatically generate one in the format ofTime stamp - Serial number, you can also specify the id. Generally, use the default id, and the generated ID must be larger than the previous one.

Stream command

XADD adds a message to the end of the Stream

1. Command format

xadd key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]
Copy the code

2, for example,

The xadd command returns the id of the data, XX-YY (xx refers to the number of milliseconds,yy refers to the number of messages in that millisecond)

1. Add a piece of data to the stream.

127.0.0.1:6379> xadd stream-key * username zhangsan "1635999858912-0" # return ID 127.0.0.1:6379> keys * 1) "stream-key"Copy the code

2. Adding data to a stream does not automatically create a stream

127.0.0.1:6379> xadd not-exists-stream nomkstream * username lisi 127.0.0.1:6379> keys * (empty array) 127.0.0.1:6379>Copy the code

3. Manually specify the ID value

127.0.0.1:6379> xadd stream-key 1-1 username lisi # where id = 1, id = 1, id = 1, id = 1, id = 1, id = 1Copy the code

4. Set a Stream of fixed size

1. Specify the exact size of the Stream

Specifying the size of a Stream takes slightly less performance than obfuscating the size of a Stream.

2. Vaguely specify the Stream size
127.0.0.1:6379> xadd stream-key maxlen ~ 1 * first first "1636001034141-0" 127.0.0.1:6379> xadd stream-key maxlen ~ 1 * Second second "1636001044506-0" 127.0.0.1:6379> xadd stream-key maxlen ~ 1 * third third "1636001057846-0" 127.0.0.1:6379> xinfo stream stream-key 1) "length" 2) (integer) 3) "radix tree-keys" 4) (integer) 1 5) "radix-tree-nodes" 6) (integer) 2 7) "last-generated-id" 8) "1636001057846-0" 9) "groups" 10) (integer) 0 11) "first-entry" 12) 1) "1636001034141-0" 2) 1) "first" 2) "first" 13) "last-entry" 14) 1) "1636001057846-0" 2) 1) "third" 2) "third" 127.0.0.1:6379 >Copy the code

~ obfuscates the size of the stream. You can see that it is 1, but it is actually 3.

XRANGE looks for messages in the Stream

1. Command format

xrange key start end [COUNT count]
Copy the code

2. Prepare data

127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> xadd stream-key * username zhangsan
QUEUED
127.0.0.1:6379(TX)> xadd stream-key * username lisi
QUEUED
127.0.0.1:6379(TX)> exec
1) "1636003481706-0"
2) "1636003481706-1"
127.0.0.1:6379> xadd stream-key * username wangwu
"1636003499055-0"
127.0.0.1:6379>
Copy the code

The transaction operation of Redis is used to obtain multiple data generated in the same millisecond with the same timestamp and different serial numbers

3, for example,

1. Get all the data (-and+The use of)

127.0.0.1:6379> xrange stream-key - +
1) 1) "1636003481706-0"
   2) 1) "username"
      2) "zhangsan"
2) 1) "1636003481706-1"
   2) 1) "username"
      2) "lisi"
3) 1) "1636003499055-0"
   2) 1) "username"
      2) "wangwu"
127.0.0.1:6379>
Copy the code

-: indicates the minimum ID

+: indicates the maximum ID

2. Obtain the data within the specified ID range and close the range

127.0.0.1:6379> xrange stream-key 1636003481706-1 1636003499055-0
1) 1) "1636003481706-1"
   2) 1) "username"
      2) "lisi"
2) 1) "1636003499055-0"
   2) 1) "username"
      2) "wangwu"
127.0.0.1:6379>
Copy the code

3. Obtain the data within the specified ID range and open the range

127.0.0.1:6379> xrange stream-key (1636003481706-0 (1636003499055-0 1) 1) "1636003481706-1" 2) 1) "username" 2) "lisi" 127.0.0.1:6379 >Copy the code

(: indicates the open range

4. Get all data after a certain millisecond

127.0.0.1:6379> xrange stream-key 1636003481706 + 1) 1) "1636003481706-0" 2) 1) "username" 2) "zhangsan" 2) 1) "1636003481706-1" 2) 1) "lisi" 3) 1) "1636003499055-0" 2) 1) "username" 2) "wangwu" 127.0.0.1:6379>Copy the code

Write milliseconds without the serial number.

5. Obtain a single piece of data

127.0.0.1:6379> xrange stream-key 1636003499055-0 1636003499055-0 1) 1) "1636003499055-0" 2) 1) "username" 2) "wangwu" 127.0.0.1:6379 >Copy the code

Start and end write the same value to get the single data.

6. Obtain a fixed number of data

127.0.0.1:6379> xrange stream-key - + count 1
1) 1) "1636003481706-0"
   2) 1) "username"
      2) "zhangsan"
127.0.0.1:6379>
Copy the code

Use count for restrictions

XREVRANGE reversely looks at the messages in the Stream

XREVRANGE key end start [COUNT count]
Copy the code

The usage is similar to that of XRANGE.

XDEL deletes the message

1. Command format

xdel key ID [ID ...]
Copy the code

2. Prepare data

127.0.0.1:6379> xadd stream-key * username zhangsan
"1636004176924-0"
127.0.0.1:6379> xadd stream-key * username lisi
"1636004183638-0"
127.0.0.1:6379> xadd stream-key * username wangwu
"1636004189211-0"
127.0.0.1:6379>
Copy the code

3, for example,

** Add three messages to Stream, then delete the second message

127.0.0.1:6379> xdel stream-key 1636004183638-0 (integer) 1 # Returns the number of deleted records 127.0.0.1:6379> xrang stream-key - + 127.0.0.1:6379> xrange stream-key - + 1) 1) "1636004176924-0" 2) 1) "username" 2) "zhangsan" 2) 1) "1636004189211-0" 2) 1) "username" 2) "wangwu" 127.0.0.1:6379>Copy the code

Note:

Note that when we delete a message from the Stream, the message is not actually deleted, but marked as deleted, while the message is still occupying content space. Memory space is reclaimed only if all messages in all streams are marked for deletion. But the Stream will not be deleted.

XLEN checks the length of elements in Stream

1. Command format

xlen key
Copy the code

2, for example,

Look at the length of the elements in Stream

127.0.0.1:6379> xadd stream-key * username zhangsan
"1636004690578-0"
127.0.0.1:6379> xlen stream-key
(integer) 1
127.0.0.1:6379> xlen not-exists-stream-key
(integer) 0
127.0.0.1:6379>
Copy the code

Note:

Return 0 if the key behind xlen does not exist, otherwise return the number of elements.

7. XTRIM trims elements in Stream

1. Command format

xtrim key MAXLEN|MINID [=|~] threshold [LIMIT count]
Copy the code

2. Prepare data

127.0.0.1:6379> xadd stream-key * username zhangsan "1636009745401-0" 127.0.0.1:6379> multi OK 127.0.0.1:6379(TX)> xadd Stream-key * username lisi QUEUED 127.0.0.1:6379(TX)> xadd stream-key * username wangwu QUEUED 127.0.0.1:6379(TX)> exec 1) "1636009763955-0" 2) "1636009763955-1" 127.0.0.1:6379> xadd stream-key * username zhaoliu "1636009769625-0" 127.0.0.1:6379 >Copy the code

3, for example,

1, Maxlen precise limit

127.0.0.1:6379> xtrim stream-key maxlen 2 # Save the last 2 messages (integer) 2 127.0.0.1:6379> xrange stream-key - + # 1) 1) "wangwu" 2) 1) "1636009769625-0" 2) 1) "username" 2) "wangwu" 2) 1) "1636009769625-0" 2) 1) "username" 2) "Zhaoliu" 127.0.0.1:6379 >Copy the code

Keep the stream-key for the last two messages in this stream.

2. Minid fuzzy limit

Minid is used to delete data smaller than this ID. The data was not tested in the local test.

XREAD consumes messages independently

XREAD simply reads the message and does not delete the message after reading it. Reading messages with XREAD is completely independent of the consumer group, and multiple clients can read messages simultaneously.

1. Command format

xread [COUNT count] [BLOCK milliseconds] STREAMS key [key ...]  ID [ID ...]Copy the code

2. Prepare data

127.0.0.1:6379> xadd stream-key * username zhangsan "1636011801365-0" 127.0.0.1:6379> xadd stream-key * username lisi "1636011806261-0" 127.0.0.1:6379> xadd stream-key * username wangwu "16360118109005-0" 127.0.0.1:6379>Copy the code

3, for example,

1. Get the user name of wangwu’s data

127.0.0.1:6379> xread streams stream-key 1636011806261-0 1) 1) "stream-key" 2) 1) 1) "1636011810905-0" 2) 1) "username" 2) "wangwu"Copy the code

2. Obtain two pieces of data

127.0.0.1:6379> xread count 1) 1) 1) 1) 1) 1) 1) 1) 1) 1) 2) 1) 2) 1) 2) 1) 2) 1) 2) 1) 2) 1) 2) "Zhangsan" 2) 1) 1) "lisi" 1) "lisi" 2) "lisi" 2) "lisi"Copy the code

Count limits the last message to a single read, because there may not be so many current reads.

3. Read data from Stream to tail without blocking

The next message that reads at the end of the queue is always nil in non-blocking mode

127.0.0.1:6379> xread streams stream-key $(nil)Copy the code

Block reading data from Stream to tail

Note:

  1. $indicates the latest message from the Stream, not the last message from the Stream. Xread Block is returned only after xread block is executed and the message is added using Xadd again.

  2. Block 0 represents a permanent block that is only touched when a message arrives. Block 1000 means to block 1000ms and return nil if 1000ms has not yet been sent

  3. Xread sequential consumption When using Xread for sequential messages, you need to remember the returned message ID and pass in the last returned message ID the next time you call Xread.

  4. Xread reads the message, ignoring the consumer group completely, and the Stream can be understood as a plain list.

9. Related operations of consumer groups

1. Consumer group command

2. Prepare data

Create a Stream with a stream-key

Create two messages aa and BB

127.0.0.1:6379> xadd stream-key * aa aa
"1636362619125-0"
127.0.0.1:6379> xadd stream-key * bb bb
"1636362623191-0"
Copy the code

3. Create consumer groups

1. Create a consumer group that buys from scratch
Xgroup create stream-key(stream name) g1Copy the code
Create a consumer group for the latest message consumption from Stream
xgroup create stream-key g2 $
Copy the code

$represents consumption from the last element, excluding the last element in the Stream, which consumes the latest message.

Create a consumer group to consume after a message
Xgroup create stream-key G3 1636362619125-0 #1636362619125-0 this is the id value of the aa message aboveCopy the code

1636362619125-0 Specifies the ID of a message. All messages in the G3 consumer group are greater than or equal to this ID.

Read the message from the consumer

127.0.0.1:6379> xreadGroup group g1 Automatically create) count 3 (read 3) streams stream - the key (stream) > (from the consumer groups are not assigned to other consumers begin to read) 1) 1) "stream - key" 2) 1) 1) "1636362619125-0" 2) 1) "AA" 2) 1) "1636362623191-0" 2) 1) "bb" 2) "bb" 127.0.0.1:6379> xreadGroup group G2 c1 Count 3 streams stream-key > (nil) # return nil because the g2 consumer group starts reading the latest message and needs to execute 'xadd' in another window. Xreadgroup group g3 c1 count 3 streams stream-key > 127.0.0.1:6379 The BB message has an ID greater than AA, so it reads. 1) 1) "stream-key" 2) 1) 1) 1) "1636362623191-0" 2) 1) "bb" 2) "bb" 127.0.0.1:6379>Copy the code

Read pending messages for consumers

127.0.0.1:6379> xgroup create stream-key g4 0-0 OK 127.0.0.1:6379> xinfo consumers stream-key G1 1) 1) 1) "name" 2) "c1" 3) "Pending" 4) (integer) 2 5) "idle" 6) (integer) 88792 127.0.0.1:6379> Xinfo consumers stream-key g4 (empty array) 127.0.0.1:6379> xreadgroup group g1 c1 count 1 1) 1) 1) 1) 1) 2) 1) 1) 2) 1) "1636362623191-0" 2) 1) "bb" 2) "bb" 127.0.0.1:6379> xreadGroup group g4 c1 count 1 block 0 Streams stream-key 1636362619125-0 1) 1) "stream key" 2) (Empty array) 127.0.0.1:6379>Copy the code

5. Shift consumer information

127.0.0.1:6379> xpending stream-key g1 - + 10 c1
1) 1) "1636362619125-0"
   2) "c1"
   3) (integer) 2686183
   4) (integer) 1
2) 1) "1636362623191-0"
   2) "c1"
   3) (integer) 102274
   4) (integer) 7
127.0.0.1:6379> xpending stream-key g1 - + 10 c2
(empty array)
127.0.0.1:6379> xclaim stream-key g1 c2 102274 1636362623191-0
1) 1) "1636362623191-0"
   2) 1) "bb"
      2) "bb"
127.0.0.1:6379> xpending stream-key g1 - + 10 c2
1) 1) "1636362623191-0"
   2) "c2"
   3) (integer) 17616
   4) (integer) 8
127.0.0.1:6379>
Copy the code

This can also be done via XautoClaim.

6. Some monitoring commands

Check pending messages for consumers in the consumer group
127.0.0.1:6379> xpending stream-key G1 - + 10 C2 1) 1) "1636362623191-0" 2) "C2" 3) (INTEGER) 1247680 4) (integer) 8 127.0.0.1:6379 >Copy the code
2. View the consumer information in the consumer group
127.0.0.1:6379> xinfo consumers stream-key G1 1) 1) 1) "name" 2) "c1" 3) "pending" 4) (integer) 1 5) "idle" 6) (integer) 1474864 2) 1) "name" 2) "C2" 3) "pending" 4) (INTEGER) 1 5) "idle" 6) (integer) 1290069 127.0.0.1:6379>Copy the code
3. View the consumer group information
127.0.0.1:6379> xinfo groups stream-key
1) 1) "name"
   2) "g1"
   3) "consumers"
   4) (integer) 2
   5) "pending"
   6) (integer) 2
   7) "last-delivered-id"
   8) "1636362623191-0"
2) 1) "name"
   2) "g2"
   3) "consumers"
......
Copy the code
4. View Stream information
127.0.0.1:6379> xinfo stream stream-key 1) "length" 2) (integer) 2 3) "radix tree-keys" 4) (integer) 1 5) "radix-tree-nodes" 6) (integer) 2 7) "last-generated-id" 8) "1636362623191-0" 9) "groups" 10) (integer) 4 11) "first-entry" 12) 1) "1636362619125-0" 2) 1) "aa" 2) "aa" 13) "last-entry" 14) 1) "1636362623191-0" 2) 1) "bb" 2) "bb" 127.0.0.1:6379 >Copy the code

5. Reference documents

1, redis. IO/switchable viewer/stre…

2, www.runoob.com/redis/redis…