Events

FileEvent (aeFileEvent)

The events generated by socket multiplexing can be divided into read events and write events:

  • Read events:
    • When a client has connected to the server but has not yet sent a command, the client’s read event is in the waiting state
    • When a client sends a command request and the request arrives (the socket becomes readable without blocking), the client’s read event is in the ready state
  • Write events:
    • When the server has command results to return but the client is not yet ready for a non-blocking write, the write event is in the waiting state
    • When the server has command results to return and the client can be written without blocking, the write event is in the ready state
    • After the command result has been delivered, the association between the client and the write event is removed

When read and write events occur on the same socket at the same time, the read event is processed first

TimeEvent (aeTimeEvent)

Time events record work that is to be run at a given point in time; multiple time events are kept in a linked list in the server state
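
For reference, a trimmed sketch of the list node from ae.h (matching the when_sec/when_ms fields used by aeProcessEvents later; newer Redis versions merge these into a single when field):

typedef struct aeTimeEvent {
    long long id;                        // time event identifier
    long when_sec;                       // seconds part of the fire time
    long when_ms;                        // milliseconds part of the fire time
    aeTimeProc *timeProc;                // handler, e.g. serverCron
    aeEventFinalizerProc *finalizerProc; // called when the event is deleted
    void *clientData;
    struct aeTimeEvent *prev;            // linked-list pointers
    struct aeTimeEvent *next;
} aeTimeEvent;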

In normal mode Redis uses only one time event, serverCron; in benchmark mode it uses only two time events

The periodic housekeeping is implemented by serverCron in server.c, which performs the following operations (a sketch of its recurring-timer shape follows the list):

  • Update server statistics, such as time, memory usage, and database usage.
  • Clean up stale key-value pairs in the database.
  • Close and clean up clients with failed connections.
  • Attempt an AOF or RDB persistence operation.
  • If the server is a master, periodically synchronize its replicas.
  • If in cluster mode, perform periodic synchronization and connection tests on the cluster.
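
A time event’s handler returns the number of milliseconds until it should fire again (or AE_NOMORE to delete itself), which is how serverCron keeps rescheduling itself; an abridged sketch:

// server.c (abridged sketch)
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    // ... update statistics, expire keys, clean up clients,
    // trigger persistence, replication and cluster cron jobs ...

    // Reschedule: with the default hz = 10 this runs every 100 ms
    return 1000/server.hz;
}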

The main flow

Initialization Process

  • Create epollfd
  • Create the aeEventLoop object
  • Create listenfd and register it with epollfd

Create an event loop object

main -> initServer -> aeCreateEventLoop 

// server.c
void initServer(void) {
    // Initialize the default properties
    // ...

    // Create the event loop object
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);

    // Register the time event handler that runs background tasks
    aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);

    // Register the file event handler: mount the listening fd on epollfd to
    // handle TCP connections (Unix socket connections are handled analogously);
    // in the source this call sits in a loop over all listening fds
    aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler, NULL);
}

To leave headroom for fds used internally (listening sockets, log files, persistence, and so on), CONFIG_FDSET_INCR = CONFIG_MIN_RESERVED_FDS (32) + 96 = 128 extra slots are reserved on top of maxclients

aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;

	// Initialize the event loop property
    eventLoop = zmalloc(sizeof(*eventLoop));
    
    // store all listener events
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    // Store triggered events
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    
    // The eventLoop parameter is initialized
    // ...
    
    // Create epollfd and attach it to the event loop
    if (aeApiCreate(eventLoop) == -1) goto err;
    
    // ...
}

The aeEventLoop object

typedef struct aeEventLoop {
    int maxfd;   // The largest fd registered
    int setsize; // Capacity: the maximum number of fds that can be tracked
    long long timeEventNextId;
    aeFileEvent *events; // Registered events
    aeFiredEvent *fired; // The event that is triggered
    aeTimeEvent *timeEventHead;  // Time event header
    // ...
} aeEventLoop;

Create epollfd

main -> initServer -> aeCreateEventLoop -> aeApiCreate

typedef struct aeApiState {
    int epfd;
    struct epoll_event *events;
} aeApiState;

static int aeApiCreate(aeEventLoop *eventLoop) {
    // Allocate memory
    aeApiState *state = zmalloc(sizeof(aeApiState));
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    
    // Create an epoll instance (the 1024 size hint is ignored by modern
    // kernels, but must be greater than zero)
    state->epfd = epoll_create(1024);
    
    // Bind ePollfd to the Event loop object
    eventLoop->apidata = state;
    return 0;
}

The epoll fd can then be reached through the event loop’s apidata field, which holds the aeApiState
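
Since apidata is a void pointer, the epoll-specific state is recovered with a plain assignment (void * converts implicitly in C):

aeApiState *state = eventLoop->apidata;
int epfd = state->epfd;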

Create listenfd

Initialize listenfd and bind the IP address and port

Create a socket

static int anetCreateSocket(char *err, int domain) {
    int s;
    
    // domain selects the protocol family (AF_INET / AF_INET6); SOCK_STREAM means a TCP socket
    s = socket(domain, SOCK_STREAM, 0);
        
    // ...
    return s;
}

The call chain is main -> initServer -> listenToPort -> anetTcpServer -> _anetTcpServer -> anetListen. listenToPort loops over all configured bind addresses, _anetTcpServer creates a new socket for each of them, and anetListen finally performs the address binding and listening (a trimmed sketch of the loop follows).
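
A trimmed sketch of that loop, assuming the Redis 6.x shape of listenToPort (error handling omitted):

// server.c (abridged sketch)
int listenToPort(int port, int *fds, int *count) {
    // One listening socket is created per configured bind address;
    // with no bind directive (bindaddr_count == 0) listen on all interfaces
    for (int j = 0; j < server.bindaddr_count || j == 0; j++) {
        if (server.bindaddr[j] == NULL) {
            // Wildcard bind: try IPv6 (and, in the source, IPv4 as well)
            fds[*count] = anetTcp6Server(server.neterr, port, NULL, server.tcp_backlog);
        } else if (strchr(server.bindaddr[j], ':')) {
            // An address containing ':' is parsed as IPv6
            fds[*count] = anetTcp6Server(server.neterr, port, server.bindaddr[j], server.tcp_backlog);
        } else {
            fds[*count] = anetTcpServer(server.neterr, port, server.bindaddr[j], server.tcp_backlog);
        }
        (*count)++;
    }
    return C_OK;
}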

// anet.c
static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len, int backlog) {
    // Bind the socket to the specified address (error handling elided)
    bind(s, sa, len);
    // Start listening on the socket
    listen(s, backlog);
    return ANET_OK;
}

Create file events that handle client connections

Add the fd of the socket that listens for client connections to the eventLoop’s epollfd watch list and set the callback function to acceptTcpHandler

void initServer(void) {
    // ...
    aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL);
    // ...
}

aeCreateFileEvent mounts listenfd and binds the event’s callback function

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{

    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    
    fe->mask |= mask;
    
    // Bind readable event handlers
    if (mask & AE_READABLE) fe->rfileProc = proc;
    // Bind writable event handlers
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    
    return AE_OK;
}

aeApiAddEvent mounts listenfd on epoll through the epoll_ctl system call

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0};
    
    // Use EPOLL_CTL_MOD if other events on this fd are already being watched, otherwise EPOLL_CTL_ADD
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    // Merge already bound events
    mask |= eventLoop->events[fd].mask;
    // Set the listening event
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    // Add the listening fd to the EPFD
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

epoll_ctl parameters (a minimal standalone usage sketch follows the list):

  • epfd: the epoll fd object
  • op: the operation type
    • EPOLL_CTL_ADD: register a new fd with epollfd
    • EPOLL_CTL_MOD: modify an already registered fd
    • EPOLL_CTL_DEL: remove an fd from epfd
  • event: its events field is a bitmask of macros:
    • EPOLLIN: readable (including a normal close by the peer socket)
    • EPOLLOUT: writable
    • EPOLLPRI: urgent data to read (out-of-band data has arrived)
    • EPOLLERR: an error occurred
    • EPOLLHUP: the connection was hung up
    • EPOLLET: edge-triggered mode, as opposed to the default level-triggered mode
    • EPOLLONESHOT: watch for a single event only; to keep watching the socket afterwards, it must be added to the epoll queue again
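
Outside of Redis, the same registration is just a few lines; a minimal self-contained sketch:

#include <stdio.h>
#include <sys/epoll.h>

// Watch listen_fd for readable events (level-triggered, as Redis uses)
int watch_fd(int epfd, int listen_fd) {
    struct epoll_event ee = {0};
    ee.events = EPOLLIN;
    ee.data.fd = listen_fd;   // echoed back by epoll_wait to identify the fd
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ee) == -1) {
        perror("epoll_ctl");
        return -1;
    }
    return 0;
}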

This file event will be triggered in the aeMain loop when a subsequent client requests a connection

Loop processing events

  • Process new client connections
  • Process client requests

In the main loop, events are handled by calling aeProcessEvents through the chain main -> aeMain -> aeProcessEvents

A single event-processing pass

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
	// ... 

    // Compute the I/O multiplexing wait time tvp
    if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
        // Walk eventLoop->timeEventHead to find the time event that fires first
        shortest = aeSearchNearestTimer(eventLoop);
    if (shortest) {
        // Calculate how long until the nearest time event
        long now_sec, now_ms;
        aeGetTime(&now_sec, &now_ms);
        tvp = &tv;
        long long ms = (shortest->when_sec - now_sec)*1000 + shortest->when_ms - now_ms;

        // Write the remaining time into tvp, or 0 if the event is already due
        if (ms > 0) {
            tvp->tv_sec = ms/1000;
            tvp->tv_usec = (ms % 1000)*1000;
        } else {
            tvp->tv_sec = 0;
            tvp->tv_usec = 0;
        }
    }
    // ...

    // Call epoll_wait
    numevents = aeApiPoll(eventLoop, tvp);
    for (j = 0; j < numevents; j++) {
        // Traverse eventLoop->fired
        aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
        int mask = eventLoop->fired[j].mask;
        int fd = eventLoop->fired[j].fd;
        // Number of events already fired for the current fd
        int fired = 0;

        // ...

        // Handle readable events
        fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
        if ((fe->mask & mask & AE_READABLE) &&
            (!fired || fe->wfileProc != fe->rfileProc))
        {
            fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            fired++;
        }

        // Handle writable events
        if (fe->mask & mask & AE_WRITABLE) {
            if (!fired || fe->wfileProc != fe->rfileProc) {
                fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }
        }
    }

    // Handle time events
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

On Linux, aeApiPoll actually invokes epoll_wait, whose timeout is given by the tvp parameter. If tvp is set to 0 because a time event needs to run immediately, epoll_wait returns right away; otherwise epoll_wait waits at most the time indicated by tvp

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;
	
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    
    if (retval > 0) {
        int j;

        numevents = retval;
        // Add the triggered event to the eventLoop->fired array
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

epoll_wait parameters (a standalone wait-loop sketch follows the list):

  • epfd: the epoll fd object
  • events: the array the kernel fills with the ready events
  • maxevents: the maximum number of events to return in this call
  • timeout: the timeout in milliseconds; 0 returns immediately and -1 blocks indefinitely
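
A minimal standalone wait loop over the same API:

#include <sys/epoll.h>

#define MAX_EVENTS 64

// Block for up to 100 ms, then drain whatever fds are ready
void wait_once(int epfd) {
    struct epoll_event events[MAX_EVENTS];
    int n = epoll_wait(epfd, events, MAX_EVENTS, 100);
    for (int i = 0; i < n; i++) {
        if (events[i].events & EPOLLIN) {
            // events[i].data.fd is readable; dispatch to its read handler
        }
        if (events[i].events & EPOLLOUT) {
            // events[i].data.fd is writable; dispatch to its write handler
        }
    }
}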

Since time events must wait for file-event processing to finish, a time event may not run exactly at its scheduled time.

Process new client connections

Receive client connection requests

When a client initiates a connection, the file event bound above fires and the registered callback acceptTcpHandler is invoked. To prevent a single pass from taking too long, acceptTcpHandler accepts at most 1,000 connection requests per call. Through the call chain acceptTcpHandler -> anetTcpAccept -> anetGenericAccept, anetGenericAccept is finally invoked.

static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {
    int fd;
    while(1) {
        fd = accept(s,sa,len);
        if (fd == -1) {
            // EINTR means the accept system call was interrupted; ignore it and retry
            if (errno == EINTR)
                continue;
            else {
                anetSetError(err, "accept: %s", strerror(errno));
                return ANET_ERR;
            }
        }
        break;
    }
    // Return the clientfd corresponding to the newly connected client
    return fd;
}

Get the client fd with the accept system call

accept parameters:

  • s: the listening socket fd
  • sa: filled in by the accept call with the peer’s address
  • len: on return, holds the size of the address written to sa

Returns the fd corresponding to the new client

Mount clientfd on epollfd

Receive client connections and perform initialization operations

static void acceptCommonHandler(connection *conn, int flags, char *ip) {
    // conn is a connection object created for the new client connection
	
    // Controls the simultaneous connection of client objects
    // Number of current client connections = number of connections to the current server + Number of connections used for cluster mode communication
    if (listLength(server.clients) + getClusterConnectionsCount()
        >= server.maxclients)
    {
        // Close the client connection
        // ...
    }
    
    // Initializes the client connection object, which will be used to store the client's commands, return results, and other parameters
    c = createClient(conn);
        
    // Perform the operation to receive data from the client
    connAccept(conn, clientAcceptHandler);
}

Initialize the client connection and mount the new client’s clientfd on epollfd

client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));

    if (conn) {
        // Set connection parameters
        // Set clientfd to non-blocking
        connNonBlock(conn);
        // Disable Nagle's algorithm
        connEnableTcpNoDelay(conn);
        if (server.tcpkeepalive)
            connKeepAlive(conn,server.tcpkeepalive);
        // Mount clientfd to ePollfd and set the callback function to readQueryFromClient
        connSetReadHandler(conn, readQueryFromClient);
		// ...
    }
    // ...
}
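
Under the hood, connSetReadHandler for a plain TCP connection registers the fd with the event loop; a rough sketch of the socket implementation, assuming the Redis 6.x shape of connection.c:

static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    if (func == conn->read_handler) return C_OK;

    conn->read_handler = func;
    if (!conn->read_handler)
        // Clearing the handler unregisters the readable event
        aeDeleteFileEvent(server.el, conn->fd, AE_READABLE);
    else
        // Register the fd; the generic ae_handler later dispatches to read_handler
        if (aeCreateFileEvent(server.el, conn->fd,
                AE_READABLE, conn->type->ae_handler, conn) == AE_ERR) return C_ERR;
    return C_OK;
}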

Handle client-readable events

When a client-readable event fires, the registered callback invokes readQueryFromClient

void readQueryFromClient(connection *conn) {
    // Read client data
    client *c = connGetPrivateData(conn);
    int nread, readlen;

    // Execute in multithreaded mode
    if (postponeClientRead(c)) return;
    
    // Single thread mode

    // Initialize the buffer parameters
    // ...
    
    // Read data from clientfd, write client instructions to buffer, qblen is queryBuf's existing data, readlen is the maximum length of each read
    nread = connRead(c->conn, c->querybuf+qblen, readlen);
    // Process buffer data
    processInputBuffer(c);
}

connRead is a function-pointer dispatch that actually calls connSocketRead, which reads the data with the read(int fd, void *buf, size_t nbyte) system call. After reading the client command, readQueryFromClient calls processInputBuffer to execute the function corresponding to the client’s command; when the command finishes, the result is written back to the client object’s reply fields.
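
A trimmed sketch of that wrapper, assuming the Redis 6.x shape of connection.c:

// connection.c (abridged sketch) -- a thin wrapper over read(2)
static int connSocketRead(connection *conn, void *buf, size_t buf_len) {
    int ret = read(conn->fd, buf, buf_len);
    if (!ret) {
        conn->state = CONN_STATE_CLOSED;   // peer closed the connection
    } else if (ret < 0 && errno != EAGAIN) {
        conn->last_errno = errno;          // real error; EAGAIN just means retry later
        conn->state = CONN_STATE_ERROR;
    }
    return ret;
}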

Multithreading

postponeClientRead checks whether the conditions for multithreaded I/O are met. If they are, it returns 1 and readQueryFromClient returns early; otherwise the code that follows runs and the client request is handled in single-threaded mode.

int postponeClientRead(client *c) {
    // When multithreaded I/O is enabled and the main thread is not processing
    // a blocking task, add the client to the asynchronous queue.
    if (server.io_threads_active &&       // multithreaded I/O is enabled (it is stopped when few requests are pending)
        server.io_threads_do_reads &&     // multithreaded reads are configured
        !ProcessingEventsWhileBlocked &&  // not blocked on a task such as loading data
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
        // Master/replica replication links do not use multithreaded I/O; the
        // CLIENT_PENDING_READ flag ensures the client is not queued twice and
        // that the next pass goes straight to command reading and parsing
    {
        // Mark the client CLIENT_PENDING_READ: it is to be handled by the I/O threads.
        // After an I/O thread reads and parses the command, it sees this flag and
        // skips execution, leaving the command execution to the main thread.
        c->flags |= CLIENT_PENDING_READ;

        // Put the client into the LIFO queue clients_pending_read
        listAddNodeHead(server.clients_pending_read,c);
        return 1;
    } else {
        return 0;
    }
}

Clients added to the clients_pending_read queue are later taken off the queue and processed by handleClientsWithPendingReadsUsingThreads

int handleClientsWithPendingReadsUsingThreads(void) {
    if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;

    if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);

    // Create a list iterator positioned at the head of the pending-clients list
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    
    // Assign the clients waiting to be processed to the worker threads in turn
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        // io_threads_list is a queue list of clients to be processed per thread
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    // Sets the global variable from which the worker thread reads and writes
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    // The main thread handles clients in io_threads_list[0]
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    // Clear the io_threads_list queue after completing the task
    listEmpty(io_threads_list[0]);

    // The main thread waits for all other worker threads to finish processing
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O READ All threads finshed\n");

	// ...
}

I/O threads are initialized by the main thread in initThreadedIO; the number of threads is controlled by the io_threads_num startup parameter. Each worker thread takes the client instances to be processed from its own task queue io_threads_list[n] and calls readQueryFromClient, doing the same command-reading work as the single-threaded path (a trimmed sketch of the setup follows)
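
A trimmed sketch of that setup, assuming the Redis 6.x shape of networking.c (locking details omitted):

void initThreadedIO(void) {
    server.io_threads_active = 0;             // threads start out inactive
    if (server.io_threads_num == 1) return;   // single-threaded: main thread does all I/O

    for (int i = 0; i < server.io_threads_num; i++) {
        io_threads_list[i] = listCreate();    // per-thread task queue
        if (i == 0) continue;                 // slot 0 is the main thread itself

        // Spawn a worker; IOThreadMain spins on io_threads_pending[i] and,
        // when work is assigned, drains io_threads_list[i] doing reads or writes
        pthread_t tid;
        io_threads_pending[i] = 0;
        if (pthread_create(&tid, NULL, IOThreadMain, (void*)(long)i) != 0) {
            serverLog(LL_WARNING, "Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}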

Write the result back to the client

After processCommand (reached from readQueryFromClient) has executed the command, addReply is called to write the result into the client’s buffer

void addReply(client *c, robj *obj) {
    // Check that the client can accept new reply data and, if necessary,
    // register the client for a later write (clients_pending_write)
    if (prepareClientToWrite(c) != C_OK) return;
    
    // ...
    _addReplyToBuffer(c,buf,len);
    // ...
}

_addReplyToBuffer writes the result into the client object’s buf field; when the fixed buffer is full, the reply is appended to the client’s reply linked list instead

int _addReplyToBuffer(client *c, const char *s, size_t len) {
    size_t available = sizeof(c->buf)-c->bufpos;
    
    // ...

    // Perform a memory copy
    memcpy(c->buf+c->bufpos,s,len);
    c->bufpos+=len;
    return C_OK;
}


After the result lands in the client object’s buf, the main loop sends the buffered data through the invocation chain aeMain -> beforeSleep -> handleClientsWithPendingWritesUsingThreads

int handleClientsWithPendingWritesUsingThreads(void) {

    // If multithreaded mode is not enabled, jump to single thread processing
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }
        
    // Write the client object that has data to send to an iterator
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    
    int item_id = 0;
    // Assign the clients to the task queue of each thread in turn
    while((ln = listNext(&li))) {
		// ...
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    // Set the global variable that other worker threads use to perform read and write tasks
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    // The main thread handles clients in the io_threads_list[0] queue
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);

    // The main thread waits for all worker threads to finish their tasks
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }

    // For each client that still has pending replies, register the sendReplyToClient write callback
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        connSetWriteHandler(c->conn, sendReplyToClient);
    }
	// ...
}

writeToClient sends the result by reaching the write system call through the chain writeToClient -> connWrite -> write
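
The socket-level wrapper mirrors the read side; a trimmed sketch, assuming the Redis 6.x shape of connection.c:

// connection.c (abridged sketch) -- a thin wrapper over write(2)
static int connSocketWrite(connection *conn, const void *data, size_t data_len) {
    int ret = write(conn->fd, data, data_len);
    if (ret < 0 && errno != EAGAIN) {
        conn->last_errno = errno;      // real error; EAGAIN just means the socket buffer is full
        conn->state = CONN_STATE_ERROR;
    }
    return ret;
}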
