Analysis version: Redis 5.0.4.

Redis command processing can be decomposed into three separate flows (excluding replication and persistence):

  • Accepting connection requests;
  • Receiving request data and processing the request. This flow does not send the result to the client; it only writes the result to the response buffer, which the response flow sends later;
  • Responding to requests.

These three flows are asynchronous and not directly coupled. What they have in common is that all of them are triggered by ae ("A simple event-driven"), the event library inside Redis. On Linux, ae wraps epoll; on macOS and FreeBSD, kqueue; on Solaris, evport. On other systems it falls back to a select wrapper.
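The compile-time choice of backend can be pictured with a small sketch. It is only an illustration under assumptions: the enum, the function names, and the use of compiler-predefined platform macros are hypothetical, and the platform mapping is simplified. Redis makes the real decision with feature macros from its own build configuration.

```c
#include <assert.h>

/* Hypothetical names for this sketch; they do not come from the Redis source. */
enum backend { BACKEND_EVPORT, BACKEND_EPOLL, BACKEND_KQUEUE, BACKEND_SELECT };

/* Pick exactly one backend at compile time, falling back to select. */
enum backend compiled_backend(void) {
#if defined(__sun)
    return BACKEND_EVPORT;
#elif defined(__linux__)
    return BACKEND_EPOLL;
#elif defined(__APPLE__) || defined(__FreeBSD__)
    return BACKEND_KQUEUE;
#else
    return BACKEND_SELECT;
#endif
}

/* Human-readable name of a backend. */
const char *backend_name(enum backend b) {
    static const char *const names[] = { "evport", "epoll", "kqueue", "select" };
    assert((unsigned)b <= (unsigned)BACKEND_SELECT);
    return names[b];
}
```

Calling backend_name(compiled_backend()) yields the name of the single backend that was compiled in; the rest of the program never needs to know which one it is.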

ae_epoll.c, ae_select.c, ae_evport.c, and ae_kqueue.c are the concrete implementations of ae. Viewed in object-oriented terms, the relationship is as shown in the figure below:

As you can see from the figure above, when there is no data the process blocks in the function aeApiPoll (which calls epoll_wait under epoll) and returns only when the timeout expires.

aeApiPoll returns immediately from the blocked state if a connection request comes in, if a connection sends data, or if a pending response can now be sent (the connection becomes writable).
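The block-until-timeout and wake-on-data behaviour can be tried out with a minimal sketch. It uses the portable poll(2) call as a stand-in for epoll_wait, and a pipe as a stand-in for a client connection; the function names wait_readable and demo_pipe_wakeup are hypothetical.

```c
#include <assert.h>
#include <poll.h>
#include <unistd.h>

/* Wait until fd is readable or timeout_ms elapses.
 * Returns 1 if readable, 0 on timeout, -1 on error. */
int wait_readable(int fd, int timeout_ms) {
    struct pollfd pfd;
    assert(fd >= 0);
    pfd.fd = fd;
    pfd.events = POLLIN;
    pfd.revents = 0;
    int n = poll(&pfd, 1, timeout_ms);
    if (n < 0) return -1;
    if (n == 0) return 0;                 /* nothing happened: timed out */
    return (pfd.revents & POLLIN) ? 1 : -1;
}

/* Demonstration on a pipe: first a timeout, then a wake-up once data exists.
 * Returns 0 if both steps behaved as expected, -1 otherwise. */
int demo_pipe_wakeup(void) {
    int p[2];
    if (pipe(p) != 0) return -1;
    int before = wait_readable(p[0], 10);      /* no data yet */
    int wrote = (write(p[1], "x", 1) == 1);
    int after = wait_readable(p[0], 1000);     /* data is already waiting */
    close(p[0]);
    close(p[1]);
    return (before == 0 && wrote && after == 1) ? 0 : -1;
}
```

The second wait does not use up its one-second timeout: the call returns as soon as the descriptor is ready, which is the behaviour described above.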

Note that only the fd is registered with epoll; neither the client nor the aeFileEvent is. Therefore, when a connection becomes active (for example, when data is ready to be received), the aeFileEvent must first be looked up by fd. The client is then found through the aeFileEvent, because the client is stored in the aeFileEvent's clientData field when the aeFileEvent is created.

The global object server (type redisServer, defined in server.h) holds a global aeEventLoop, and the aeEventLoop maintains an array of aeFileEvent structures indexed by fd. This makes it easy to find the aeFileEvent for a given fd.

The aeFileEvent is not passed to epoll directly in order to keep the event-driver interface uniform across backends; select, for example, has no way to attach user data to an fd. When the process runs initServer at startup, it calls aeCreateEventLoop to initialize the array. The array size is the value of the maxclients configuration item plus 128. This works because fds are a kernel resource whose numbers are recycled: a closed fd number is handed out again, so fd values stay within a bounded range.
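The lookup path from fd to client can be sketched as follows. The structs and function names here are hypothetical, heavily simplified stand-ins for aeFileEvent and aeEventLoop, meant only to show why an fd-indexed array makes the lookup a single array access.

```c
#include <assert.h>
#include <stdlib.h>

#define EV_NONE     0
#define EV_READABLE 1

/* Hypothetical, simplified stand-ins for aeFileEvent and aeEventLoop. */
struct file_event {
    int mask;            /* EV_NONE means the slot is unused */
    void *client_data;   /* per-connection object, e.g. the client */
};

struct event_loop {
    int setsize;                 /* number of slots in the table */
    struct file_event *events;   /* indexed directly by fd */
};

struct event_loop *loop_create(int setsize) {
    assert(setsize > 0);
    struct event_loop *el = malloc(sizeof *el);
    if (el == NULL) return NULL;
    el->events = calloc((size_t)setsize, sizeof *el->events);
    if (el->events == NULL) { free(el); return NULL; }
    el->setsize = setsize;
    return el;
}

void loop_free(struct event_loop *el) {
    if (el == NULL) return;
    free(el->events);
    free(el);
}

/* Register interest in fd and remember which client owns it. */
int loop_register(struct event_loop *el, int fd, int mask, void *client_data) {
    if (fd < 0 || fd >= el->setsize) return -1;   /* fd does not fit in the table */
    el->events[fd].mask |= mask;
    el->events[fd].client_data = client_data;
    return 0;
}

/* The poller reports only an fd; one array index recovers the client. */
void *loop_client_for_fd(const struct event_loop *el, int fd) {
    if (fd < 0 || fd >= el->setsize) return NULL;
    if (el->events[fd].mask == EV_NONE) return NULL;
    return el->events[fd].client_data;
}
```

Because the kernel poller only ever stores the integer fd, the same table works unchanged whichever backend is in use.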
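Two points from this paragraph can be checked with a short sketch: the table size computed from maxclients, and the fact that a released fd number is handed out again (POSIX specifies that dup returns the lowest-numbered free descriptor). The names EXTRA_SLOTS, event_table_size, and fd_number_is_reused are hypothetical.

```c
#include <assert.h>
#include <unistd.h>

#define EXTRA_SLOTS 128   /* the "plus 128" from the text; the name is hypothetical */

/* Size of the fd-indexed event table for a given client limit. */
int event_table_size(int maxclients) {
    assert(maxclients >= 0);
    return maxclients + EXTRA_SLOTS;
}

/* Returns 1 if a closed fd number is handed out again, 0 if not, -1 on error. */
int fd_number_is_reused(void) {
    int p[2];
    if (pipe(p) != 0) return -1;
    int first = dup(p[0]);            /* lowest free fd number */
    if (first < 0) { close(p[0]); close(p[1]); return -1; }
    close(first);                     /* release that number */
    int second = dup(p[0]);           /* the same number becomes lowest free again */
    int reused = (second == first);
    if (second >= 0) close(second);
    close(p[0]);
    close(p[1]);
    return reused;
}
```

Since numbers are recycled in this way, a process holding at most N descriptors at a time sees fd values below roughly N, so a fixed-size table is sufficient.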

1. Accepting connection requests

Once a connection is accepted, a client object is created for it and its fd is registered with epoll for the EPOLLIN event (which corresponds to AE_READABLE in ae).

Corresponding pseudocode:

    int main() {
      // "ae" is short for "A simple event-driven"
      void aeMain() {
        while (!eventLoop->stop) {
          // Responses are sent from beforesleep first; whatever is left
          // unsent is triggered later through aeApiPoll.
          if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);

          // aeProcessEvents handles various events, including:
          // 1) accepting connection requests and creating a client for each connection.
          // It also calls processTimeEvents.
          int aeProcessEvents() {
            // aeApiPoll wraps epoll_wait, select, and so on, depending on the backend.
            aeApiPoll();

            void acceptTcpHandler(int fd) {
              int cfd = anetTcpAccept(fd);  // wraps accept(fd)
              acceptCommonHandler(cfd) {
                // createClient adds c to server.clients.
                // server.clients is a linked list.
                client *c = createClient(cfd) {
                  aeCreateFileEvent(server.el, fd, AE_READABLE, readQueryFromClient, c) {
                    // The mask is AE_READABLE (EPOLLIN under epoll);
                    // under epoll, epoll_ctl is what actually gets called.
                    aeApiAddEvent(eventLoop, fd, mask);
                  }
                }
              }
            }
          }
        }
      }
    }
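The registration step at the bottom of the pseudocode amounts to translating a library-level mask into poller flags. A minimal sketch under assumptions: the MASK_* constants and function names are hypothetical, POLLIN and POLLOUT from poll.h stand in for EPOLLIN and EPOLLOUT so that the code also compiles outside Linux, and the add-versus-modify rule is how such a backend would typically decide which control operation to issue.

```c
#include <assert.h>
#include <poll.h>

/* Hypothetical mask bits mirroring the readable/writable idea in the text. */
#define MASK_NONE     0
#define MASK_READABLE 1
#define MASK_WRITABLE 2

enum ctl_op { OP_ADD, OP_MOD };

/* Translate the library-level mask into poller flags.
 * POLLIN/POLLOUT are portable stand-ins for EPOLLIN/EPOLLOUT. */
short to_poller_events(int mask) {
    short events = 0;
    assert((mask & ~(MASK_READABLE | MASK_WRITABLE)) == 0);
    if (mask & MASK_READABLE) events |= POLLIN;
    if (mask & MASK_WRITABLE) events |= POLLOUT;
    return events;
}

/* An fd with no registered events must be added; otherwise it is modified. */
enum ctl_op choose_op(int current_mask) {
    return current_mask == MASK_NONE ? OP_ADD : OP_MOD;
}
```

A freshly accepted connection has no events yet, so it takes the OP_ADD path with the readable flag; adding a writable event later for the same fd would take the OP_MOD path.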

2. Receiving request data and processing requests

This flow calls the handler function for each command, such as setCommand for SET and getCommand for GET. The command handlers modify the in-memory data.
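Dispatching a command name to its handler can be sketched with a table of function pointers. This is an illustration only: the struct layouts, the two toy handlers, and the linear scan are assumptions made for brevity, and the handlers merely record which one ran.

```c
#include <assert.h>
#include <stddef.h>
#include <strings.h>   /* strcasecmp (POSIX) */

struct client { int last_handler; };   /* hypothetical, minimal client */

typedef void command_proc(struct client *c);

struct command {
    const char *name;
    command_proc *proc;   /* the handler to run for this command */
};

/* Toy handlers: they only record which one was called. */
static void set_command(struct client *c) { c->last_handler = 1; }
static void get_command(struct client *c) { c->last_handler = 2; }

static const struct command command_table[] = {
    { "set", set_command },
    { "get", get_command },
};

/* Find a command by name, ignoring case. Returns NULL if unknown. */
const struct command *lookup_command(const char *name) {
    assert(name != NULL);
    size_t n = sizeof command_table / sizeof command_table[0];
    for (size_t i = 0; i < n; i++) {
        if (strcasecmp(name, command_table[i].name) == 0)
            return &command_table[i];
    }
    return NULL;
}

/* Look the command up and invoke its handler. Returns -1 if unknown. */
int process_command(struct client *c, const char *name) {
    const struct command *cmd = lookup_command(name);
    if (cmd == NULL) return -1;
    cmd->proc(c);
    return 0;
}
```

Adding a command then means adding one table entry and one handler function; the dispatch code itself does not change.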

The result is written to the response buffer but is not sent to the client immediately. If AOF is enabled, the write is also recorded in the AOF buffer. Write commands are written to the replication backlog buffer if it is enabled or required, and to the slaves' buffers if required.

The response itself is sent in a separate flow; this flow never sends the response to the client directly.

Corresponding pseudocode:
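Buffering a reply without sending it can be sketched as below. All names and sizes are hypothetical: a fixed buffer per client and a small array of pending clients stand in for the response buffer and for the list of clients waiting to be written.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define REPLY_BUF_SIZE 1024
#define MAX_PENDING    16

struct client {
    char buf[REPLY_BUF_SIZE];   /* reply bytes not yet sent */
    size_t bufpos;              /* number of bytes in buf */
    int pending_write;          /* already queued for the response flow? */
};

struct server {
    struct client *pending[MAX_PENDING];   /* clients with unsent replies */
    size_t npending;
};

/* Append reply bytes to the client's buffer; nothing is written to a socket.
 * Returns 0 on success, -1 if the buffer or the pending queue is full. */
int add_reply(struct server *srv, struct client *c, const char *s, size_t len) {
    assert(srv != NULL && c != NULL && s != NULL);
    if (len > sizeof c->buf - c->bufpos) return -1;
    if (!c->pending_write) {
        if (srv->npending == MAX_PENDING) return -1;
        srv->pending[srv->npending++] = c;   /* queue the client only once */
        c->pending_write = 1;
    }
    memcpy(c->buf + c->bufpos, s, len);
    c->bufpos += len;
    return 0;
}
```

Several add_reply calls for one command leave a single queue entry for the client, and the bytes stay in memory until the response flow picks them up.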

    // Sending the response is not included here; responding and
    // receiving/processing are separate flows.
    int main() // server.c:4003
    {
      void aeMain() // ae.c:496
      {
        while (!eventLoop->stop) {
          // The response is sent in beforesleep first. If it is not finished
          // there (for example, because the response is too large), aeApiPoll
          // triggers the subsequent sends.
          if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);

          int aeProcessEvents() // ae.c:358
          {
            aeApiPoll();

            // readQueryFromClient is a callback function,
            // registered when the client is created:
            // client *createClient(int fd) {
            //   aeCreateFileEvent(
            //     server.el, fd,
            //     AE_READABLE,
            //     readQueryFromClient, c);
            // }
            void readQueryFromClient() // networking.c:1494
            {
              // The size of the data sent by a client cannot exceed the value of
              // the configuration item client-query-buffer-limit.
              // The default is 1 GB, enough to cover most scenarios.
              // If the limit is exceeded, a WARNING log entry appears:
              // Closing client that reached max query buffer length
              // In practice requests are usually far smaller than 1 GB, so the
              // value can be lowered to better protect Redis.
              int nread = read(fd, c->querybuf, readlen);

              int processCommand(client*) // server.c:2543
              {
                redisCommand* lookupCommand(name)
                {
                  // struct redisServer {
                  //   dict *commands; // Command table
                  // };
                  // redisCommand can be thought of as a C++ abstract base class
                  // that declares the pure virtual function proc:
                  // typedef void redisCommandProc(client *c);
                  // struct redisCommand {
                  //   redisCommandProc *proc;
                  // };
                  // Each member of the command table is an implementation of redisCommand.
                  return dictFetchValue(commands, name);
                }

                void call(client*, flags) // server.c:2414
                {
                  // For the SET command, setCommand in t_string.c is called;
                  // for the DEL command, delCommand in db.c is called.
                  redisCommand::proc(client*);

                  void propagate(redisCommand*) // server.c:2315
                  {
                    // Write data to the AOF buffer
                    feedAppendOnlyFile();

                    void replicationFeedSlaves(slaves) // replication.c:173
                    {
                      // Write data to the replication backlog buffer.
                      // The backlog buffer is a circular buffer: when it is full,
                      // it is overwritten from the beginning.
                      // Its size is determined by the repl-backlog-size
                      // configuration item.
                      feedReplicationBacklog(); // replication.c:126
                    }
                  }
                }
              }
            }
          }
        }
      }
    }

    The call stack below shows where the client is added to server.clients_pending_write
    (server is the global redisServer object that holds the server's global state):

    #0 listAddNodeHead (list=0x7fe88bc0f210, value=0x7fe88bc64ec0) at adlist.c:92
    // Not every command needs a write handler,
    // so clientInstallWriteHandler is not called for all of them.
    #1 in clientInstallWriteHandler (c=0x7fe88bc64ec0) at networking.c:185
    #2 in prepareClientToWrite (c=0x7fe88bc64ec0) at networking.c:228
    #3 in addReplyString (c=0x7fe88bc64ec0, s=0x7ffdfc2e70c0 "$855\r\n", len=6) at networking.c:338
    #4 in addReplyLongLongWithPrefix (c=0x7fe88bc64ec0, ll=855, prefix=36 '$') at networking.c:515
    #5 in addReplyBulkLen (c=0x7fe88bc64ec0, obj=0x7fe889312840) at networking.c:557
    #6 in addReplyBulk (c=0x7fe88bc64ec0, obj=0x7fe889312840) at networking.c:562
    #7 in getGenericCommand (c=0x7fe88bc64ec0) at t_string.c:167
    #8 in getCommand (c=0x7fe88bc64ec0) at t_string.c:173
    #9 in call (c=0x7fe88bc64ec0, flags=15) at server.c:2437
    #10 in processCommand (c=0x7fe88bc64ec0) at server.c:2729
    #11 in processInputBuffer (c=0x7fe88bc64ec0) at networking.c:1451
    #12 in processInputBufferAndReplicate (c=0x7fe88bc64ec0) at networking.c:1486
    #13 in readQueryFromClient (el=0x7fe88bc30050, fd=8, privdata=0x7fe88bc64ec0, mask=1) at networking.c:1568
    #14 in aeProcessEvents (eventLoop=0x7fe88bc30050, flags=11) at ae.c:443
    #15 in aeMain (eventLoop=0x7fe88bc30050) at ae.c:501
    #16 in main (argc=2, argv=0x7ffdfc2e75b8) at server.c:4197
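The pseudocode comments describe the replication backlog as a circular buffer that is overwritten from the beginning once full. A minimal sketch of that idea, assuming caller-provided storage; the struct and function names are hypothetical, and the real backlog keeps additional bookkeeping such as replication offsets.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

struct backlog {
    char *buf;        /* caller-provided storage */
    size_t size;      /* capacity in bytes */
    size_t idx;       /* next write position */
    size_t histlen;   /* valid bytes currently held, at most size */
};

void backlog_init(struct backlog *b, char *storage, size_t size) {
    assert(b != NULL && storage != NULL && size > 0);
    b->buf = storage;
    b->size = size;
    b->idx = 0;
    b->histlen = 0;
}

/* Append len bytes, wrapping around and overwriting the oldest data. */
void backlog_feed(struct backlog *b, const char *p, size_t len) {
    while (len > 0) {
        size_t room = b->size - b->idx;        /* bytes until the end of buf */
        size_t n = len < room ? len : room;
        memcpy(b->buf + b->idx, p, n);
        b->idx += n;
        if (b->idx == b->size) b->idx = 0;     /* wrap to the beginning */
        p += n;
        len -= n;
        b->histlen += n;
        if (b->histlen > b->size) b->histlen = b->size;
    }
}
```

With an 8-byte buffer, feeding "abcdef" and then "ghij" leaves "ijcdefgh" in storage: the two oldest bytes have been overwritten and the write position is back at index 2.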

3. Responding to requests

For every command that produces a reply, sending is first attempted in beforeSleep. If the reply cannot be sent completely in one pass, the remainder is handed to sendReplyToClient for asynchronous sending (under epoll, by registering an EPOLLOUT event).

Corresponding pseudocode:

    // Responding and receiving/processing are separate flows.
    int main() {
      // "ae" is short for "A simple event-driven"
      void aeMain() {
        while (!eventLoop->stop) {
          // This actually calls beforeSleep in server.c:
          eventLoop->beforesleep(eventLoop);

          void beforeSleep(struct aeEventLoop *) {
            int handleClientsWithPendingWrites() {
              // The flow that receives and processes commands fills
              // clients_pending_write, which is a list used as a queue.
              // After a command has been processed, clientInstallWriteHandler
              // adds the current client to clients_pending_write.
              // Some commands do not need a response, so for them this step
              // does not happen.
              listRewind(server.clients_pending_write, &li);
              while ((ln = listNext(&li))) {
                int writeToClient(int fd, client *c) {
                  write(fd, c->buf);

                  // Once nothing is left to send, aeDeleteFileEvent is called
                  // to remove the fd's write event from epoll.
                  if (!clientHasPendingReplies(c)) {
                    // Delete EPOLLOUT from epoll
                    aeDeleteFileEvent(server.el, c->fd, AE_WRITABLE);
                  }
                }

                // If one writeToClient call did not send the whole reply:
                if (clientHasPendingReplies(c)) {
                  // Set EPOLLOUT
                  int ae_flags = AE_WRITABLE;

                  // Add EPOLLOUT to epoll
                  aeCreateFileEvent(server.el, c->fd, ae_flags, sendReplyToClient, c);
                }
              }
            }
          }

          // The flow that receives and processes commands
          aeProcessEvents();
        }
      }
    }
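The "send what you can now, finish later" pattern in the pseudocode can be sketched with a non-blocking descriptor. The struct and function names are hypothetical. The return value 1 marks the point where a real event loop would register a writable event and resume when the descriptor can accept more data.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stddef.h>
#include <unistd.h>

struct reply {
    const char *data;
    size_t len;
    size_t sentlen;   /* bytes already written */
};

/* Put fd into non-blocking mode. Returns 0 on success, -1 on error. */
int set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0) return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0 ? -1 : 0;
}

/* Write as much of the reply as fd accepts right now.
 * Returns 0 when everything is sent, 1 when bytes remain and the caller
 * should wait for writability, -1 on any other error. */
int write_reply(int fd, struct reply *r) {
    assert(r != NULL && r->sentlen <= r->len);
    while (r->sentlen < r->len) {
        ssize_t n = write(fd, r->data + r->sentlen, r->len - r->sentlen);
        if (n > 0) { r->sentlen += (size_t)n; continue; }
        if (n < 0 && errno == EINTR) continue;                      /* retry */
        if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) return 1;
        return -1;
    }
    return 0;
}
```

Because sentlen is kept in the reply itself, the later call made from a writable-event callback resumes exactly where the first attempt stopped.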