Preface
Stream is the message-queue data type introduced in Redis 5. According to the Redis author, Stream borrows heavily from Kafka's design, so Kafka-like features are now available in Redis as well. How exactly is Stream designed? Let's dig into the source code to find out.
Main text
Introduction
xadd mystream * testKey1 testVal1 testKey2 testVal2
Copy the code
This is the command that appends an entry to a stream. It consists of:
- mystream: the name of the stream
- *: asks Redis to generate the entry ID automatically (an explicit ID can be given instead)
- testKey1 testVal1 testKey2 testVal2: the field-value pairs of the message
As you can see, one message can carry multiple field-value pairs, and consumer groups are optional.
The message IDs inside a stream are strictly increasing. You can create consumer groups, and a stream can have multiple consumer groups:
- A consumer group is uniquely identified by its name, and each group can consume every message in the stream.
- Consumers within the same group compete with each other: a given message is delivered to only one consumer of the group.
- A consumer must acknowledge a message after consuming it.
The underlying structure
The underlying structure of a stream consists mainly of a rax and listpacks.
rax
Rax radix tree (part 1)
Rax radix tree
Rax was covered in detail in the previous article.
listpack
A listpack is the value stored in a rax node. It holds strings or integers with the following layout:
- totalBytes: the total size of the listpack in bytes, stored in 4 bytes
- num: the number of entries, stored in 2 bytes. As with ziplist, it can record at most 65535; beyond that the field holds the "unknown" marker (LP_HDR_NUMELE_UNKNOWN) and the count must be obtained by traversing the listpack
- entry: the actual data node, made of:
- encode: the encoding type of the entry
- content: the actual data
- backlen: the length of encode + content, stored at the end of the entry so the listpack can be traversed from back to front
- end: the terminator byte, 0xFF
/* Listpack header size: 4 bytes of total size (totalBytes) followed by
 * 2 bytes of element count (num). Defined before its first use. */
#define LP_HDR_SIZE 6

/* Create a new, empty listpack: the header (totalBytes = header + 1,
 * num = 0) followed by the LP_EOF terminator byte.
 * Returns NULL if the allocation fails. */
unsigned char *lpNew(void) {
    unsigned char *lp = lp_malloc(LP_HDR_SIZE+1);
    if (lp == NULL) return NULL;
    lpSetTotalBytes(lp,LP_HDR_SIZE+1);   /* Header plus the terminator. */
    lpSetNumElements(lp,0);
    lp[LP_HDR_SIZE] = LP_EOF;
    return lp;
}
Copy the code
lpNew creates an empty listpack: a 6-byte header (32-bit totalBytes plus 16-bit num, with num set to 0) followed by the LP_EOF terminator byte.
//ele: insert new element size: insert length P: insert position WHERE: three modes (front insert, back insert, replace) newp (used to return the next element of the insert element)
unsigned char *lpInsert(unsigned char *lp, unsigned char *ele, uint32_t size, unsigned char *p, int where, unsigned char **newp) {
unsigned char intenc[LP_MAX_INT_ENCODING_LEN];
unsigned char backlen[LP_MAX_BACKLEN_SIZE];
uint64_t enclen; / / encode length
// If the element to be inserted is null, replace (delete)
if (ele == NULL) where = LP_REPLACE;
// If it is post-insert, skip to pre-insert of next node instead
// The subsequent inserts will be the same
if (where == LP_AFTER) {
p = lpSkip(p);
where = LP_BEFORE;
}
// Store the offset of p before insertion
unsigned long poff = p-lp;
// Get the encType of ele
int enctype;
if (ele) {
enctype = lpEncodeGetType(ele,size,intenc,&enclen);
} else {
enctype = - 1;
enclen = 0;
}
// Get backLen to traverse backwards
unsigned long backlen_size = ele ? lpEncodeBacklen(backlen,enclen) : 0;
uint64_t old_listpack_bytes = lpGetTotalBytes(lp);
uint32_t replaced_len = 0;
if (where == LP_REPLACE) {
replaced_len = lpCurrentEncodedSize(p);
replaced_len += lpEncodeBacklen(NULL,replaced_len);
}
uint64_t new_listpack_bytes = old_listpack_bytes + enclen + backlen_size
- replaced_len;
if (new_listpack_bytes > UINT32_MAX) return NULL;
unsigned char *dst = lp + poff; /* May be updated after reallocation. */
/* Need more space */
if (new_listpack_bytes > old_listpack_bytes) {
if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
dst = lp + poff;
}
// Move the element
if (where == LP_BEFORE) {
memmove(dst+enclen+backlen_size,dst,old_listpack_bytes-poff);
} else { /* LP_REPLACE. */
long lendiff = (enclen+backlen_size)-replaced_len;
memmove(dst+replaced_len+lendiff,
dst+replaced_len,
old_listpack_bytes-poff-replaced_len);
}
/* Free up space */
if (new_listpack_bytes < old_listpack_bytes) {
if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
dst = lp + poff;
}
/* Save the next */
if (newp) {
*newp = dst;
/* In case of deletion, set 'newp' to NULL if the next element is * the EOF element. */
if(! ele && dst[0] == LP_EOF) *newp = NULL;
}
if (ele) {
if (enctype == LP_ENCODING_INT) {
memcpy(dst,intenc,enclen);
} else {
lpEncodeString(dst,ele,size);
}
dst += enclen;
memcpy(dst,backlen,backlen_size);
dst += backlen_size;
}
/* Change the header information */
if(where ! = LP_REPLACE || ele ==NULL) {
uint32_t num_elements = lpGetNumElements(lp);
if(num_elements ! = LP_HDR_NUMELE_UNKNOWN) {// Add or remove elements
if (ele)
lpSetNumElements(lp,num_elements+1);
else
lpSetNumElements(lp,num_elements- 1);
}
}
lpSetTotalBytes(lp,new_listpack_bytes);
#if 0
unsigned char *oldlp = lp;
lp = lp_malloc(new_listpack_bytes);
memcpy(lp,oldlp,new_listpack_bytes);
if (newp) {
unsigned long offset = (*newp)-oldlp;
*newp = lp + offset;
}
/* Make sure the old allocation contains garbage. */
memset(oldlp,'A',new_listpack_bytes);
lp_free(oldlp);
#endif
return lp;
}
Copy the code
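That is the listpack insertion code (see the comments). The overall flow is:
- Turn an "insert after p" into an "insert before the element following p", so only the before and replace cases remain
- Compute the listpack size after inserting or replacing the element, and grow the allocation with realloc if it gets bigger
- memmove the existing elements after the insertion point to make room for the new element (or to close the gap on deletion)
- Shrink the allocation if the listpack got smaller, then write the new element's encoding and backlen at the insertion point
- Update the header: the element count (when an element is added or deleted) and the total byte size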
/* Return the first element of listpack 'lp', or NULL when the listpack is
 * empty (the byte right after the header is already the LP_EOF marker). */
unsigned char *lpFirst(unsigned char *lp) {
    unsigned char *first = lp + LP_HDR_SIZE;
    return (first[0] == LP_EOF) ? NULL : first;
}
Copy the code
lpFirst returns a pointer to the first element, which sits right after the header, or NULL if the listpack is empty.
/* Return a pointer to the element preceding 'p' in listpack 'lp', or NULL
 * when 'p' is the first element (it starts right after the header). */
unsigned char *lpPrev(unsigned char *lp, unsigned char *p) {
    if (p-lp == LP_HDR_SIZE) return NULL;
    p--; /* Last byte of the previous entry's backlen field. */
    uint64_t prevlen = lpDecodeBacklen(p);
    /* backlen holds only encoding+content length: add the size of the
     * backlen field itself to get the size of the whole entry. */
    prevlen += lpEncodeBacklen(NULL,prevlen);
    return p-prevlen+1; /* First byte of the previous entry. */
}
Copy the code
lpPrev returns a pointer to the previous element: it decodes the backlen stored at the end of the previous entry to jump backwards.
A listpack can be traversed both forward (lpNext) and backward (lpPrev); its other functions are not covered here.
Stream internals
/* A stream: entries are stored in listpacks kept as the values of a rax. */
typedef struct stream {
rax *rax;          /* Key: encoded master entry ID of a node; value: that node's listpack. */
uint64_t length;   /* Number of entries (incremented on append, decremented by trimming). */
streamID last_id;  /* ID of the most recently appended entry. */
rax *cgroups; /* Consumer groups of this stream (presumably keyed by group name — confirm). */
} stream;
Copy the code
A stream stores its entries in a rax (radix tree) whose values are listpacks; length is the number of entries, last_id is the ID of the most recently added entry, and a second rax holds the stream's consumer groups.
/* Stream entry ID, written as "<ms>-<seq>" (e.g. "0-0"). */
typedef struct streamID {
uint64_t ms;   /* Millisecond part (auto-generated IDs use the current time). */
uint64_t seq;  /* Sequence number within the same millisecond. */
} streamID;
Copy the code
A streamID consists of a millisecond timestamp (ms) and a sequence number (seq) that increases within the same millisecond.
/* Advance 'id' to the next possible stream ID: increment seq, carrying into
 * ms when seq is already UINT64_MAX. If both parts are UINT64_MAX the ID
 * wraps to 0-0 (XADD refuses to append once last_id reaches that maximum). */
void streamIncrID(streamID *id) {
    if (id->seq == UINT64_MAX) {
        if (id->ms == UINT64_MAX) {
            /* Last possible ID: wrap around. */
            id->ms = id->seq = 0;
        } else {
            id->ms++;
            id->seq = 0;
        }
    } else {
        id->seq++;
    }
}
When seq is already at its maximum (UINT64_MAX), streamIncrID moves to the next millisecond and resets seq to 0; if both ms and seq are at UINT64_MAX, the ID wraps around to 0-0.
The message format
Messages added with XADD are stored in listpacks, with the following characteristics:
- Each listpack starts with a master entry that records the fields of the message that created the listpack (for example, the two fields of the first XADD). Because messages in a stream usually have the same fields, later messages with identical fields do not need to store the field names again
- A listpack can store multiple messages
- A message occupies multiple listpack elements
The master entry is described below.
The master entry is made of the first elements of the listpack and works as a metadata node. Its elements mean:
- count: the number of valid (not deleted) messages in the listpack
- deleted: the number of messages marked as deleted in the listpack
- num-fields: the number of master fields
- field-1 ... field-N: the field names of the message that created the listpack
- 0: the terminator of the master entry
For the XADD command shown at the beginning, testKey1 and testKey2 are inserted, and they become the master entry's fields.
If a later message has exactly the same fields as the master entry, its listpack elements look like this:
The field names are not stored in the entry, which saves space.
lp-count is the number of listpack elements this message occupies: 3+N here (flags, ms-diff, seq-diff and the N values).
If the message's fields differ from the master entry's, the storage layout changes:
Now num-fields is stored as well, and each field name is stored right before its value.
Consumers
/* A consumer group. */
typedef struct streamCG {
streamID last_id;  /* NOTE(review): last ID delivered to the group; Redis describes it as "delivered, not acknowledged" — confirm against stream.h. */
rax *pel;          /* Group pending entries list: delivered but unacknowledged entries (streamNACK), shared with consumer PELs. */
rax *consumers;    /* The group's consumers (streamConsumer). */
} streamCG;
Copy the code
A consumer group uses two rax trees: pel holds the group's pending (delivered but not yet acknowledged) messages, and consumers holds its consumers. last_id is the last ID delivered to the group, not necessarily acknowledged.
/* A consumer inside a consumer group. */
typedef struct streamConsumer {
mstime_t seen_time;  /* Last time the consumer was active (presumably milliseconds, as mstime_t suggests). */
sds name;            /* Consumer name. */
rax *pel;            /* This consumer's pending (unacknowledged) entries; the streamNACK objects are shared with the group PEL. */
} streamConsumer;
Copy the code
seen_time is the consumer's last active time; pel is again a rax, holding the messages this consumer has not acknowledged yet.
/* A pending (delivered but not yet acknowledged) entry. The same object is
 * referenced from the group PEL and from the owning consumer's PEL. */
typedef struct streamNACK {
mstime_t delivery_time; /* Last time this entry was delivered. */
uint64_t delivery_count; /* Number of times this entry was delivered. */
streamConsumer *consumer; /* Consumer the entry currently belongs to. */
} streamNACK;
Copy the code
streamNACK describes a pending (unacknowledged) message. The group's PEL and each consumer's PEL share these objects: when a consumer reads a message through a consumer group, one streamNACK is created and referenced from both PELs.
Source code walkthrough
Let's follow the XADD command shown at the beginning through the source code.
XADD key [MAXLEN [~|=] <count>] <ID or *> [field value] [field value] ...
MAXLEN caps the length of the stream; with ~ the trimming is approximate (only whole listpack nodes are removed), which is cheaper.
void xaddCommand(client *c) {
    streamID id;
    int id_given = 0;        /* 1 if the client passed an explicit ID. */
    long long maxlen = -1;   /* -1 means no MAXLEN option was given. */
    int approx_maxlen = 0;   /* 1 for "MAXLEN ~" (approximate trimming). */
    int maxlen_arg_idx = 0;  /* Index of the MAXLEN count argument. */

    /* Parse options. argv[0] is XADD and argv[1] is the key. */
    int i = 2;
    for (; i < c->argc; i++) {
        int moreargs = (c->argc-1) - i; /* Arguments after argv[i]. */
        char *opt = c->argv[i]->ptr;
        if (opt[0] == '*' && opt[1] == '\0') {
            /* Fast path for the common case of auto-ID creation. */
            break;
        } else if (!strcasecmp(opt,"maxlen") && moreargs) {
            approx_maxlen = 0;
            char *next = c->argv[i+1]->ptr;
            /* Optional "~" (approximate) or "=" (exact) modifier. */
            if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
                approx_maxlen = 1;
                i++;
            } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') {
                i++;
            }
            if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL)
                != C_OK) return;

            if (maxlen < 0) {
                addReplyError(c,"The MAXLEN argument must be >= 0.");
                return;
            }
            i++;
            maxlen_arg_idx = i;
        } else {
            /* Anything else must be an explicit entry ID. */
            if (streamParseStrictIDOrReply(c,c->argv[i],&id,0) != C_OK) return;
            id_given = 1;
            break;
        }
    }
    /* argv[i] is the ID ("*" or explicit); field/value pairs follow it. */
    int field_pos = i+1;

    /* There must be at least one field/value pair, and they come in pairs. */
    if ((c->argc - field_pos) < 2 || ((c->argc-field_pos) % 2) == 1) {
        addReplyError(c,"wrong number of arguments for XADD");
        return;
    }

    /* 0-0 is the smallest ID and can never be used for an entry. */
    if (id_given && id.ms == 0 && id.seq == 0) {
        addReplyError(c,"The ID specified in XADD must be greater than 0-0");
        return;
    }
Copy the code
This part parses and validates the arguments; see the comments.
/* xaddCommand() continued: look up the stream and append the entry. */
    robj *o;
    stream *s;
    /* Find the stream at key argv[1], creating it if missing. */
    if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
    s = o->ptr;

    /* Refuse to append once the stream used the last possible ID. */
    if (s->last_id.ms == UINT64_MAX && s->last_id.seq == UINT64_MAX) {
        addReplyError(c,"The stream has exhausted the last possible ID, "
                        "unable to add more items");
        return;
    }

    /* Append the entry; 'id' receives the ID actually used. */
    if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2,
        &id, id_given ? &id : NULL)
        == C_ERR)
    {
        addReplyError(c,"The ID specified in XADD is equal or smaller than the "
                        "target stream top item");
        return;
    }
    /* Reply with the ID of the new entry. */
    addReplyStreamID(c,&id);
Copy the code
First, streamTypeLookupWriteOrCreate finds or creates the stream. Then the code checks whether last_id has already reached the largest possible ID, and finally streamAppendItem appends the entry.
/* Look up 'key' for writing. If it does not exist, create an empty stream
 * and add it to the db. If it exists but is not a stream, reply with
 * shared.wrongtypeerr and return NULL. */
robj *streamTypeLookupWriteOrCreate(client *c, robj *key) {
    robj *o = lookupKeyWrite(c->db,key);
    if (o == NULL) {
        o = createStreamObject();
        dbAdd(c->db,key,o);
    } else {
        if (o->type != OBJ_STREAM) {
            addReply(c,shared.wrongtypeerr);
            return NULL;
        }
    }
    return o;
}
Copy the code
A stream is stored in the db like any other key-value pair; when the key does not exist, createStreamObject creates an empty stream. streamAppendItem then appends the entry using the listpack functions analyzed above.
/* Append an entry with 'numfields' field/value pairs (argv[0]=field-1,
 * argv[1]=value-1, ...) to stream 's'. If use_id is not NULL that ID is
 * used, otherwise streamNextID() derives one from last_id. On success the ID
 * is stored in *added_id (if not NULL) and C_OK is returned; C_ERR is
 * returned when the ID is not greater than the stream's last_id. */
int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id) {
    /* Generate the new entry ID. */
    streamID id;
    if (use_id)
        id = *use_id;
    else
        streamNextID(&s->last_id,&id);

    /* IDs must be strictly increasing: reject an ID not greater than
     * last_id. */
    if (streamCompareID(&id,&s->last_id) <= 0) return C_ERR;

    /* Get the tail listpack. IDs only grow, so the new entry always belongs
     * to the last rax node ("$"). */
    raxIterator ri;
    raxStart(&ri,s->rax);
    raxSeek(&ri,"$",NULL,0);

    size_t lp_bytes = 0;        /* Total bytes in the tail listpack. */
    unsigned char *lp = NULL;   /* Tail listpack pointer. */

    if (raxNext(&ri)) {
        lp = ri.data;
        lp_bytes = lpBytes(lp);
    }
    raxStop(&ri);

    uint64_t rax_key[2];    /* Rax key of the node: encoded master entry ID. */
    streamID master_id;     /* ID of the master entry of the listpack. */

    /* Don't reuse the tail listpack if it reached the configured size
     * or entry-count limit (a limit of 0 disables that check). */
    if (lp != NULL) {
        if (server.stream_node_max_bytes &&
            lp_bytes >= server.stream_node_max_bytes)
        {
            lp = NULL;
        } else if (server.stream_node_max_entries) {
            int64_t count = lpGetInteger(lpFirst(lp));
            if (count >= server.stream_node_max_entries) lp = NULL;
        }
    }
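First the new ID is generated (or the given one is used) and checked to be greater than last_id. Then the tail listpack is fetched: since IDs always increase, raxSeek with "$" positions the iterator on the last node. If that listpack has already reached stream-node-max-bytes or stream-node-max-entries, it is not reused (lp is set to NULL).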
f (lp == NULL || lp_bytes >= server.stream_node_max_bytes) {
master_id = id;
// encode the message id into rax_key
streamEncodeID(rax_key,&id);
lp = lpNew();
//count is the number of deleted messages 1
lp = lpAppendInteger(lp,1);
//deleted Number of deleted messages 0
lp = lpAppendInteger(lp,0);
/ / the number field
lp = lpAppendInteger(lp,numfields);
for (int64_t i = 0; i < numfields; i++) {
sds field = argv[i*2]->ptr;
lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
}
// The trailing flag bit is 0
lp = lpAppendInteger(lp,0);
raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
Copy the code
This branch creates a new listpack. The three consecutive lpAppendInteger calls write the master entry's count, deleted and num-fields described above, followed by the field names and the 0 terminator. The entry is then flagged STREAM_ITEM_FLAG_SAMEFIELDS: its fields are the master entry's fields, so the space optimization described above applies.
/* streamAppendItem() continued: reuse the existing tail listpack. */
    else {
        /* Recover the master entry ID from the node's rax key.
         * NOTE(review): ri.key is read after raxStop(); this relies on the
         * 16-byte key still being held by the iterator — confirm in rax.c. */
        serverAssert(ri.key_len == sizeof(rax_key));
        memcpy(rax_key,ri.key,sizeof(rax_key));
        streamDecodeID(rax_key,&master_id);

        /* Increment count (live entries in this listpack). */
        unsigned char *lp_ele = lpFirst(lp);
        int64_t count = lpGetInteger(lp_ele);
        lp = lpReplaceInteger(lp,&lp_ele,count+1);
        /* Skip count and deleted to reach num-fields. */
        lp_ele = lpNext(lp,lp_ele);
        lp_ele = lpNext(lp,lp_ele);

        /* If the new entry has exactly the master entry's fields, in the
         * same order, flag it so its field names are not stored again. */
        int64_t master_fields_count = lpGetInteger(lp_ele);
        lp_ele = lpNext(lp,lp_ele);
        if (numfields == master_fields_count) {
            int64_t i;
            for (i = 0; i < master_fields_count; i++) {
                sds field = argv[i*2]->ptr;
                int64_t e_len;
                unsigned char buf[LP_INTBUF_SIZE];
                unsigned char *e = lpGet(lp_ele,&e_len,buf);
                /* Stop at the first field that differs. */
                if (sdslen(field) != (size_t)e_len ||
                    memcmp(e,field,e_len) != 0) break;
                lp_ele = lpNext(lp,lp_ele);
            }
            /* All fields match the master entry. */
            if (i == master_fields_count) flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
        }
    }
If the tail listpack is reused, its count is incremented and the new message's fields are compared with the master entry's fields; if they are exactly the same (and in the same order), STREAM_ITEM_FLAG_SAMEFIELDS is set.
// Three headers
lp = lpAppendInteger(lp,flags);
lp = lpAppendInteger(lp,id.ms - master_id.ms);
lp = lpAppendInteger(lp,id.seq - master_id.seq);
// If not, modify numfields
if(! (flags & STREAM_ITEM_FLAG_SAMEFIELDS)) lp = lpAppendInteger(lp,numfields);for (int64_t i = 0; i < numfields; i++) {
sds field = argv[i*2]->ptr, value = argv[i*2+1]->ptr;
// Insert field information if different
if(! (flags & STREAM_ITEM_FLAG_SAMEFIELDS)) lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
lp = lpAppend(lp,(unsigned char*)value,sdslen(value));
}
int64_t lp_count = numfields;
lp_count += 3; Flags + ms + seq. */
// If not, add numfields
if(! (flags & STREAM_ITEM_FLAG_SAMEFIELDS)) { lp_count += numfields+1;
}
lp = lpAppendInteger(lp,lp_count);
// Insert a new rax tree
if(ri.data ! = lp) raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
s->length++;
s->last_id = id;
if (added_id) *added_id = id;
return C_OK;
Copy the code
Then the entry itself is appended, with the following characteristics:
- The ID stored in the entry is the difference between the entry's ID and the master entry's ID
- If the fields differ from the master entry's, each field name is stored before its value
- Finally, the lp-count element is appended, and if the listpack was reallocated the rax node is updated to point to the new listpack
signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id);
server.dirty++;
if (maxlen >= 0) {
/* Notify xtrim event if needed. */
if (streamTrimByLength(s,maxlen,approx_maxlen)) {
notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
}
if (approx_maxlen) streamRewriteApproxMaxlen(c,s,maxlen_arg_idx);
}
// override id aof, etc
robj *idarg = createObjectFromStreamID(&id);
rewriteClientCommandArgument(c,i,idarg);
decrRefCount(idarg);
/* We need to signal to blocked clients that there is new data on this * stream. */
if (server.blocked_clients_by_type[BLOCKED_STREAM])
signalKeyAsReady(c->db, c->argv[1]);
Copy the code
This part fires the keyspace notifications, trims the stream when MAXLEN is given, and rewrites the ID argument. Because an auto-generated ID depends on the current time, the command is rewritten with the ID actually used, so that replicas and the AOF reproduce exactly the same entry.
Note the signalKeyAsReady call, which will be covered later.
/* Trim stream 's' to at most 'maxlen' entries, oldest first. With 'approx'
 * set, only whole listpack nodes are removed, so more than maxlen entries may
 * remain. Entries inside a partially trimmed node are only flagged as
 * deleted. Returns the number of entries removed or flagged. */
int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
    if (s->length <= maxlen) return 0;

    raxIterator ri;
    raxStart(&ri,s->rax);
    raxSeek(&ri,"^",NULL,0);    /* Start from the oldest node. */

    int64_t deleted = 0;
    while(s->length > maxlen && raxNext(&ri)) {
        unsigned char *lp = ri.data, *p = lpFirst(lp);
        int64_t entries = lpGetInteger(p);    /* Master entry "count". */

        /* If dropping the whole node still leaves at least maxlen entries,
         * remove the node at once. */
        if (s->length - entries >= maxlen) {
            lpFree(lp);
            raxRemove(s->rax,ri.key,ri.key_len,NULL);
            raxSeek(&ri,">=",ri.key,ri.key_len);
            s->length -= entries;
            deleted += entries;
            continue;
        }

        /* Approximate trimming never touches a node partially. */
        if (approx) break;

        /* Otherwise flag the first 'to_delete' live entries as deleted and
         * update the count and deleted fields of the master entry. */
        int64_t to_delete = s->length - maxlen;
        serverAssert(to_delete < entries);
        lp = lpReplaceInteger(lp,&p,entries-to_delete);
        p = lpNext(lp,p); /* Seek the deleted field. */
        int64_t marked_deleted = lpGetInteger(p);
        lp = lpReplaceInteger(lp,&p,marked_deleted+to_delete);
        p = lpNext(lp,p); /* Seek num-fields of the master entry. */

        /* Skip the rest of the master entry. */
        int64_t master_fields_count = lpGetInteger(p);
        p = lpNext(lp,p); /* Seek the first master field. */
        for (int64_t j = 0; j < master_fields_count; j++)
            p = lpNext(lp,p); /* Skip all master fields. */
        p = lpNext(lp,p); /* Skip the zero master entry terminator. */

        /* 'p' now points to the first entry: walk the entries, flagging
         * those not already deleted. */
        while(p) {
            int flags = lpGetInteger(p);
            int to_skip;

            if (!(flags & STREAM_ITEM_FLAG_DELETED)) {
                flags |= STREAM_ITEM_FLAG_DELETED;
                lp = lpReplaceInteger(lp,&p,flags);
                deleted++;
                s->length--;
                if (s->length <= maxlen) break; /* Enough entries deleted. */
            }

            p = lpNext(lp,p); /* Skip the ms delta. */
            p = lpNext(lp,p); /* Skip the seq delta. */
            p = lpNext(lp,p); /* Seek num-fields, or the first value if SAMEFIELDS. */
            if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
                to_skip = master_fields_count;
            } else {
                to_skip = lpGetInteger(p);
                to_skip = 1+(to_skip*2);
            }

            while(to_skip--) p = lpNext(lp,p); /* Skip the entry's body. */
            p = lpNext(lp,p); /* Skip the final lp-count element. */
        }

        /* Too many flagged entries should trigger a garbage collection of
         * the listpack; this is not implemented yet. */
        entries -= to_delete;
        marked_deleted += to_delete;
        if (entries + marked_deleted > 10 && marked_deleted > entries/2) {
            /* TODO: perform a garbage collection. */
        }

        /* The listpack may have been reallocated: update the rax node. */
        raxInsert(s->rax,ri.key,ri.key_len,lp,NULL);

        /* This node had enough entries to delete: no need to go further. */
        break;
    }

    raxStop(&ri);
    return deleted;
}
Copy the code
This is the code that trims a stream to a maximum length. The main flow is:
- Start from the first (oldest) node of the rax. With approx set, only whole listpacks are removed: as soon as a node cannot be removed entirely, trimming stops
- When individual entries must be removed, they are only flagged as deleted, and the count/deleted fields of the master entry are updated
- If too many entries are flagged as deleted, the listpack should be garbage-collected, but this is still a TODO in this code
Conclusion
That covers the stream structure and the listpack layout it relies on. The diagrams show what each listpack records, and the XADD source shows that every step strictly follows this listpack layout.