Command processing
After Redis Server establishes a connection with a client, it registers readable events — client command requests — in the event-driven framework. There are four stages of command processing:
Command reads
Corresponds to the readQueryFromClient functionThe command parsing
: corresponds to processInputBuffer functionCommand execution
: corresponds to processCommand functionResults back
: corresponds to the addReply function
Command reads
The readQueryFromClient function was analyzed in the previous article, and the main flow is:
- The connRead function is called to read the command
- Appends the command to the synchronization buffer to modify the synchronization offset
- Call processInputBuffer for command parsing
void readQueryFromClient(connection *conn) {
// Get the client from the connection structureclient *c = connGetPrivateData(conn); ... nread = connRead(c->conn, c->querybuf+qblen, readlen); .../* There is more data in the client input buffer, continue parsing it * in case to check if there is a full command to execute. */
processInputBuffer(c);
}
Copy the code
The command parsing
ProcessInputBuffer function will be called processCommandAndResetClient function, which will call processCommand function again.
void processInputBuffer(client *c) {
while(c->qb_pos < sdslen(c->querybuf)) {// Determine the command type based on the command start character in the client input buffer
if(! c->reqtype) {// The command complies with the RESP protocol
if (c->querybuf[c->qb_pos] == The '*') {
c->reqtype = PROTO_REQ_MULTIBULK;
} else {
// Pipe type commandc->reqtype = PROTO_REQ_INLINE; }}// For pipe type commands, call processInlineBuffer to parse
if (c->reqtype == PROTO_REQ_INLINE) {
if(processInlineBuffer(c) ! = C_OK)break; ...// For RESP commands, call processMultibulkBuffer to parse
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
if(processMultibulkBuffer(c) ! = C_OK)break; }...if (c->argc == 0) {
resetClient(c);
} else{...// Start executing commands
if (processCommandAndResetClient(c) == C_ERR) {
return; }}}... }Copy the code
int processCommandAndResetClient(client *c) {
int deadclient = 0;
client *old_client = server.current_client;
server.current_client = c;
if (processCommand(c) == C_OK) {
commandProcessed(c);
}
if (server.current_client == NULL) deadclient = 1;
/* * Restore the old client, this is needed because when a script * times out, we will get into this code from processEventsWhileBlocked. * Which will cause to set the server.current_client. If not restored * we will return 1 to our caller which will falsely indicate the client * is dead and will stop reading from its buffer. */
server.current_client = old_client;
/* performEvictions may flush slave output buffers. This may * result in a slave, that may be the active client, to be * freed. */
return deadclient ? C_ERR : C_OK;
}
Copy the code
Command execution
The processCommand function is implemented in the server.c file:
- Call the moduleCallCommandFilters function to replace the Redis command with the command the Module wants to replace
- Check whether the current command is quit and handle the problem accordingly
- Call the lookupCommand function to look for related commands in the Commands member variable of the global variable server
Commands is a hash table:
struct redisServer {. dict *commands; . }Copy the code
It is initialized in the initServerConfig function:
void initServerConfig(void) {... server.commands = dictCreate(&commandTableDictType,NULL); . populateCommandTable(); . }Copy the code
The populateCommandTable function uses the redisCommandTable array:
void populateCommandTable(void) {
int j;
int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
for (j = 0; j < numcommands; j++) {
struct redisCommand *c = redisCommandTable+j;
int retval1, retval2;
/* Translate the command string flags description into an actual * set of flags. */
if (populateCommandTableParseFlags(c,c->sflags) == C_ERR)
serverPanic("Unsupported command flag");
c->id = ACLGetCommandID(c->name); /* Assign the ID used for ACL. */
retval1 = dictAdd(server.commands, sdsnew(c->name), c);
/* Populate an additional dictionary that will be unaffected * by rename-command statements in redis.conf. */retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c); serverAssert(retval1 == DICT_OK && retval2 == DICT_OK); }}Copy the code
The redisCommandTable array is defined in server.c and records the implementation function corresponding to the current command. For details, see github.com/LjyYano/red…
struct redisCommand redisCommandTable[] ={{"module",moduleCommand,2 -."admin no-script".0.NULL.0.0.0.0.0.0},
{"get",getCommand,2."read-only fast @string".0.NULL.1.1.1.0.0.0},
{"getex",getexCommand,2 -."write fast @string".0.NULL.1.1.1.0.0.0},
……
};
Copy the code
Its redisCommand structure is as follows:
struct redisCommand {
char *name;
redisCommandProc *proc;
int arity;
char *sflags; /* Flags as string representation, one char per flag. */
uint64_t flags; /* The actual flags, obtained from the 'sflags' field. */
/* Use a function to determine keys arguments in a command line. * Used for Redis Cluster redirect. */
redisGetKeysProc *getkeys_proc;
/* What keys should be loaded in background when calling this command? * /
int firstkey; /* The first argument that's a key (0 = no keys) */
int lastkey; /* The last argument that's a key */
int keystep; /* The step between first and last key */
long long microseconds, calls, rejected_calls, failed_calls;
int id; /* Command ID. This is a progressive ID starting from 0 that is assigned at runtime, and is used in order to check ACLs. A connection is able to execute a given command if the user associated to the connection has this command bit set in the bitmap of allowed commands. */
};
Copy the code
Back to processCommand, check whether the current client has a CLIENT_MULTI flag. If so, it indicates that it is processing commands related to a Redis transaction, so it calls queueMultiCommand to queue the commands as required by the transaction. Wait for subsequent processing together. If not, the processCommand function will call the call function to actually execute the command.
if(c->flags & CLIENT_MULTI && c->cmd->proc ! = execCommand && c->cmd->proc ! = discardCommand && c->cmd->proc ! = multiCommand && c->cmd->proc ! = watchCommand && c->cmd->proc ! = resetCommand) {// Add the command to the queue and save it for subsequent processing
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
// Call the call function to execute the commandcall(c,CMD_CALL_FULL); ... }Copy the code
Take the simplest get command as an example:
{"get",getCommand,2."read-only fast @string".0.NULL.1.1.1.0.0.0},
Copy the code
The corresponding implementation function is getCommand, which calls getGenericCommand:
void getCommand(client *c) {
getGenericCommand(c);
}
int getGenericCommand(client *c) {
robj *o;
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == NULL)
return C_OK;
if (checkType(c,o,OBJ_STRING)) {
return C_ERR;
}
addReplyBulk(c,o);
return C_OK;
}
Copy the code
This will eventually call the lookupKeyReadWithFlags function in the db.c file:
robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
robj *val;
if (expireIfNeeded(db,key) == 1) {
/* If we are in the context of a master, expireIfNeeded() returns 1 * when the key is no longer valid, so we can return NULL ASAP. */
if (server.masterhost == NULL)
goto keymiss;
/* However if we are in the context of a slave, expireIfNeeded() will * not really try to expire the key, it only returns information * about the "logical" status of the key: key expiring is up to the * master in order to have a consistent view of master's data set. * * However, if the command caller is not the master, and as additional * safety measure, the command invoked is a read-only command, we can * safely return NULL here, and provide a more consistent behavior * to clients accessing expired values in a read-only fashion, that * will say the key as non existing. * * Notably this covers GETs when slaves are used to scale reads. */
if(server.current_client && server.current_client ! = server.master && server.current_client->cmd && server.current_client->cmd->flags & CMD_READONLY) {goto keymiss;
}
}
val = lookupKey(db,key,flags);
if (val == NULL)
goto keymiss;
server.stat_keyspace_hits++;
return val;
keymiss:
if(! (flags & LOOKUP_NONOTIFY)) { notifyKeyspaceEvent(NOTIFY_KEY_MISS,"keymiss", key, db->id);
}
server.stat_keyspace_misses++;
return NULL;
}
Copy the code
The lookupKey function is called:
robj *lookupKey(redisDb *db, robj *key, int flags) {
dictEntry *de = dictFind(db->dict,key->ptr);
if (de) {
robj *val = dictGetVal(de);
/* Update the access time for the ageing algorithm. * Don't do it if we have a saving child, as this will trigger * a copy on write madness. */
if(! hasActiveChildProcess() && ! (flags & LOOKUP_NOTOUCH)){if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
updateLFU(val);
} else{ val->lru = LRU_CLOCK(); }}return val;
} else {
return NULL; }}Copy the code
Results back
PrepareClientToWrite function called addReply function, mainly, and then call to clientInstallWriteHandler function, Add the write-back client to the clients_pending_write list of the global variable Server. Finally, the _addReplyToBuffer function is called to add the result returned to the client’s output buffer.
/* Add the object 'obj' string representation to the client output buffer. */
void addReply(client *c, robj *obj) {
if(prepareClientToWrite(c) ! = C_OK)return;
if (sdsEncodedObject(obj)) {
if(_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) ! = C_OK) _addReplyProtoToList(c,obj->ptr,sdslen(obj->ptr)); }else if (obj->encoding == OBJ_ENCODING_INT) {
/* For integer encoded strings we just convert it into a string * using our optimized function, and attach the resulting string * to the output buffer. */
char buf[32];
size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
if(_addReplyToBuffer(c,buf,len) ! = C_OK) _addReplyProtoToList(c,buf,len); }else {
serverPanic("Wrong obj->encoding in addReply()"); }}Copy the code
Refer to the link
- Geek time: 14 | look from the code implements a distributed lock atomicity guarantee
- Distributed locks with Redis
Redis source code concise analysis series
The most concise Redis source code analysis series of articles
Java programming ideas – the most complete mind map -GitHub download link, need partners can take ~
Original is not easy, I hope you reprint when please contact me, and mark the original link.