preface

Stream is a message queue feature introduced in Redis 5. According to the Redis authors, Stream borrows heavily from Kafka's design, so Kafka-like features can be used in Redis as well. To see exactly how Stream is designed, let's dig into the source code.

Main text

Introduction

xadd mystream * testKey1 testVal1 testKey2 testVal2
Copy the code

This is the stream appending command, which consists of the following:

  • mystream: the name of the stream
  • *: asks Redis to generate the message ID automatically
  • testKey1 testVal1 testKey2 testVal2: the field/value pairs that make up the message

A message can carry multiple field/value pairs, and it can be appended without any consumer group existing.

Message IDs within a stream are strictly increasing. Consumer groups can be created on a stream, and a stream can have multiple consumer groups.

  • Consumer groups are uniquely identified by group name, and each consumer group can consume all the messages of the stream.
  • Consumers within the same consumer group compete with each other: a given message is delivered to only one consumer of the group.
  • A consumer needs to acknowledge a message after consuming it.

The underlying structure

The underlying structure of stream consists mainly of RAX and Listpack

rax

Rax Radix Tree (part 1)

Rax Cardinality Tree

Rax is detailed in the previous article

listpack

A listpack is the value stored in a rax tree node. It holds a sequence of strings or integers and is laid out as follows:

  • total-bytes: the total size of the listpack in bytes; occupies 4 bytes
  • num: the number of stored entries; occupies 2 bytes. As with the compressed list (ziplist), the field can count up to 65535 at most; once the number of entries no longer fits, counting them requires traversing the listpack
  • entry: the actual data node, made of:
    • encoding: how the content is encoded
    • content: the actual data
    • backlen: the length of the entry, used to traverse from back to front
  • end: the end marker, 0xFF
/* Create an empty listpack: a header of LP_HDR_SIZE bytes (total-bytes plus
 * element count) immediately followed by the EOF marker.
 * Returns NULL when the allocation fails. */
unsigned char *lpNew(void) {
    unsigned char *listpack = lp_malloc(LP_HDR_SIZE+1);
    if (listpack != NULL) {
        lpSetTotalBytes(listpack,LP_HDR_SIZE+1);
        lpSetNumElements(listpack,0);
        listpack[LP_HDR_SIZE] = LP_EOF;
    }
    return listpack;
}

#define LP_HDR_SIZE 6
Copy the code

Create an empty listpack using the lpNew method, with 32 bits of totalBytes plus 16 bits of num. Then set the trailing flag

//ele: insert new element size: insert length P: insert position WHERE: three modes (front insert, back insert, replace) newp (used to return the next element of the insert element)
unsigned char *lpInsert(unsigned char *lp, unsigned char *ele, uint32_t size, unsigned char *p, int where, unsigned char **newp) {
    unsigned char intenc[LP_MAX_INT_ENCODING_LEN];
    unsigned char backlen[LP_MAX_BACKLEN_SIZE];

    uint64_t enclen; / / encode length

    // If the element to be inserted is null, replace (delete)
    if (ele == NULL) where = LP_REPLACE;

    // If it is post-insert, skip to pre-insert of next node instead
    // The subsequent inserts will be the same
    if (where == LP_AFTER) {
        p = lpSkip(p);
        where = LP_BEFORE;
    }

    // Store the offset of p before insertion
    unsigned long poff = p-lp;

    // Get the encType of ele
    int enctype;
    if (ele) {
        enctype = lpEncodeGetType(ele,size,intenc,&enclen);
    } else {
        enctype = - 1;
        enclen = 0;
    }

    // Get backLen to traverse backwards
    unsigned long backlen_size = ele ? lpEncodeBacklen(backlen,enclen) : 0;
    uint64_t old_listpack_bytes = lpGetTotalBytes(lp);
    uint32_t replaced_len  = 0;
    if (where == LP_REPLACE) {
        replaced_len = lpCurrentEncodedSize(p);
        replaced_len += lpEncodeBacklen(NULL,replaced_len);
    }

    uint64_t new_listpack_bytes = old_listpack_bytes + enclen + backlen_size
                                  - replaced_len;
    if (new_listpack_bytes > UINT32_MAX) return NULL;
    
    unsigned char *dst = lp + poff; /* May be updated after reallocation. */

    /* Need more space */
    if (new_listpack_bytes > old_listpack_bytes) {
        if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
        dst = lp + poff;
    }

    // Move the element
    if (where == LP_BEFORE) {
        memmove(dst+enclen+backlen_size,dst,old_listpack_bytes-poff);
    } else { /* LP_REPLACE. */
        long lendiff = (enclen+backlen_size)-replaced_len;
        memmove(dst+replaced_len+lendiff,
                dst+replaced_len,
                old_listpack_bytes-poff-replaced_len);
    }

    /* Free up space */
    if (new_listpack_bytes < old_listpack_bytes) {
        if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
        dst = lp + poff;
    }

    /* Save the next */
    if (newp) {
        *newp = dst;
        /* In case of deletion, set 'newp' to NULL if the next element is * the EOF element. */
        if(! ele && dst[0] == LP_EOF) *newp = NULL;
    }
    if (ele) {
        if (enctype == LP_ENCODING_INT) {
            memcpy(dst,intenc,enclen);
        } else {
            lpEncodeString(dst,ele,size);
        }
        dst += enclen;
        memcpy(dst,backlen,backlen_size);
        dst += backlen_size;
    }

    /* Change the header information */
    if(where ! = LP_REPLACE || ele ==NULL) {
        uint32_t num_elements = lpGetNumElements(lp);
        if(num_elements ! = LP_HDR_NUMELE_UNKNOWN) {// Add or remove elements
            if (ele)
                lpSetNumElements(lp,num_elements+1);
            else
                lpSetNumElements(lp,num_elements- 1);
        }
    }
    lpSetTotalBytes(lp,new_listpack_bytes);

#if 0
    unsigned char *oldlp = lp;
    lp = lp_malloc(new_listpack_bytes);
    memcpy(lp,oldlp,new_listpack_bytes);
    if (newp) {
        unsigned long offset = (*newp)-oldlp;
        *newp = lp + offset;
    }
    /* Make sure the old allocation contains garbage. */
    memset(oldlp,'A',new_listpack_bytes);
    lp_free(oldlp);
#endif

    return lp;
}
Copy the code

The listpack insertion code is commented above; the general flow is as follows:

  1. Convert an insert-after into an insert-before on the next element, so both cases share the same code
  2. Compute the space needed to insert or replace the element and, if the listpack grows, reallocate it
  3. Move the existing elements to make room for the new one
  4. If the listpack shrinks, reallocate it to release the unused space, then write the new element in place
  5. Update the header information
/* Return the first element of the listpack, or NULL when the byte right
 * after the header is already the EOF marker (empty listpack). */
unsigned char *lpFirst(unsigned char *lp) {
    unsigned char *first = lp + LP_HDR_SIZE; /* Skip the header. */
    return (first[0] == LP_EOF) ? NULL : first;
}
Copy the code

Gets a pointer to the first element, which sits right after the header (NULL is returned if the listpack is empty).

/* Return the element that precedes 'p' in the listpack 'lp', or NULL when
 * 'p' is already the first element (it sits right after the header). */
unsigned char *lpPrev(unsigned char *lp, unsigned char *p) {
    if (p-lp == LP_HDR_SIZE) return NULL;
    p--; /* Step back onto the last backlen byte of the previous element. */
    uint64_t prevlen = lpDecodeBacklen(p);
    /* Add the size of the backlen field itself to the decoded length, giving
     * the full size of the previous element. */
    prevlen += lpEncodeBacklen(NULL,prevlen);
    return p-prevlen+1;
}
Copy the code

Gets a pointer to the previous element: the backlen stored at the end of the previous element gives its length, which is all that is needed to step backwards.

Listpack supports both forward (next) and backward (prev) traversal; the remaining methods won't be covered here.

The stream parsing

/* Top-level stream object. */
typedef struct stream {
    rax *rax;               /* Radix tree whose nodes hold the listpacks. */
    uint64_t length;        /* Number of items currently in the stream. */
    streamID last_id;       /* Latest ID; new IDs must be greater than it. */
    rax *cgroups;           /* Consumer groups of this stream. */
} stream;
Copy the code

A stream's messages are stored in a rax radix tree; last_id is the latest ID in the stream, and a second rax tree holds the stream's consumer groups.

/* Stream item ID: a milliseconds part plus a sequence number. */
typedef struct streamID {
    uint64_t ms;        /* Milliseconds part. */
    uint64_t seq;       /* Sequence number part. */
} streamID;
Copy the code

StreamID consists of milliseconds and self-increasing sequence numbers

/* Advance 'id' to the next possible stream ID: increment 'seq', carrying
 * into 'ms' when 'seq' is already UINT64_MAX. When both parts are at their
 * maximum the ID wraps around to 0-0. */
void streamIncrID(streamID *id) {
    if (id->seq == UINT64_MAX) {
        if (id->ms == UINT64_MAX) {
            id->ms = id->seq = 0;
        } else {
            id->ms++;
            id->seq = 0;
        }
    } else {
        id->seq++;
    }
}

Both seq and ms are bounded by UINT64_MAX: when seq is at its maximum it is reset to 0 and ms is incremented, and when both are at their maximum the ID wraps around to 0-0.


The message format

Messages added with XADD are ultimately stored in listpacks, laid out as follows:

  • Each listpack starts with a master entry, which records the field names used by the message that created the listpack (for example, the first XADD above stores its two field names in the master entry). This works because the messages in a stream mostly share the same fields, so later messages with the same fields do not need to store the field names again
  • A listpack can store multiple messages
  • A message occupies multiple listpack entries

The following masterEntry is introduced

MasterEntry is a node in the Listpack. It is equivalent to a metadata node. The meanings of the nodes are as follows:

  • Count: Number of undeleted messages in the Listpack
  • Deleted: Indicates the number of deleted messages in the Listpack
  • Num -field: indicates the number of fields
  • Field1-fieldn: Field inserted when the Listpack was first created
  • 0: indicates the end flag

As in the previous command, the testKey1 and testKey2 fields and value are inserted, which are the fields inside masterEntry

If another message is inserted later and its fields are the same as those in the master entry, the message is stored in the listpack like this:

The field names are not stored with the message, which saves space.

lp-count is the number of listpack entries this message occupies: 3+N, where N is the number of values.

If the fields of the message differ from the master entry, the storage layout changes.

In that case each field name is stored right before its value.

consumers

/* Consumer group state. */
typedef struct streamCG {
    streamID last_id;       /* Reference ID of the group. NOTE(review): upstream
                               Redis describes this as the last delivered (not
                               acknowledged) ID -- confirm. */
    rax *pel;               /* Pending (unacknowledged) messages of the group. */
    rax *consumers;         /* Consumers belonging to the group. */
} streamCG;
Copy the code

The consumer group uses two rax trees, one holding its pending (unacknowledged) messages and one holding its consumers; last_id is the ID of the last message delivered to the group.

/* A single consumer inside a consumer group. */
typedef struct streamConsumer {
    mstime_t seen_time;         /* Last time this consumer was active. */
    sds name;                   /* Consumer name. */
    rax *pel;                   /* Messages pending (unacknowledged) for
                                   this consumer. */
} streamConsumer;
Copy the code

seen_time is the last time the consumer was active; a rax tree is again used to hold the messages this consumer has not yet acknowledged.

/* A pending (not yet acknowledged) message. */
typedef struct streamNACK {
    mstime_t delivery_time;     /* Last time this message was delivered. */
    uint64_t delivery_count;    /* Number of times this message was delivered. */
    streamConsumer *consumer;   /* The consumer this message was delivered to. */
} streamNACK;
Copy the code

This is the structure of an unacknowledged message. The PEL of the consumer group and the PEL of the consumer share the same streamNACK: when a consumer consumes a message through the consumer group, one streamNACK is created and placed in both PELs.

The source code parsing

Follow XADD as shown at the beginning to parse the source code

XADD key [MAXLEN [~|=] ] <ID or *> [field value] [field value] …

XADD supports MAXLEN to cap the length of the stream, and ~ can be used to make the trimming approximate, which is cheaper.

/* XADD key [MAXLEN [~|=] <count>] <ID or *> [field value] [field value] ...
 *
 * First part of the command: parse the options and the ID, then validate the
 * field/value arguments. The function body continues in the excerpts that
 * follow. */
void xaddCommand(client *c) {
    streamID id;
    int id_given = 0;       /* Set to 1 when an explicit ID (not "*") is given. */
    long long maxlen = -1;  /* MAXLEN count; -1 means MAXLEN was not given. */
    int approx_maxlen = 0;  /* Set to 1 when MAXLEN is followed by "~". */
    int maxlen_arg_idx = 0; /* Index of the MAXLEN count argument. */

    int i = 2; /* argv[1] is the key: options or the ID start at argv[2]. */
    for (; i < c->argc; i++) {
        int moreargs = (c->argc-1) - i; /* Arguments left after argv[i]. */
        char *opt = c->argv[i]->ptr;
        if (opt[0] == '*' && opt[1] == '\0') {
            /* This is just a fast path for the common case of auto-ID
             * creation. */
            break;
        } else if (!strcasecmp(opt,"maxlen") && moreargs) {
            approx_maxlen = 0;
            char *next = c->argv[i+1]->ptr;
            /* Optional "~" (approximate) or "=" (exact) before the count. */
            if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
                approx_maxlen = 1;
                i++;
            } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') {
                i++;
            }
            if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL)
                != C_OK) return;

            if (maxlen < 0) {
                addReplyError(c,"The MAXLEN argument must be >= 0.");
                return;
            }
            i++;
            maxlen_arg_idx = i;
        } else {
            /* Anything else must be an explicitly specified ID. */
            if (streamParseStrictIDOrReply(c,c->argv[i],&id,0) != C_OK) return;
            id_given = 1;
            break;
        }
    }
    int field_pos = i+1; /* Index of the first field name. */

    /* At least one field/value pair is required, and fields and values must
     * come in pairs. */
    if ((c->argc - field_pos) < 2 || ((c->argc-field_pos) % 2) == 1) {
        addReplyError(c,"wrong number of arguments for XADD");
        return;
    }

    /* An explicit ID of 0-0 is rejected. */
    if (id_given && id.ms == 0 && id.seq == 0) {
        addReplyError(c,"The ID specified in XADD must be greater than 0-0");
        return;
    }
Copy the code

This section of code is the parsing and validation of the parameters, see the comment

/* xaddCommand(), continued: look up the stream and append the item. */
    robj *o;
    stream *s;
    /* Find the stream stored at the key, creating it when it is missing. */
    if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
    s = o->ptr;

    /* The last ID is already the maximum one: nothing greater can be added. */
    if (s->last_id.ms == UINT64_MAX && s->last_id.seq == UINT64_MAX) {
        addReplyError(c,"The stream has exhausted the last possible ID, "
                        "unable to add more items");
        return;
    }

    /* 'id' receives the ID actually used; it is also the requested ID when
     * one was given explicitly. */
    if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2,
        &id, id_given ? &id : NULL)
        == C_ERR)
    {
        addReplyError(c,"The ID specified in XADD is equal or smaller than the "
                        "target stream top item");
        return;
    }
    /* Reply to the client with the ID of the new item. */
    addReplyStreamID(c,&id);
Copy the code

First, streamTypeLookupWriteOrCreate is called to find or create the stream; then last_id is checked to see whether it has already reached the maximum possible ID; finally streamAppendItem is called to add the message.

/* Look up the object stored at 'key' for writing. A missing key gets a new
 * empty stream added to the database. If the key holds an object that is not
 * a stream, a wrong-type error is sent to the client and NULL is returned. */
robj *streamTypeLookupWriteOrCreate(client *c, robj *key) {
    robj *o = lookupKeyWrite(c->db,key);
    if (o == NULL) {
        o = createStreamObject();
        dbAdd(c->db,key,o);
    } else {
        if (o->type != OBJ_STREAM) {
            addReply(c,shared.wrongtypeerr);
            return NULL;
        }
    }
    return o;
}
Copy the code

The stream is stored in the db like a regular key/value pair; an empty stream is created by calling createStreamObject. The insertion performed by streamAppendItem builds on the listpack operations analyzed above.

/* Append a new item to the stream 's'.
 *
 * argv, numfields: 'numfields' field/value pairs laid out as
 *                  field, value, field, value, ...
 * added_id:        if not NULL, receives the ID of the new item.
 * use_id:          if not NULL, the ID to use instead of generating one.
 *
 * Returns C_OK, or C_ERR when the ID is not greater than the stream's last
 * ID. The function body continues in the excerpts that follow. */
int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id) {

    /* Use the caller's ID when given, otherwise derive one from last_id. */
    streamID id;
    if (use_id)
        id = *use_id;
    else
        streamNextID(&s->last_id,&id);

    /* IDs must be strictly increasing: reject anything <= last_id. */
    if (streamCompareID(&id,&s->last_id) <= 0) return C_ERR;

    /* Fetch the last node of the radix tree, if there is one. */
    raxIterator ri;
    raxStart(&ri,s->rax);
    raxSeek(&ri,"$",NULL,0);

    size_t lp_bytes = 0;
    unsigned char *lp = NULL;

    if (raxNext(&ri)) {
        lp = ri.data;
        lp_bytes = lpBytes(lp);
    }
    raxStop(&ri);

    uint64_t rax_key[2];
    streamID master_id;
    /* Give up on the last listpack (so that a new one is created) when it
     * has reached the configured size or entry-count limit. */
    if (lp != NULL) {
        if (server.stream_node_max_bytes &&
            lp_bytes >= server.stream_node_max_bytes)
        {
            lp = NULL;
        } else if (server.stream_node_max_entries) {
            int64_t count = lpGetInteger(lpFirst(lp));
            if (count >= server.stream_node_max_entries) lp = NULL;
        }
    }

First decide whether an ID has to be generated, then fetch the listpack. Because IDs are always increasing, raxSeek is passed "$" to get the last node.

f (lp == NULL || lp_bytes >= server.stream_node_max_bytes) {
        master_id = id;
        // encode the message id into rax_key
        streamEncodeID(rax_key,&id);
        lp = lpNew();
        //count is the number of deleted messages 1
        lp = lpAppendInteger(lp,1);
        //deleted Number of deleted messages 0
        lp = lpAppendInteger(lp,0);
        / / the number field
        lp = lpAppendInteger(lp,numfields);
        for (int64_t i = 0; i < numfields; i++) {
            sds field = argv[i*2]->ptr;
            lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
        }
        // The trailing flag bit is 0
        lp = lpAppendInteger(lp,0);
        raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
        flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
Copy the code

This is the creation of a new listpack. The three consecutive lpAppendInteger calls correspond to the master entry format described above (count, deleted, num-fields); the field names are then appended, followed by the 0 terminator. Finally STREAM_ITEM_FLAG_SAMEFIELDS is set: the message that creates the listpack always has the same fields as the master entry, so the space-saving layout above can be used.

else {
        serverAssert(ri.key_len == sizeof(rax_key));
        memcpy(rax_key,ri.key,sizeof(rax_key));

        // Parse rax_key to master_id
        streamDecodeID(rax_key,&master_id);
        unsigned char *lp_ele = lpFirst(lp);

        // First get count and then add count
        int64_t count = lpGetInteger(lp_ele);
        lp = lpReplaceInteger(lp,&lp_ele,count+1);
        // Skip the deleted segment
        lp_ele = lpNext(lp,lp_ele);
        lp_ele = lpNext(lp,lp_ele);

        // Check to insert the field just inserted
        int64_t master_fields_count = lpGetInteger(lp_ele);
        lp_ele = lpNext(lp,lp_ele);
        if (numfields == master_fields_count) {
            int64_t i;
            // Compare the filed with the tape
            for (i = 0; i < master_fields_count; i++) {
                sds field = argv[i*2]->ptr;
                int64_t e_len;
                unsigned char buf[LP_INTBUF_SIZE];
                unsigned char *e = lpGet(lp_ele,&e_len,buf);
                // Copy data
                if(sdslen(field) ! = (size_t)e_len ||
                    memcmp(e,field,e_len) ! =0) break;
                lp_ele = lpNext(lp,lp_ele);
            }
            // All fields are the same
            if(i == master_fields_count) flags |= STREAM_ITEM_FLAG_SAMEFIELDS; }}Copy the code

If the listpack already exists, the fields of the new message are compared with those recorded in the master entry, and if they are exactly the same, STREAM_ITEM_FLAG_SAMEFIELDS is set.

// Three headers
    lp = lpAppendInteger(lp,flags);
    lp = lpAppendInteger(lp,id.ms - master_id.ms);
    lp = lpAppendInteger(lp,id.seq - master_id.seq);
    // If not, modify numfields
    if(! (flags & STREAM_ITEM_FLAG_SAMEFIELDS)) lp = lpAppendInteger(lp,numfields);for (int64_t i = 0; i < numfields; i++) {
        sds field = argv[i*2]->ptr, value = argv[i*2+1]->ptr;
        // Insert field information if different
        if(! (flags & STREAM_ITEM_FLAG_SAMEFIELDS)) lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
        lp = lpAppend(lp,(unsigned char*)value,sdslen(value));
    }
    int64_t lp_count = numfields;
    lp_count += 3; Flags + ms + seq. */
    // If not, add numfields
    if(! (flags & STREAM_ITEM_FLAG_SAMEFIELDS)) { lp_count += numfields+1;
    }
    lp = lpAppendInteger(lp,lp_count);

    // Insert a new rax tree
    if(ri.data ! = lp) raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
    s->length++;
    s->last_id = id;
    if (added_id) *added_id = id;
    return C_OK;
Copy the code

Then comes the insertion of the actual values, with the following characteristics:

  • The ID saved in the listpack entry is the current ID minus the master entry's ID
  • If the fields differ from the master entry, each field name is inserted before its value
  • Finally, the lp-count entry is appended and the listpack is reinserted into the rax
    /* xaddCommand(), continued: the item has been appended. Signal the
     * modification, emit the "xadd" keyspace event and count the change. */
    signalModifiedKey(c,c->db,c->argv[1]);
    notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id);
    server.dirty++;

    /* MAXLEN was given: trim the stream to the requested length. */
    if (maxlen >= 0) {
        /* Notify xtrim event if needed. */
        if (streamTrimByLength(s,maxlen,approx_maxlen)) {
            notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
        }
        if (approx_maxlen) streamRewriteApproxMaxlen(c,s,maxlen_arg_idx);
    }

    /* Replace the ID argument of the command (argv[i]) with the ID that was
     * actually used, so the command carries a concrete ID rather than "*". */
    robj *idarg = createObjectFromStreamID(&id);
    rewriteClientCommandArgument(c,i,idarg);
    decrRefCount(idarg);

    /* We need to signal to blocked clients that there is new data on this
     * stream. */
    if (server.blocked_clients_by_type[BLOCKED_STREAM])
        signalKeyAsReady(c->db, c->argv[1]);
Copy the code

This part triggers notifications, applies the MAXLEN option, and rewrites the ID argument of the command. Since an auto-generated ID depends on the current time, the generated ID must be written back into the command so that replicas and the AOF stay consistent with the master.

Note the signalKeyAsReady call, which will be covered later.

/* Trim the stream 's' so that it holds at most 'maxlen' items, removing
 * items starting from the first node of the radix tree.
 *
 * When 'approx' is non-zero only whole listpacks are removed, so the stream
 * may be left longer than 'maxlen'. Otherwise the remaining excess items of
 * the first surviving listpack are flagged as deleted.
 *
 * Returns the number of items removed. */
int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
    if (s->length <= maxlen) return 0;

    raxIterator ri;
    raxStart(&ri,s->rax);
    raxSeek(&ri,"^",NULL,0);

    int64_t deleted = 0;
    while(s->length > maxlen && raxNext(&ri)) {
        unsigned char *lp = ri.data, *p = lpFirst(lp);
        int64_t entries = lpGetInteger(p);

        /* If dropping the whole node still leaves at least 'maxlen' items,
         * remove the node altogether. */
        if (s->length - entries >= maxlen) {
            lpFree(lp);
            raxRemove(s->rax,ri.key,ri.key_len,NULL);
            raxSeek(&ri,">=",ri.key,ri.key_len);
            s->length -= entries;
            deleted += entries;
            continue;
        }

        /* The whole node cannot be removed: in approximate mode stop here. */
        if (approx) break;

        /* Number of items that must be flagged as deleted in this node. */
        int64_t to_delete = s->length - maxlen;
        serverAssert(to_delete < entries);
        /* Update the valid and deleted counters of the master entry. */
        lp = lpReplaceInteger(lp,&p,entries-to_delete);
        p = lpNext(lp,p); /* Seek the deleted counter. */
        int64_t marked_deleted = lpGetInteger(p);
        lp = lpReplaceInteger(lp,&p,marked_deleted+to_delete);
        p = lpNext(lp,p); /* Seek num-fields in the master entry. */

        /* Skip the rest of the master entry. */
        int64_t master_fields_count = lpGetInteger(p);
        p = lpNext(lp,p); /* Seek the first field. */
        for (int64_t j = 0; j < master_fields_count; j++)
            p = lpNext(lp,p); /* Skip all master fields. */
        p = lpNext(lp,p); /* Skip the zero master entry terminator. */

        /* 'p' is now on the flags entry of the first item. */
        while(p) {
            int flags = lpGetInteger(p);
            int to_skip;

            /* Flag the item as deleted if it is not already. */
            if (!(flags & STREAM_ITEM_FLAG_DELETED)) {
                flags |= STREAM_ITEM_FLAG_DELETED;
                lp = lpReplaceInteger(lp,&p,flags);
                deleted++;
                s->length--;
                if (s->length <= maxlen) break; /* Enough entries deleted. */
            }

            p = lpNext(lp,p); /* Skip the ID ms delta. */
            p = lpNext(lp,p); /* Skip the ID seq delta. */
            p = lpNext(lp,p); /* Seek num-fields, or the first value when the
                                 item shares the master fields. */
            if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
                to_skip = master_fields_count;
            } else {
                to_skip = lpGetInteger(p);
                to_skip = 1+(to_skip*2);
            }

            while(to_skip--) p = lpNext(lp,p); /* Skip the whole item. */
            p = lpNext(lp,p); /* Skip the final lp-count entry. */
        }

        /* Too many deleted items would call for a cleanup of the listpack,
         * which is not implemented here. */
        entries -= to_delete;
        marked_deleted += to_delete;
        if (entries + marked_deleted > 10 && marked_deleted > entries/2) {
            /* TODO: perform a garbage collection. */
        }

        /* Store the listpack again: the replacements may have moved it. */
        raxInsert(s->rax,ri.key,ri.key_len,lp,NULL);

        break;
    }

    raxStop(&ri);
    return deleted;
}
Copy the code

This is the code that trims the stream down to a given length. The main flow is as follows:

  • Trimming starts from the first node of the rax. approx means that only whole listpacks are removed; as soon as a whole node can no longer be removed, trimming stops
  • When individual entries are deleted, they are only flagged as deleted and the counters in the master entry are updated
  • If too many entries are flagged as deleted the listpack should be cleaned up, but the code does not do that yet

conclusion

The above covers the stream structure and the source code of its internal listpack structure. The diagrams show what information a listpack records, and the XADD source code shows that every step of the operation strictly follows that listpack layout.