preface
The previous Stream article looked at the xread source code with xreadGroup and saw the consumer group characteristics through its implementation. This article looks at the rest of the Stream commands to see what else a Stream can do.
The body of the
XCLAIM command
Since the consumer will enter the PEL after consuming the message, the message needs to be ack later. If the consumer is offline, the message accumulation cannot be ACK. At this time, XCLAIM can be called to put multiple messages into its PEL
XCLAIM <key> <group> <consumer> <min-idle-time> <ID- 1> <ID2 ->
[IDLE <milliseconds>] [TIME <mstime>] [RETRYCOUNT <count>]
[FORCE] [JUSTID]
Copy the code
XCLAIM passes the stream key, the group key, and the corresponding comsumer key, and supports multiple ids.
- : Minimum idle time for a message
- IDLE: Indicates the delivered_time of the new message. IDLE calculates the delivered_time according to the current time. The current time is -idle
- TIME: The difference with IDLE is that TIME directly replaces delivered_time
- RETRYCOUNT: Modifies the RETRYCOUNT property of the message
- FORCE: If no message is found in comsumer’s PEL, fetch a message from stream and add it to the current PEL
- JUSTID: only the ID is returned
- LASTID: changes the last_id attribute of the group
void xclaimCommand(client *c) {
streamCG *group = NULL;
// Get the stream by key
robj *o = lookupKeyRead(c->db,c->argv[1]);
long long minidle;
long long retrycount = - 1;
mstime_t deliverytime = - 1;
int force = 0;
int justid = 0;
// Parse stream and gruop
if (o) {
if (checkType(c,o,OBJ_STREAM)) return;
group = streamLookupCG(o->ptr,c->argv[2]->ptr);
}
if (o == NULL || group == NULL) {
addReplyErrorFormat(c,"-NOGROUP No such key '%s' or "
"consumer group '%s'", (char*)c->argv[1]->ptr,
(char*)c->argv[2]->ptr);
return;
}
if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle,
"Invalid min-idle-time argument for XCLAIM")
!= C_OK) return;
if (minidle < 0) minidle = 0;
Copy the code
First, find the corresponding stream and group according to the parameters, parse and initialize Minidle
int j;
for (j = 5; j < c->argc; j++) {
streamID id;
if (streamParseStrictIDOrReply(NULL,c->argv[j],&id,0) != C_OK) break;
}
int last_id_arg = j- 1;
Copy the code
Then verify that the ID is a message, but this step does not record the ID information, but the last index of the parameter ID
// If last_id is greater than the current value, change and propagate
if (streamCompareID(&last_id,&group->last_id) > 0) {
group->last_id = last_id;
propagate_last_id = 1;
}
// Check deliveryTime is valid and whether the default value is set to the current time
if(deliverytime ! =- 1) {
if (deliverytime < 0 || deliverytime > now) deliverytime = now;
} else {
deliverytime = now;
}
Copy the code
These are the items that need to be checked after parsing the parameters. The code for parsing the parameters will not be displayed. Eventually all the above parameters will be parsed (if any).
streamConsumer *consumer = NULL;
void *arraylenptr = addReplyDeferredLen(c);
size_t arraylen = 0;
for (int j = 5; j <= last_id_arg; j++) {
streamID id;
unsigned char buf[sizeof(streamID)];
if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK)
serverPanic("StreamID invalid after check. Should not be possible.");
streamEncodeID(buf,&id);
// Get uncommitted messages
streamNACK *nack = raxFind(group->pel,buf,sizeof(buf));
// If force is set but not found in pel, fetch from stream and add to group pel
if (force && nack == raxNotFound) {
streamIterator myiterator;
streamIteratorStart(&myiterator,o->ptr,&id,&id,0);
int64_t numfields;
int found = 0;
streamID item_id;
if (streamIteratorGetID(&myiterator,&item_id,&numfields)) found = 1;
streamIteratorStop(&myiterator);
if(! found)continue;
/ / create
nack = streamCreateNACK(NULL);
raxInsert(group->pel,buf,sizeof(buf),nack,NULL);
}
if(nack ! = raxNotFound) {// Check the minimum survival time
if (nack->consumer && minidle) {
mstime_t this_idle = now - nack->delivery_time;
if (this_idle < minidle) continue;
}
// Delete the old consumer nack and place it in the current consumer
if (nack->consumer)
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
/* Update the consumer and idle time. */
if (consumer == NULL)
consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE);
nack->consumer = consumer;
nack->delivery_time = deliverytime;
if (retrycount >= 0) {
nack->delivery_count = retrycount;
// If there is no justid command, the message is processed again
} else if(! justid) { nack->delivery_count++; }// Add new consumer PEL
raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
//justid: only the ID is returned
if (justid) {
addReplyStreamID(c,&id);
} else {
size_t emitted = streamReplyWithRange(c,o->ptr,&id,&id,1.0.NULL.NULL,STREAM_RWR_RAWENTRIES,NULL);
if(! emitted) addReplyNull(c); } arraylen++;/* Propagate this change. */
streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack);
propagate_last_id = 0; server.dirty++; }}Copy the code
As you can see from the comment, this command is atomic, and the id will be parsed to verify. If the id is incorrectly obtained at this stage, redis will be interrupted
if (propagate_last_id) {
streamPropagateGroupID(c,c->argv[1],group,c->argv[2]);
server.dirty++;
}
setDeferredArrayLen(c,arraylenptr,arraylen);
Use this method to make the call method not propagate subsequent commands because it has already been propagated
preventCommandPropagation(c);
Copy the code
Finally, the command is propagated
XCLAIM allows you to modify the attributes of a message while moving it, providing greater flexibility
XACK command
Xack is a consumer’s submission of a consumed message, and the logic is simple
void xackCommand(client *c) {
streamCG *group = NULL;
// Get stream and group
robj *o = lookupKeyRead(c->db,c->argv[1]);
if (o) {
if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */
group = streamLookupCG(o->ptr,c->argv[2]->ptr);
}
if (o == NULL || group == NULL) {
addReply(c,shared.czero);
return;
}
// Check if these ids are valid and one of them is invalid
for (int j = 3; j < c->argc; j++) {
streamID id;
if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
}
int acknowledged = 0;
for (int j = 3; j < c->argc; j++) {
streamID id;
unsigned char buf[sizeof(streamID)];
if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK)
serverPanic("StreamID invalid after check. Should not be possible.");
streamEncodeID(buf,&id);
// Delete nack from group pel and consumer PEL
streamNACK *nack = raxFind(group->pel,buf,sizeof(buf));
if(nack ! = raxNotFound) { raxRemove(group->pel,buf,sizeof(buf),NULL);
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
streamFreeNACK(nack);
acknowledged++;
server.dirty++;
}
}
addReplyLongLong(c,acknowledged);
}
Copy the code
Only delete group and COMsumer pel corresponding data
xtrim
StreamTrimByLength xtrim supports trimming streams, specifying maximum size, and using “~” to indicate imprecise deletion
int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
if (s->length <= maxlen) return 0;
raxIterator ri;
raxStart(&ri,s->rax);
raxSeek(&ri,"^".NULL.0);
int64_t deleted = 0;
Copy the code
Start iterating from the first node of the RAX tree
while(s->length > maxlen && raxNext(&ri)) {
unsigned char *lp = ri.data, *p = lpFirst(lp);
int64_t entries = lpGetInteger(p);
// Delete whether the entire node is still greater than maxLen
if (s->length - entries >= maxlen) {
lpFree(lp);
raxRemove(s->rax,ri.key,ri.key_len,NULL);
raxSeek(&ri,"> =",ri.key,ri.key_len);
s->length -= entries;
deleted += entries;
continue;
}
// The entire node is interrupted if it cannot be deleted
if (approx) break;
Copy the code
The while condition is that the length of stream must be less than the specified length. It can be seen from this that “~” is used, indicating that only the whole node is deleted each time. After deleting the whole node, strema must be greater than or equal to maxLen, indicating that the length of strema after deletion is greater than or equal to maxLen
// Calculate how much to delete
int64_t to_delete = s->length - maxlen;
serverAssert(to_delete < entries);
// Modify information about undeleted nodes and deleted nodes
lp = lpReplaceInteger(lp,&p,entries-to_delete);
p = lpNext(lp,p);
int64_t marked_deleted = lpGetInteger(p);
lp = lpReplaceInteger(lp,&p,marked_deleted+to_delete);
p = lpNext(lp,p);
/ / skip masterEntry
int64_t master_fields_count = lpGetInteger(p);
p = lpNext(lp,p);
for (int64_t j = 0; j < master_fields_count; j++)
p = lpNext(lp,p);
p = lpNext(lp,p);
Copy the code
Calculate the number of entries that can be deleted from the current node, skipping masterEntry on the Listpack
while(p) {
int flags = lpGetInteger(p);
int to_skip;
// Mark undeleted nodes as deleted
if(! (flags & STREAM_ITEM_FLAG_DELETED)) { flags |= STREAM_ITEM_FLAG_DELETED; lp = lpReplaceInteger(lp,&p,flags); deleted++; s->length--;if (s->length <= maxlen) break; /* Enough entries deleted. */
}
// Skip ms seq and num-field nodes
p = lpNext(lp,p);
p = lpNext(lp,p);
p = lpNext(lp,p);
if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
to_skip = master_fields_count;
} else {
to_skip = lpGetInteger(p);
to_skip = 1+(to_skip*2);
}
while(to_skip--) p = lpNext(lp,p);
p = lpNext(lp,p);
}
// Too much garbage needs to be cleaned up but this is not done here
entries -= to_delete;
marked_deleted += to_delete;
if (entries + marked_deleted > 10 && marked_deleted > entries/2) {
/ *TODO: perform a garbage collection. */
}
raxInsert(s->rax,ri.key,ri.key_len,lp,NULL);
break;
Copy the code
It then loops through the Listpack for deletion, where the deletion is a flag deletion, marking the node’s flag as deleted. When the tag removes too many nodes, it needs to be recycled, but ++ is not implemented here
Finally, re-insert the Listpack into the stream
conclusion
This article analyzes XCLAIM, XACK, XTRIM command source code, it can be seen that in the implementation of raX and Listpack data structure, the implementation of these functions are very simple, these simple implementation together constitute a message queue similar to Kafka