Command processing

After Redis Server establishes a connection with a client, it registers readable events — client command requests — in the event-driven framework. There are four stages of command processing:

  • Command readsCorresponds to the readQueryFromClient function
  • The command parsing: corresponds to processInputBuffer function
  • Command execution: corresponds to processCommand function
  • Results back: corresponds to the addReply function

Command reads

The readQueryFromClient function was analyzed in the previous article, and the main flow is:

  1. The connRead function is called to read the command
  2. Appends the command to the synchronization buffer to modify the synchronization offset
  3. Call processInputBuffer for command parsing
void readQueryFromClient(connection *conn) {
    // Get the client from the connection structureclient *c = connGetPrivateData(conn); ... nread = connRead(c->conn, c->querybuf+qblen, readlen); .../* There is more data in the client input buffer, continue parsing it * in case to check if there is a full command to execute. */
     processInputBuffer(c);
}
Copy the code

The command parsing

ProcessInputBuffer function will be called processCommandAndResetClient function, which will call processCommand function again.

void processInputBuffer(client *c) {

    while(c->qb_pos < sdslen(c->querybuf)) {// Determine the command type based on the command start character in the client input buffer
        if(! c->reqtype) {// The command complies with the RESP protocol
            if (c->querybuf[c->qb_pos] == The '*') {
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
                // Pipe type commandc->reqtype = PROTO_REQ_INLINE; }}// For pipe type commands, call processInlineBuffer to parse
        if (c->reqtype == PROTO_REQ_INLINE) {
            if(processInlineBuffer(c) ! = C_OK)break; ...// For RESP commands, call processMultibulkBuffer to parse
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if(processMultibulkBuffer(c) ! = C_OK)break; }...if (c->argc == 0) {
            resetClient(c);
        } else{...// Start executing commands
            if (processCommandAndResetClient(c) == C_ERR) {
                return; }}}... }Copy the code
int processCommandAndResetClient(client *c) {
    int deadclient = 0;
    client *old_client = server.current_client;
    server.current_client = c;
    if (processCommand(c) == C_OK) {
        commandProcessed(c);
    }
    if (server.current_client == NULL) deadclient = 1;
    /* * Restore the old client, this is needed because when a script * times out, we will get into this code from processEventsWhileBlocked. * Which will cause to set the server.current_client. If not restored * we will return 1 to our caller which will falsely indicate the client * is dead and will stop reading from its buffer. */
    server.current_client = old_client;
    /* performEvictions may flush slave output buffers. This may * result in a slave, that may be the active client, to be * freed. */
    return deadclient ? C_ERR : C_OK;
}
Copy the code

Command execution

The processCommand function is implemented in the server.c file:

  • Call the moduleCallCommandFilters function to replace the Redis command with the command the Module wants to replace
  • Check whether the current command is quit and handle the problem accordingly
  • Call the lookupCommand function to look for related commands in the Commands member variable of the global variable server

Commands is a hash table:

struct redisServer {. dict *commands; . }Copy the code

It is initialized in the initServerConfig function:

void initServerConfig(void) {... server.commands = dictCreate(&commandTableDictType,NULL); . populateCommandTable(); . }Copy the code

The populateCommandTable function uses the redisCommandTable array:

void populateCommandTable(void) {
    int j;
    int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);

    for (j = 0; j < numcommands; j++) {
        struct redisCommand *c = redisCommandTable+j;
        int retval1, retval2;

        /* Translate the command string flags description into an actual * set of flags. */
        if (populateCommandTableParseFlags(c,c->sflags) == C_ERR)
            serverPanic("Unsupported command flag");

        c->id = ACLGetCommandID(c->name); /* Assign the ID used for ACL. */
        retval1 = dictAdd(server.commands, sdsnew(c->name), c);
        /* Populate an additional dictionary that will be unaffected * by rename-command statements in redis.conf. */retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c); serverAssert(retval1 == DICT_OK && retval2 == DICT_OK); }}Copy the code

The redisCommandTable array is defined in server.c and records the implementation function corresponding to the current command. For details, see github.com/LjyYano/red…

struct redisCommand redisCommandTable[] ={{"module",moduleCommand,2 -."admin no-script".0.NULL.0.0.0.0.0.0},

    {"get",getCommand,2."read-only fast @string".0.NULL.1.1.1.0.0.0},

    {"getex",getexCommand,2 -."write fast @string".0.NULL.1.1.1.0.0.0},

     ……
};
Copy the code

Its redisCommand structure is as follows:

struct redisCommand {
    char *name;
    redisCommandProc *proc;
    int arity;
    char *sflags;   /* Flags as string representation, one char per flag. */
    uint64_t flags; /* The actual flags, obtained from the 'sflags' field. */
    /* Use a function to determine keys arguments in a command line. * Used for Redis Cluster redirect. */
    redisGetKeysProc *getkeys_proc;
    /* What keys should be loaded in background when calling this command? * /
    int firstkey; /* The first argument that's a key (0 = no keys) */
    int lastkey;  /* The last argument that's a key */
    int keystep;  /* The step between first and last key */
    long long microseconds, calls, rejected_calls, failed_calls;
    int id;     /* Command ID. This is a progressive ID starting from 0 that is assigned at runtime, and is used in order to check ACLs. A connection is able to execute a given command if the user associated to the connection has this command bit set in the bitmap of allowed commands. */
};
Copy the code

Back to processCommand, check whether the current client has a CLIENT_MULTI flag. If so, it indicates that it is processing commands related to a Redis transaction, so it calls queueMultiCommand to queue the commands as required by the transaction. Wait for subsequent processing together. If not, the processCommand function will call the call function to actually execute the command.

if(c->flags & CLIENT_MULTI && c->cmd->proc ! = execCommand && c->cmd->proc ! = discardCommand && c->cmd->proc ! = multiCommand && c->cmd->proc ! = watchCommand && c->cmd->proc ! = resetCommand) {// Add the command to the queue and save it for subsequent processing
    queueMultiCommand(c);
    addReply(c,shared.queued);
} else {
    // Call the call function to execute the commandcall(c,CMD_CALL_FULL); ... }Copy the code

Take the simplest get command as an example:

{"get",getCommand,2."read-only fast @string".0.NULL.1.1.1.0.0.0},
Copy the code

The corresponding implementation function is getCommand, which calls getGenericCommand:

void getCommand(client *c) {
    getGenericCommand(c);
}

int getGenericCommand(client *c) {
    robj *o;

    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == NULL)
        return C_OK;

    if (checkType(c,o,OBJ_STRING)) {
        return C_ERR;
    }

    addReplyBulk(c,o);
    return C_OK;
}
Copy the code

This will eventually call the lookupKeyReadWithFlags function in the db.c file:

robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
    robj *val;

    if (expireIfNeeded(db,key) == 1) {
        /* If we are in the context of a master, expireIfNeeded() returns 1 * when the key is no longer valid, so we can return NULL ASAP. */
        if (server.masterhost == NULL)
            goto keymiss;

        /* However if we are in the context of a slave, expireIfNeeded() will * not really try to expire the key, it only returns information * about the "logical" status of the key: key expiring is up to the * master in order to have a consistent view of master's data set. * * However, if the command caller is not the master, and as additional * safety measure, the command invoked is a read-only command, we can * safely return NULL here, and provide a more consistent behavior * to clients accessing expired values in a read-only fashion, that * will say the key as non existing. * * Notably this covers GETs when slaves are used to scale reads. */
        if(server.current_client && server.current_client ! = server.master && server.current_client->cmd && server.current_client->cmd->flags & CMD_READONLY) {goto keymiss;
        }
    }
    val = lookupKey(db,key,flags);
    if (val == NULL)
        goto keymiss;
    server.stat_keyspace_hits++;
    return val;

keymiss:
    if(! (flags & LOOKUP_NONOTIFY)) { notifyKeyspaceEvent(NOTIFY_KEY_MISS,"keymiss", key, db->id);
    }
    server.stat_keyspace_misses++;
    return NULL;
}
Copy the code

The lookupKey function is called:

robj *lookupKey(redisDb *db, robj *key, int flags) {
    dictEntry *de = dictFind(db->dict,key->ptr);
    if (de) {
        robj *val = dictGetVal(de);

        /* Update the access time for the ageing algorithm. * Don't do it if we have a saving child, as this will trigger * a copy on write madness. */
        if(! hasActiveChildProcess() && ! (flags & LOOKUP_NOTOUCH)){if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
                updateLFU(val);
            } else{ val->lru = LRU_CLOCK(); }}return val;
    } else {
        return NULL; }}Copy the code

Results back

PrepareClientToWrite function called addReply function, mainly, and then call to clientInstallWriteHandler function, Add the write-back client to the clients_pending_write list of the global variable Server. Finally, the _addReplyToBuffer function is called to add the result returned to the client’s output buffer.

/* Add the object 'obj' string representation to the client output buffer. */
void addReply(client *c, robj *obj) {
    if(prepareClientToWrite(c) ! = C_OK)return;

    if (sdsEncodedObject(obj)) {
        if(_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) ! = C_OK) _addReplyProtoToList(c,obj->ptr,sdslen(obj->ptr)); }else if (obj->encoding == OBJ_ENCODING_INT) {
        /* For integer encoded strings we just convert it into a string * using our optimized function, and attach the resulting string * to the output buffer. */
        char buf[32];
        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
        if(_addReplyToBuffer(c,buf,len) ! = C_OK) _addReplyProtoToList(c,buf,len); }else {
        serverPanic("Wrong obj->encoding in addReply()"); }}Copy the code

Refer to the link

  • Geek time: 14 | look from the code implements a distributed lock atomicity guarantee
  • Distributed locks with Redis

Redis source code concise analysis series

The most concise Redis source code analysis series of articles

Java programming ideas – the most complete mind map -GitHub download link, need partners can take ~

Original is not easy, I hope you reprint when please contact me, and mark the original link.