
Preface

In the previous article we walked through the startup process of the Redis server, which can be divided into: parameter and configuration initialization -> binding and listening on the service sockets -> starting the I/O threads -> event polling. Following the startup path also made us familiar with the overall architecture and code style of Redis, which makes the rest of the source easier to read. One important piece was left over from last time: the Redis event mechanism. One of the reasons for Redis's high performance is I/O multiplexing, and version 6.0 adds multithreaded I/O on top of it. So how does Redis combine these two weapons to improve performance? Let's explore and learn 🙂

Redis service startup process review: juejin.cn/post/698387…

Event structure definition

The Redis event source code lives in the ./src/ae.* files. ae.h defines the event-related structures and the event API. Redis events are divided into file events and time events: file events mainly handle network read and write requests, while time events handle timer tasks, delayed tasks, and so on.

  • File event structure definition
/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;

mask: the event types registered for this fd, a combination of AE_READABLE, AE_WRITABLE and AE_BARRIER. AE_BARRIER affects the order in which the read and write handlers run. Normally the read handler runs before the write handler in the same loop iteration. With AE_BARRIER set the order is inverted, so the write handler runs first. This is useful when, for example, the beforesleep callback has just performed an fsync and the reply should go back to the client as quickly as possible.

rfileProc: the read event handler, for example acceptTcpHandler, which is registered through aeCreateFileEvent() in server.c.

wfileProc: the write event handler, for example redisAeWriteEvent().

clientData: private data that is passed back to the handlers when they are called.

  • Time event structure definition
/* Time event structure */
typedef struct aeTimeEvent {
    /* Globally unique event ID */
    long long id;
    /* Seconds part of the time at which the event fires */
    long when_sec;
    /* Milliseconds part of the time at which the event fires */
    long when_ms;
    /* Time event handler callback */
    aeTimeProc *timeProc;
    /* Finalizer called when the event is deleted, to release resources */
    aeEventFinalizerProc *finalizerProc;
    /* Event private data */
    void *clientData;
    /* Doubly linked list pointers */
    struct aeTimeEvent *prev;
    struct aeTimeEvent *next;
    /* Reference count, to prevent timer events from being
     * freed in recursive time event calls */
    int refcount;
} aeTimeEvent;

Redis keeps its time events in a doubly linked list. This is simple, but the list is not ordered, so finding the next event to fire means scanning the whole list. Event libraries more commonly store timers in an ordered structure such as a timing wheel or a min-heap.

  • Event polling interface definition
/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;   /* Highest file descriptor currently registered */
    int setsize; /* Maximum number of file descriptors tracked */
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; /* Registered file events */
    aeFiredEvent *fired; /* Fired events */
    aeTimeEvent *timeEventHead; /* Registered time events */
    int stop; /* Set to a non-zero value to stop the loop */
    void *apidata; /* Data specific to the polling API in use */
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
    int flags;
} aeEventLoop;
  • Polling API definitions
...
/* Create the event loop; called from the main thread at startup */
aeEventLoop *aeCreateEventLoop(int setsize);

/* Create a file event */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData);
/* Process pending events (one iteration of the event loop) */
int aeProcessEvents(aeEventLoop *eventLoop, int flags);
...

ae.h defines the API that every event backend must provide. Redis implements four multiplexing backends: epoll, evport, kqueue and select. ae.c includes a different file depending on the compile-time options:

#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

The include order also reflects the relative performance of the multiplexing APIs: the best-performing one available on the platform is chosen, with select as the fallback. The ranking is not absolute, but it is reasonable for most network workloads today.

That covers the basic structures and the event API that Redis defines for event handling. The multiplexed file event mechanism of Redis is built on top of them.

The event process

Epoll system call

epoll provides three system calls for monitoring many file descriptors at once:

#include <sys/epoll.h>
// Create an epoll instance
int epoll_create(int size);
// Register event listener
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// Wait for the epoll event
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);

// epoll_event has the following structure:
struct epoll_event {
    uint32_t     events;      /* Epoll events */
    epoll_data_t data;        /* User data variable */
};

typedef union epoll_data {
    void        *ptr;
    int          fd;
    uint32_t     u32;
    uint64_t     u64;
} epoll_data_t;
  1. First create an epoll instance by calling epoll_create(). Since Linux 2.6.8 the size argument is ignored, but it must still be greater than 0 for compatibility with older kernels. The call returns an epoll fd that is used in the subsequent operations.

  2. Call epoll_ctl() to register the events to monitor on the target fd, with the following parameters:

    • epfd: the fd returned by epoll_create()

    • op: the operation to perform. Available values are EPOLL_CTL_ADD (register fd with the epoll instance), EPOLL_CTL_MOD (change the events registered for fd), and EPOLL_CTL_DEL (remove fd from the epoll instance).

    • fd: the target file descriptor to monitor

    • event: the events to register, that is, which events the caller cares about for this file descriptor. The events field is a bit mask built from the following macros:

      Macro          Description
      EPOLLIN        The file descriptor is readable (including an orderly shutdown by the peer socket)
      EPOLLOUT       The file descriptor is writable
      EPOLLPRI       The file descriptor has urgent data to read (for example, out-of-band data has arrived)
      EPOLLERR       An error occurred on the file descriptor
      EPOLLHUP       The file descriptor was hung up
      EPOLLET        Use edge-triggered mode for this fd (the default is level-triggered). For example, libevent is level-triggered and nginx is edge-triggered
      EPOLLONESHOT   Report only one event for the fd. To keep monitoring the socket afterwards, it must be re-armed with epoll_ctl() and EPOLL_CTL_MOD
  3. Call epoll_wait() to wait for events. It returns the number of ready fds and fills the events array, which the caller then iterates over to handle each event. The parameters are:

    • epfd: the fd returned by epoll_create()
    • events: an array supplied by the caller, into which the kernel writes the ready events
    • maxevents: the maximum number of events to return in one call
    • timeout: how long to wait for an I/O event, in milliseconds. -1 blocks until an event occurs, and 0 returns immediately

Create an event

Returning to the Redis startup process, the main thread calls initServer(void), which creates the event-related objects:

  1. Create the event loop object and assign it to the el field of the server object.
  2. Create the serverCron time event, a very important time event in Redis. Its callback runs server.hz times per second and performs tasks such as: actively collecting expired keys (expiration is also handled lazily on lookup), updating statistics, incrementally rehashing the DB hash tables, triggering BGSAVE / AOF rewrite, and handling terminated child processes.
  3. Create the socket file events that accept new TCP and UNIX connections.
void initServer(void) {
    ...
    // Create the event loop object
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
    ...
    // Create the serverCron time event
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }
    ...
    // Create the file events for the listening sockets
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                serverPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }
    ...
}

aeCreateEventLoop() calls aeApiCreate(), which issues a different system call on each platform:

// aeApiCreate in ae_epoll.c
static int aeApiCreate(aeEventLoop *eventLoop) {
    ...
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    ...
}

// aeApiCreate in ae_kqueue.c
static int aeApiCreate(aeEventLoop *eventLoop) {
    ...
    state->kqfd = kqueue();
    ...
}

This completes the creation of the multiplexed event object.

Add event

aeCreateFileEvent is responsible for creating a file event. It calls aeApiAddEvent to register the fd and its event mask with the operating system's I/O multiplexing facility: epoll_ctl() on Linux and kevent() on macOS.

// Create file event
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData) {
    ...
    // Register the fd and the event types to monitor with the multiplexing backend
    aeApiAddEvent(eventLoop, fd, mask);
    // Set the callback functions for the file event
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    ...
}

// aeApiAddEvent in ae_epoll.c
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    ...
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    ...
}

// aeApiAddEvent in ae_kqueue.c
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    ...
    if (kevent(state->kqfd, &ke, 1, NULL, 0, NULL) == -1) return -1;
    ...
}

The file event callback registered in the main thread is acceptTcpHandler, which accepts new socket connections.

As you can see, Redis does very little extra work around the I/O multiplexing system calls. It wraps them in a thin, unified API, defines one common set of event masks, and maps those masks to the corresponding events of each operating system. At this point the events have been created and registered, and what remains is to wait for events to arrive on the registered file descriptors 🙂

The polling event

After the main thread has created the multiplexing object and added the events, it calls void aeMain(aeEventLoop *eventLoop) to poll for events:

// Loop until eventLoop->stop is set
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}

// aeProcessEvents implementation
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
    ...
    // Call the multiplexing API; it returns the number of fired events
    numevents = aeApiPoll(eventLoop, tvp);

    // Iterate over the fired events and call the read/write callbacks according to the event type
    for (j = 0; j < numevents; j++) {
        aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
        int mask = eventLoop->fired[j].mask;
        int fd = eventLoop->fired[j].fd;
        ...
        /* Writable events. */
        if (fe->mask & mask & AE_WRITABLE) {
            if (!fired || fe->wfileProc != fe->rfileProc) {
                fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }
        }
        ...
        /* Readable events. */
        if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc))
        {
            fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            fired++;
        }
    }
    ...
}

// aeApiPoll in ae_epoll.c
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    ...
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    ...
}

// aeApiPoll in ae_kqueue.c
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    ...
    retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize,
            &timeout);
    ...
}

aeApiPoll returns the list of fired events. aeProcessEvents then iterates over the fired fds and calls the registered callbacks, which perform the actual reads and writes on the sockets.

Close the event

Most articles that analyze the event mechanism only look at how events are raised and handled. A good engineer also pays attention to the details, including shutdown. Event polling is stopped by calling aeStop, after which aeDeleteEventLoop closes the multiplexing fd and releases the associated resources:

// Delete the event loop
void aeDeleteEventLoop(aeEventLoop *eventLoop) {
    aeApiFree(eventLoop);
    zfree(eventLoop->events);
    zfree(eventLoop->fired);

    /* Release time event list */
    aeTimeEvent *next_te, *te = eventLoop->timeEventHead;
    while (te) {
        next_te = te->next;
        zfree(te);
        te = next_te;
    }
    zfree(eventLoop);
}

// aeApiFree
static void aeApiFree(aeEventLoop *eventLoop) {
    aeApiState *state = eventLoop->apidata;
    close(state->epfd); // Close the epoll fd
    zfree(state->events); // Release event memory
    zfree(state);
}

Conclusion

  1. First, initServer calls aeCreateEventLoop to create the event loop object.
  2. Register the event callbacks, for example in the main thread:
    • Time event callback: aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL)
    • File event callback: aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL)
  3. Start event polling by calling aeMain, which processes ready events in an infinite loop.
  4. Stop event polling and release the related resources by calling aeDeleteEventLoop.

That completes our analysis of the Redis multiplexing mechanism. You may still have one big question in mind: how do I/O multiplexing and multithreading work together to boost performance? That is exactly what made me curious when I started reading the Redis 6.0 source, and we will explore it together in the next article. If you found this helpful, don't forget to like, bookmark and share, and give the author a little red heart for encouragement 🙂

Appendix: the official epoll demo

#define MAX_EVENTS 10
struct epoll_event  ev, events[MAX_EVENTS];
int         listen_sock, conn_sock, nfds, epollfd;


/* Code to set up listening socket, 'listen_sock',
 * (socket(), bind(), listen()) omitted */

epollfd = epoll_create1( 0 );
if ( epollfd == -1 )
{
    perror( "epoll_create1" );
    exit( EXIT_FAILURE );
}

ev.events   = EPOLLIN;
ev.data.fd  = listen_sock;
if ( epoll_ctl( epollfd, EPOLL_CTL_ADD, listen_sock, &ev ) == -1 )
{
    perror( "epoll_ctl: listen_sock" );
    exit( EXIT_FAILURE );
}

for (;;)
{
    nfds = epoll_wait( epollfd, events, MAX_EVENTS, -1 );
    if ( nfds == -1 )
    {
        perror( "epoll_wait" );
        exit( EXIT_FAILURE );
    }

    for ( n = 0; n < nfds; ++n )
    {
        if ( events[n].data.fd == listen_sock )
        {
            conn_sock = accept( listen_sock,
                        (struct sockaddr *) &local, &addrlen );
            if ( conn_sock == -1 )
            {
                perror( "accept" );
                exit( EXIT_FAILURE );
            }
            setnonblocking( conn_sock );
            ev.events   = EPOLLIN | EPOLLET;
            ev.data.fd  = conn_sock;
            if ( epoll_ctl( epollfd, EPOLL_CTL_ADD, conn_sock,
                    &ev ) == -1 )
            {
                perror( "epoll_ctl: conn_sock" );
                exit( EXIT_FAILURE );
            }
        }
        else
        {
            do_use_fd( events[n].data.fd );
        }
    }
}