preface

The previous Stream article looked at the xread source code with xreadGroup and saw the consumer group characteristics through its implementation. This article looks at the rest of the Stream commands to see what else a Stream can do.

The body of the

XCLAIM command

Since the consumer will enter the PEL after consuming the message, the message needs to be ack later. If the consumer is offline, the message accumulation cannot be ACK. At this time, XCLAIM can be called to put multiple messages into its PEL

XCLAIM <key> <group> <consumer> <min-idle-time> <ID- 1> <ID2 ->
        [IDLE <milliseconds>] [TIME <mstime>] [RETRYCOUNT <count>]
        [FORCE] [JUSTID]
Copy the code

XCLAIM passes the stream key, the group key, and the corresponding comsumer key, and supports multiple ids.

  • : Minimum idle time for a message
  • IDLE: Indicates the delivered_time of the new message. IDLE calculates the delivered_time according to the current time. The current time is -idle
  • TIME: The difference with IDLE is that TIME directly replaces delivered_time
  • RETRYCOUNT: Modifies the RETRYCOUNT property of the message
  • FORCE: If no message is found in comsumer’s PEL, fetch a message from stream and add it to the current PEL
  • JUSTID: only the ID is returned
  • LASTID: changes the last_id attribute of the group
void xclaimCommand(client *c) {
    streamCG *group = NULL;
    // Get the stream by key
    robj *o = lookupKeyRead(c->db,c->argv[1]);
    long long minidle;
    long long retrycount = - 1;
    mstime_t deliverytime = - 1;
    int force = 0;
    int justid = 0;

    // Parse stream and gruop
    if (o) {
        if (checkType(c,o,OBJ_STREAM)) return;
        group = streamLookupCG(o->ptr,c->argv[2]->ptr);
    }


    if (o == NULL || group == NULL) {
        addReplyErrorFormat(c,"-NOGROUP No such key '%s' or "
                              "consumer group '%s'", (char*)c->argv[1]->ptr,
                              (char*)c->argv[2]->ptr);
        return;
    }

    if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle,
        "Invalid min-idle-time argument for XCLAIM")
        != C_OK) return;
    if (minidle < 0) minidle = 0;
Copy the code

First, find the corresponding stream and group according to the parameters, parse and initialize Minidle

    int j;
    for (j = 5; j < c->argc; j++) {
        streamID id;
        if (streamParseStrictIDOrReply(NULL,c->argv[j],&id,0) != C_OK) break;
    }
int last_id_arg = j- 1;
Copy the code

Then verify that the ID is a message, but this step does not record the ID information, but the last index of the parameter ID

// If last_id is greater than the current value, change and propagate
    if (streamCompareID(&last_id,&group->last_id) > 0) {
        group->last_id = last_id;
        propagate_last_id = 1;
    }

    // Check deliveryTime is valid and whether the default value is set to the current time
    if(deliverytime ! =- 1) {

        if (deliverytime < 0 || deliverytime > now) deliverytime = now;
    } else {
        deliverytime = now;
    }
Copy the code

These are the items that need to be checked after parsing the parameters. The code for parsing the parameters will not be displayed. Eventually all the above parameters will be parsed (if any).

streamConsumer *consumer = NULL;
    void *arraylenptr = addReplyDeferredLen(c);
    size_t arraylen = 0;
    for (int j = 5; j <= last_id_arg; j++) {
        streamID id;
        unsigned char buf[sizeof(streamID)];
        if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK)
            serverPanic("StreamID invalid after check. Should not be possible.");
        streamEncodeID(buf,&id);

        // Get uncommitted messages
        streamNACK *nack = raxFind(group->pel,buf,sizeof(buf));

        // If force is set but not found in pel, fetch from stream and add to group pel
        if (force && nack == raxNotFound) {
            streamIterator myiterator;
            streamIteratorStart(&myiterator,o->ptr,&id,&id,0);
            int64_t numfields;
            int found = 0;
            streamID item_id;
            if (streamIteratorGetID(&myiterator,&item_id,&numfields)) found = 1;
            streamIteratorStop(&myiterator);


            if(! found)continue;
            / / create
            nack = streamCreateNACK(NULL);
            raxInsert(group->pel,buf,sizeof(buf),nack,NULL);
        }

        if(nack ! = raxNotFound) {// Check the minimum survival time
            if (nack->consumer && minidle) {
                mstime_t this_idle = now - nack->delivery_time;
                if (this_idle < minidle) continue;
            }
            // Delete the old consumer nack and place it in the current consumer
            if (nack->consumer)
                raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
            /* Update the consumer and idle time. */
            if (consumer == NULL)
                consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE);
            nack->consumer = consumer;
            nack->delivery_time = deliverytime;
            if (retrycount >= 0) {
                nack->delivery_count = retrycount;
                // If there is no justid command, the message is processed again
            } else if(! justid) { nack->delivery_count++; }// Add new consumer PEL
            raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
            //justid: only the ID is returned
            if (justid) {
                addReplyStreamID(c,&id);
            } else {
                size_t emitted = streamReplyWithRange(c,o->ptr,&id,&id,1.0.NULL.NULL,STREAM_RWR_RAWENTRIES,NULL);
                if(! emitted) addReplyNull(c); } arraylen++;/* Propagate this change. */
            streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack);
            propagate_last_id = 0; server.dirty++; }}Copy the code

As you can see from the comment, this command is atomic, and the id will be parsed to verify. If the id is incorrectly obtained at this stage, redis will be interrupted

 if (propagate_last_id) {
        streamPropagateGroupID(c,c->argv[1],group,c->argv[2]);
        server.dirty++;
    }
    setDeferredArrayLen(c,arraylenptr,arraylen);
    Use this method to make the call method not propagate subsequent commands because it has already been propagated
    preventCommandPropagation(c);
Copy the code

Finally, the command is propagated

XCLAIM allows you to modify the attributes of a message while moving it, providing greater flexibility

XACK command

Xack is a consumer’s submission of a consumed message, and the logic is simple

void xackCommand(client *c) {
    streamCG *group = NULL;
    // Get stream and group
    robj *o = lookupKeyRead(c->db,c->argv[1]);
    if (o) {
        if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */
        group = streamLookupCG(o->ptr,c->argv[2]->ptr);
    }

    if (o == NULL || group == NULL) {
        addReply(c,shared.czero);
        return;
    }

    // Check if these ids are valid and one of them is invalid
    for (int j = 3; j < c->argc; j++) {
        streamID id;
        if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
    }

    int acknowledged = 0;
    for (int j = 3; j < c->argc; j++) {
        streamID id;
        unsigned char buf[sizeof(streamID)];
        if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK)
            serverPanic("StreamID invalid after check. Should not be possible.");
        streamEncodeID(buf,&id);

        // Delete nack from group pel and consumer PEL
        streamNACK *nack = raxFind(group->pel,buf,sizeof(buf));
        if(nack ! = raxNotFound) { raxRemove(group->pel,buf,sizeof(buf),NULL);
            raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
            streamFreeNACK(nack);
            acknowledged++;
            server.dirty++;
        }
    }
    addReplyLongLong(c,acknowledged);
}
Copy the code

Only delete group and COMsumer pel corresponding data

xtrim

StreamTrimByLength xtrim supports trimming streams, specifying maximum size, and using “~” to indicate imprecise deletion

int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
 if (s->length <= maxlen) return 0;

    raxIterator ri;
    raxStart(&ri,s->rax);
    raxSeek(&ri,"^".NULL.0);

    int64_t deleted = 0;
Copy the code

Start iterating from the first node of the RAX tree

while(s->length > maxlen && raxNext(&ri)) {
        unsigned char *lp = ri.data, *p = lpFirst(lp);
        int64_t entries = lpGetInteger(p);

        // Delete whether the entire node is still greater than maxLen
        if (s->length - entries >= maxlen) {
            lpFree(lp);
            raxRemove(s->rax,ri.key,ri.key_len,NULL);
            raxSeek(&ri,"> =",ri.key,ri.key_len);
            s->length -= entries;
            deleted += entries;
            continue;
        }

        // The entire node is interrupted if it cannot be deleted
        if (approx) break;
Copy the code

The while condition is that the length of stream must be less than the specified length. It can be seen from this that “~” is used, indicating that only the whole node is deleted each time. After deleting the whole node, strema must be greater than or equal to maxLen, indicating that the length of strema after deletion is greater than or equal to maxLen

// Calculate how much to delete
        int64_t to_delete = s->length - maxlen;
        serverAssert(to_delete < entries);
        // Modify information about undeleted nodes and deleted nodes
        lp = lpReplaceInteger(lp,&p,entries-to_delete);
        p = lpNext(lp,p);
        int64_t marked_deleted = lpGetInteger(p);
        lp = lpReplaceInteger(lp,&p,marked_deleted+to_delete);
        p = lpNext(lp,p);

        / / skip masterEntry
        int64_t master_fields_count = lpGetInteger(p);
        p = lpNext(lp,p);
        for (int64_t j = 0; j < master_fields_count; j++)
            p = lpNext(lp,p);
        p = lpNext(lp,p);
Copy the code

Calculate the number of entries that can be deleted from the current node, skipping masterEntry on the Listpack

while(p) {
            int flags = lpGetInteger(p);
            int to_skip;

            // Mark undeleted nodes as deleted
            if(! (flags & STREAM_ITEM_FLAG_DELETED)) { flags |= STREAM_ITEM_FLAG_DELETED; lp = lpReplaceInteger(lp,&p,flags); deleted++; s->length--;if (s->length <= maxlen) break; /* Enough entries deleted. */
            }

            // Skip ms seq and num-field nodes
            p = lpNext(lp,p);
            p = lpNext(lp,p);
            p = lpNext(lp,p);
            if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
                to_skip = master_fields_count;
            } else {
                to_skip = lpGetInteger(p);
                to_skip = 1+(to_skip*2);
            }

            while(to_skip--) p = lpNext(lp,p);
            p = lpNext(lp,p);
        }

        // Too much garbage needs to be cleaned up but this is not done here
        entries -= to_delete;
        marked_deleted += to_delete;
        if (entries + marked_deleted > 10 && marked_deleted > entries/2) {
            / *TODO: perform a garbage collection. */
        }

        raxInsert(s->rax,ri.key,ri.key_len,lp,NULL);

        break;
Copy the code

It then loops through the Listpack for deletion, where the deletion is a flag deletion, marking the node’s flag as deleted. When the tag removes too many nodes, it needs to be recycled, but ++ is not implemented here

Finally, re-insert the Listpack into the stream

conclusion

This article analyzes XCLAIM, XACK, XTRIM command source code, it can be seen that in the implementation of raX and Listpack data structure, the implementation of these functions are very simple, these simple implementation together constitute a message queue similar to Kafka