
Preface

This article records my personal experience learning epoll network programming. It aims to:

  1. Learn how to use epoll
  2. Understand the Reactor model
  3. Understand various network models

I would be glad if it helps readers in these areas.

Prerequisites

We implemented the simplest possible TCP server earlier: juejin.cn/post/704925…

But that TCP server was obviously far too simple. It can serve only one connection: once a client connects, the server blocks on that connection and cannot accept any new ones. A server this broken is unacceptable. To fix this, we introduce IO multiplexing, that is, a "secretary" that watches the clients for us, so the application no longer has to block on a single connection and the CPU is used far more effectively.

Linux provides three such secretaries: select, poll, and epoll.

Epoll-related functions

epoll_create

man 2 epoll_create: creates an epoll file descriptor

Header file
#include <sys/epoll.h>

Function prototype

int epoll_create(int size);

Parameters

● int size: originally a hint for the number of fds to be monitored in the tree. Since Linux 2.6.8 it is ignored, but it must still be greater than zero.

Return value

● On success: returns a file descriptor for the new epoll instance (the root of the red-black tree that holds the monitored fds)
● On failure: returns -1 and sets errno
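A minimal sketch of creating an epoll instance. create_epoll_instance and epoll_create_rejects_zero are hypothetical helper names used only for illustration; the second one checks the rule that size must be greater than zero.

```c
#include <errno.h>
#include <stdio.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Create an epoll instance and return its fd, or -1 on failure.
 * The size argument is only a hint (ignored since Linux 2.6.8),
 * but it must still be greater than zero. */
int create_epoll_instance(void) {
    int epfd = epoll_create(1);
    if (epfd == -1)
        perror("epoll_create");
    return epfd;
}

/* Returns 1 if epoll_create rejects a size of 0 with EINVAL. */
int epoll_create_rejects_zero(void) {
    int epfd = epoll_create(0);
    if (epfd != -1) {
        close(epfd);
        return 0;
    }
    return errno == EINVAL;
}
```

Newer code often calls epoll_create1(0) (or epoll_create1(EPOLL_CLOEXEC)) instead, which takes flags rather than the obsolete size hint.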

epoll_ctl

man 2 epoll_ctl: adds, modifies or removes fds in an epoll instance

Header file
#include <sys/epoll.h>

Function prototype

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

Parameters

● int epfd: the value returned by epoll_create
● int op: the operation to perform

  1. EPOLL_CTL_ADD adds an fd to the tree
  2. EPOLL_CTL_MOD modifies the listening events of an fd already in the tree
  3. EPOLL_CTL_DEL removes an fd from the tree

● int fd: the file descriptor to be monitored
● struct epoll_event *event: a pointer to a struct epoll_event

struct epoll_event {
    uint32_t events;    /* Epoll events */
    epoll_data_t data;  /* User data */
};

Members of this structure:

● uint32_t events: a bit mask of events such as EPOLLIN, EPOLLOUT and EPOLLERR
● epoll_data_t data: a union (union epoll_data), which means that if you use ptr, you cannot also use fd

typedef union epoll_data {
    void     *ptr;
    int       fd;
    uint32_t  u32;
    uint64_t  u64;
} epoll_data_t;

  1. void *ptr: a pointer to user data; the epoll Reactor model uses it to attach a callback to each fd
  2. int fd: the fd whose events are being listened for
  3. uint32_t u32: generally not needed
  4. uint64_t u64: generally not needed
Return value

● On success: returns 0
● On failure: returns -1 and sets errno
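A short sketch of the three epoll_ctl operations. It uses a pipe instead of a socket so it runs without a network; watch_fd, rewatch_fd and unwatch_fd are hypothetical helper names, not part of any library.

```c
#include <stdint.h>
#include <sys/epoll.h>
#include <unistd.h>

/* EPOLL_CTL_ADD: register fd for the given events, storing fd in data.fd. */
int watch_fd(int epfd, int fd, uint32_t events) {
    struct epoll_event ev;
    ev.events = events;
    ev.data.fd = fd;
    return epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
}

/* EPOLL_CTL_MOD: change the events an already-registered fd is watched for. */
int rewatch_fd(int epfd, int fd, uint32_t events) {
    struct epoll_event ev;
    ev.events = events;
    ev.data.fd = fd;
    return epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}

/* EPOLL_CTL_DEL: remove fd from the tree (event may be NULL since Linux 2.6.9). */
int unwatch_fd(int epfd, int fd) {
    return epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
}
```

Two errno values are worth knowing, both documented in man 2 epoll_ctl: adding an fd that is already in the tree fails with EEXIST, and modifying or removing an fd that is not in the tree fails with ENOENT.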

epoll_wait

man 2 epoll_wait

Header file
#include <sys/epoll.h>

Function prototype

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

Parameters
  • int epfd

The value returned by epoll_create

  • struct epoll_event *events

An array used as an output parameter: epoll_wait fills it with the epoll_event structures of the fds whose listening conditions are met.

  • int maxevents

The total number of elements in the events array (must be greater than zero)

e.g.

struct epoll_event events[1024];

Here maxevents should be set to 1024

  • int timeout

The timeout, in milliseconds:

-1: block until at least one fd is ready

0: return immediately without blocking, even if no fd is ready

>0: wait at most this many milliseconds

Return value
  • On success:

    • >0: the number of ready fds, which can be used as the upper bound of the processing loop

    • 0: no fd became ready before the timeout expired

  • On failure: returns -1 and sets errno
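To see the timeout and return-value rules in action, here is a sketch that collects the ready fds from one epoll_wait call. poll_once is a hypothetical helper, and in the usage below a pipe stands in for a client socket.

```c
#include <sys/epoll.h>
#include <unistd.h>

/* Call epoll_wait once and copy data.fd of every ready event into ready_fds.
 * Returns >0 (number ready), 0 (timed out) or -1 (error, errno set). */
int poll_once(int epfd, int timeout_ms, int *ready_fds, int max) {
    struct epoll_event events[64];
    if (max > 64)
        max = 64;                     /* never exceed the local array */
    int n = epoll_wait(epfd, events, max, timeout_ms);
    for (int i = 0; i < n; i++)       /* n is the upper bound of the loop */
        ready_fds[i] = events[i].data.fd;
    return n;
}
```

With nothing written to the pipe, poll_once(epfd, 0, ...) returns 0 straight away; after one byte is written it returns 1 and reports the read end's fd.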

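General flow of the epoll model

#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>
#define OPEN_MAX 1024
// lfd has already gone through socket(), bind() and listen()
void event_loop(int lfd) {
    struct epoll_event tep, ep[OPEN_MAX];
    int epfd = epoll_create(OPEN_MAX);  // create the listening red-black tree
    tep.events = EPOLLIN;               // listening attributes for lfd
    tep.data.fd = lfd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &tep);  // add lfd to the tree
    while (1) {
        int n = epoll_wait(epfd, ep, OPEN_MAX, -1);  // block; n = number of ready fds
        for (int i = 0; i < n; i++) {
            int fd = ep[i].data.fd;
            if (fd == lfd) {  // new connection: accept it and add cfd to the tree
                int cfd = accept(lfd, NULL, NULL);
                tep.events = EPOLLIN;
                tep.data.fd = cfd;
                epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &tep);
            } else {          // existing client: read the request and echo it back
                char buf[1024];
                ssize_t r = read(fd, buf, sizeof(buf));
                if (r > 0) {
                    write(fd, buf, r);
                } else {      // peer closed or error: remove from the tree and close
                    epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
                    close(fd);
                }
            }
        }
    }
}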

LT and ET modes

LT mode

Level-triggered (the default mode): an event fires as long as there is data. Think of a signal level: the event keeps firing for as long as the level is high.

LT (level-triggered): LT is the default mode and works with both blocking and non-blocking sockets. The kernel tells you whether a file descriptor is ready, and you can then perform IO on it. If you do nothing, the kernel keeps notifying you, so this mode is less error-prone. The traditional select/poll follow this model.

event.events = EPOLLIN;

ET mode

Edge-triggered: an event fires only when new data arrives, regardless of whether unread data is still in the buffer. It takes a transition from 0 to 1 to trigger it; that is, only the rising edge is captured.

ET (edge-triggered): the high-performance mode, which should be used only with non-blocking sockets. The kernel notifies you through epoll when a descriptor goes from not ready to ready. It then assumes you know the descriptor is ready and sends no further notification for the data already pending: if you do not perform enough IO to make the fd not ready again (for example, reading until EAGAIN), the data left in the buffer produces no new notification (you are notified only once).

event.events = EPOLLIN | EPOLLET;
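The difference between the two modes can be observed with a pipe: write 4 bytes, read only 1, and ask epoll_wait again. count_notifications is a hypothetical helper written only for this comparison.

```c
#include <stdint.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Watch a pipe's read end with EPOLLIN | extra_flags, write 4 bytes, and
 * record how many fds two zero-timeout epoll_wait calls report, reading
 * only 1 byte in between. Returns 0 on success, -1 on setup failure. */
int count_notifications(uint32_t extra_flags, int *first, int *second) {
    int p[2], rc = -1;
    char c;
    struct epoll_event ev, out[4];
    if (pipe(p) == -1)
        return -1;
    int epfd = epoll_create(1);
    ev.events = EPOLLIN | extra_flags;
    ev.data.fd = p[0];
    if (epfd != -1 && epoll_ctl(epfd, EPOLL_CTL_ADD, p[0], &ev) == 0 &&
        write(p[1], "abcd", 4) == 4) {              /* data arrives: rising edge */
        *first = epoll_wait(epfd, out, 4, 0);
        if (read(p[0], &c, 1) == 1) {               /* consume only 1 of 4 bytes */
            *second = epoll_wait(epfd, out, 4, 0);  /* 3 bytes still buffered */
            rc = 0;
        }
    }
    if (epfd != -1)
        close(epfd);
    close(p[0]);
    close(p[1]);
    return rc;
}
```

In LT mode (extra_flags = 0) both calls report the pipe, because 3 unread bytes remain. With EPOLLET the second call returns 0: no new data arrived, so there is no new edge.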

Comparison

Why ET needs non-blocking sockets: suppose a blocking readn call asks for 500 bytes but only 498 have arrived. readn blocks, waiting for the last two bytes. In the server, the thread is now stuck inside readn, so the loop never gets back to epoll_wait and no other events are processed. It is a kind of deadlock: the call blocks because two bytes are missing, and because it blocks, no new data can be read. On a non-blocking socket the read would return EAGAIN instead, and the loop could go back to epoll_wait.
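The usual way out is to make the fd non-blocking and, on every ET notification, read in a loop until read fails with EAGAIN. A minimal sketch, assuming a pipe or socket fd; set_nonblocking and drain_fd are hypothetical helper names.

```c
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Put fd into non-blocking mode. Returns 0 on success, -1 on error. */
int set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1)
        return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/* ET-style read: keep reading until the kernel buffer is empty (EAGAIN).
 * Returns the total bytes read, or -1 on a real error.
 * *eof is set to 1 if the peer closed its end. */
long drain_fd(int fd, char *buf, size_t cap, int *eof) {
    long total = 0;
    *eof = 0;
    for (;;) {
        ssize_t n = read(fd, buf, cap);
        if (n > 0) {
            total += n;       /* a real server would process buf here */
        } else if (n == 0) {
            *eof = 1;         /* peer closed */
            return total;
        } else if (errno == EINTR) {
            continue;         /* interrupted by a signal: retry */
        } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
            return total;     /* drained: wait for the next edge */
        } else {
            return -1;
        }
    }
}
```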

Reactor

The Reactor model is a typical event-driven programming model. It inverts the program's flow of control; its basic idea is the Hollywood Principle: "Don't call us, we'll call you."

Composition: IO multiplexing + non-blocking IO

Core idea: turn the handling of IO into the handling of events

Single reactor

This is what Redis does.

Redis source code, at the encapsulation level:

  • ae.c and ae.h encapsulate the event loop, with the epoll-specific code in ae_epoll.c; ae_kqueue.c is the kqueue backend used on macOS

  • In anet.h, anetTcpServer binds the IP address and port, and anetNonBlock sets an fd to non-blocking

  • networking.c implements the Redis protocol handling and IO multithreading

Optimization idea

After the main thread receives the data, it hands the business logic off to a thread pool.

Multiple reactors

Multithreading

Multiple epoll objects -> multiple reactors

The number of reactors equals the number of CPU cores

This is what memcached does [a separate reactor accepts new connections]
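A minimal sketch of the "one reactor per CPU core" layout: create one epoll instance per online core and hand new connections out round-robin. create_reactors, assign_connection and MAX_REACTORS are hypothetical names, this is not memcached's actual code, and the per-reactor threads (each running its own epoll_wait loop) are omitted.

```c
#include <sys/epoll.h>
#include <unistd.h>

#define MAX_REACTORS 64

/* Create one epoll instance per online CPU core, capped at MAX_REACTORS.
 * Returns the number of reactors created, or -1 on failure. */
int create_reactors(int epfds[MAX_REACTORS]) {
    long cores = sysconf(_SC_NPROCESSORS_ONLN);
    int n = (cores < 1) ? 1 : (cores > MAX_REACTORS ? MAX_REACTORS : (int)cores);
    for (int i = 0; i < n; i++) {
        epfds[i] = epoll_create(1);
        if (epfds[i] == -1) {
            while (i-- > 0)            /* undo the ones already created */
                close(epfds[i]);
            return -1;
        }
    }
    return n;
}

/* Round-robin: the acceptor gives the k-th new connection to reactor k % n. */
int assign_connection(const int *epfds, int n, int k, int cfd) {
    struct epoll_event ev;
    ev.events = EPOLLIN;
    ev.data.fd = cfd;
    return epoll_ctl(epfds[k % n], EPOLL_CTL_ADD, cfd, &ev);
}
```

Calling epoll_ctl from the acceptor while a reactor thread is blocked in epoll_wait on the same epfd is allowed; man 2 epoll_wait notes that if the newly added fd becomes ready, the waiting call is unblocked.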

Multiple processes

This is what Nginx does

Nginx uses edge triggering

Redis and memcached both use level triggering

Using the Reactor pattern to encapsulate the epoll-based TCP server

Core idea: bind each fd to its event and callback. When an event fires, the matching callback is called directly, with no need to check again whether fd equals the listening socketfd. The change: instead of writing specific handling code for each kind of event, as above, run the corresponding callback once the event type is known:

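for (int i = 0; i < nready; i++) {
    struct sockitem *si = events[i].data.ptr;  // set when the fd was added
    if (events[i].events & EPOLLIN)  si->callback(si->sockfd, events[i].events, si);
    if (events[i].events & EPOLLOUT) si->callback(si->sockfd, events[i].events, si);
}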

Each connection's epoll event is switched between receiving and sending: the registered event (and its callback) alternates between EPOLLIN with recv_cb and EPOLLOUT with send_cb.
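The recv_cb/send_cb rotation can be reduced to a small self-contained sketch. It uses a socketpair instead of a TCP connection so it runs without a network, and stays in LT mode for simplicity; struct handler, rearm, on_readable, on_writable and reactor_run_once are hypothetical names modeled on the demo's struct sockitem.

```c
#include <stdint.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical per-fd handler: the fd, its current callback and a buffer. */
struct handler {
    int fd;
    int (*callback)(int epfd, struct handler *h);
    char buf[64];
    int len;
};

static int on_writable(int epfd, struct handler *h);

/* Switch h->fd to new events and a new callback; h travels in data.ptr. */
static int rearm(int epfd, struct handler *h, uint32_t events,
                 int (*cb)(int, struct handler *)) {
    struct epoll_event ev;
    h->callback = cb;
    ev.events = events;
    ev.data.ptr = h;
    return epoll_ctl(epfd, EPOLL_CTL_MOD, h->fd, &ev);
}

/* EPOLLIN: read what arrived, then wait for EPOLLOUT to send it back. */
static int on_readable(int epfd, struct handler *h) {
    h->len = (int)read(h->fd, h->buf, sizeof(h->buf));
    if (h->len <= 0)
        return -1;
    return rearm(epfd, h, EPOLLOUT, on_writable);
}

/* EPOLLOUT: echo the data, then go back to waiting for EPOLLIN. */
static int on_writable(int epfd, struct handler *h) {
    if (write(h->fd, h->buf, h->len) != h->len)
        return -1;
    return rearm(epfd, h, EPOLLIN, on_readable);
}

/* One reactor iteration: wait, then run whatever callback each fd carries. */
int reactor_run_once(int epfd, int timeout_ms) {
    struct epoll_event events[16];
    int n = epoll_wait(epfd, events, 16, timeout_ms);
    for (int i = 0; i < n; i++) {
        struct handler *h = events[i].data.ptr;
        h->callback(epfd, h);   /* the dispatcher never inspects the fd type */
    }
    return n;
}
```

Because EPOLL_CTL_MOD re-checks readiness, switching a writable socket to EPOLLOUT makes the next epoll_wait report it right away, which is what moves the connection from the receive step to the send step.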

Points to note

The data member of struct epoll_event is a union! If you write ptr, you cannot also write fd!

// If you use ptr, you cannot also use fd.
typedef union epoll_data
{
  void *ptr;
  int fd;
  uint32_t u32;
  uint64_t u64;
} epoll_data_t;

struct epoll_event
{
  uint32_t events;	/* Epoll events */
  epoll_data_t data;	/* User data variable */
} __EPOLL_PACKED;

// e.g.
struct epoll_event ev;
ev.data.ptr = si;
// ev.data.fd = fd; You can only choose one or the other! Otherwise the stored value is corrupted
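Since data is a union, the common pattern, and the one the demo below follows, is to keep the fd inside the structure that ptr points to. A minimal sketch; struct conn, set_event_ctx and fd_of are hypothetical names.

```c
#include <stdint.h>
#include <sys/epoll.h>

/* Hypothetical per-connection context, similar to the demo's struct sockitem.
 * Because epoll_event.data is a union, the fd lives here, next to the callback. */
struct conn {
    int fd;
    int (*callback)(int fd, int events, void *arg);
};

/* Fill in ev so that the fd and its context are both reachable through data.ptr. */
void set_event_ctx(struct epoll_event *ev, uint32_t events, struct conn *c) {
    ev->events = events;
    ev->data.ptr = c;  /* do NOT also assign ev->data.fd: it shares the same bytes */
}

/* Recover the fd from an event, e.g. one returned by epoll_wait. */
int fd_of(const struct epoll_event *ev) {
    const struct conn *c = ev->data.ptr;
    return c->fd;
}
```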

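Use LT mode (the default; there is no EPOLLLT flag) for the listening lfd [new client connections] and ET mode (EPOLLET) for established client connections. With ET on the listening fd, several connections that arrive together may produce a single notification, and a callback that accepts only one of them leaves the rest waiting with no further event; LT keeps reporting lfd until every pending connection has been accepted. From working through this version of the code: treating listenfd and clientfd separately can greatly improve the server's ability to handle concurrent connections. [For example, with many clientfds and only one listenfd, the rate at which the listenfd accepts new connections could otherwise drop.]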
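If the listening fd were registered with EPOLLET instead, the accept callback would have to loop until accept fails with EAGAIN. A minimal sketch, assuming lfd has already been made non-blocking; accept_all is a hypothetical helper name.

```c
#include <errno.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <unistd.h>

/* Accept every pending connection on a non-blocking listening fd.
 * Accepted fds go into out[] (at most max of them). Returns how many were
 * accepted, or -1 on a real error. If max is reached while connections are
 * still queued, an ET caller would get no further event for them. */
int accept_all(int lfd, int *out, int max) {
    int count = 0;
    while (count < max) {
        int cfd = accept(lfd, NULL, NULL);
        if (cfd == -1) {
            if (errno == EINTR)
                continue;             /* interrupted: retry */
            if (errno == EAGAIN || errno == EWOULDBLOCK)
                break;                /* no more pending connections */
            return -1;
        }
        out[count++] = cfd;
    }
    return count;
}
```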

Demo

#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#define BUFFER_LENGTH 1024

struct sockitem {
    int sockfd;
    // The callback function
    int (*callback)(int fd, int events, void *arg);
    // Per-connection buffers that temporarily hold data (e.g. for sticky packets)
    char recvbuffer[BUFFER_LENGTH];
    char sendbuffer[BUFFER_LENGTH];
    // The length of the received data
    int rlength;
    // The length of the data to send
    int slength;
};

// mainloop
// Save global public variables
struct reactor {
    int epfd;

    struct epoll_event events[1024];
};

struct reactor *eventloop = NULL;

int recv_cb(int fd, int events, void *arg);

int send_cb(int fd, int events, void *arg) {
    struct sockitem *si = (struct sockitem *)arg;

    // MSG_NOSIGNAL: if the peer has already closed, send fails with EPIPE
    // instead of killing the process with SIGPIPE
    send(fd, si->sendbuffer, si->slength, MSG_NOSIGNAL);

    struct epoll_event ev;
    // After sending, change the listening event back to EPOLLIN
    ev.events = EPOLLIN | EPOLLET;
    si->sockfd = fd;
    si->callback = recv_cb;
    ev.data.ptr = si;

    epoll_ctl(eventloop->epfd, EPOLL_CTL_MOD, fd, &ev);
    return 0;
}

int recv_cb(int fd, int events, void *arg) {
    struct sockitem *si = (struct sockitem *)arg;
    struct epoll_event ev;

    int ret = recv(fd, si->recvbuffer, BUFFER_LENGTH, 0);
    if (ret < 0) {
        // On a non-blocking socket, EAGAIN/EWOULDBLOCK only means
        // "no data right now": keep the connection
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            return -1;
        }
        // Any other error: remove the fd from the tree, close it, free its item
        ev.events = EPOLLIN;
        epoll_ctl(eventloop->epfd, EPOLL_CTL_DEL, fd, &ev);

        close(fd);

        free(si);

    } else if (ret == 0) {
        // The peer closed the connection
        printf("disconnect %d\n", fd);
        ev.events = EPOLLIN;
        epoll_ctl(eventloop->epfd, EPOLL_CTL_DEL, fd, &ev);

        close(fd);

        free(si);

    } else {
        // Business logic is handled here
        // http
        // websocket

        // recvbuffer is not NUL-terminated, so print exactly ret bytes
        printf("Recv: %.*s, %d Bytes\n", ret, si->recvbuffer, ret);
        // Echo: send back the same data that was received
        si->rlength = ret;
        memcpy(si->sendbuffer, si->recvbuffer, si->rlength);
        si->slength = si->rlength;
        // Calling send(fd, buffer, ret, 0) directly here may send only part of
        // the data (or fail with EAGAIN) when the socket send buffer is full.
        // Waiting for an EPOLLOUT event handles this more cleanly!

        // Change the listening event from EPOLLIN to EPOLLOUT so that send_cb runs next
        ev.events = EPOLLOUT | EPOLLET;
        si->sockfd = fd;
        si->callback = send_cb;
        ev.data.ptr = si;
        // Update the entry in the listening tree to wait for EPOLLOUT
        epoll_ctl(eventloop->epfd, EPOLL_CTL_MOD, fd, &ev);
    }
    return 0;
}

int accept_cb(int fd, int events, void *arg) {
    struct sockaddr_in client_addr;
    memset(&client_addr, 0, sizeof(client_addr));
    socklen_t client_addr_len = sizeof(client_addr);
    int cfd = accept(fd, (struct sockaddr *)&client_addr, &client_addr_len);
    if (cfd == -1) {
        perror("accept error!");
        exit(1);
    }
    char str[INET_ADDRSTRLEN] = {0};
    // inet_ntop expects the address field (sin_addr), not the whole sockaddr_in
    printf("recv from %s at port %d\n",
           inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)),
           ntohs(client_addr.sin_port));

    // ET mode should only be used with non-blocking sockets
    int flags = fcntl(cfd, F_GETFL, 0);
    fcntl(cfd, F_SETFL, flags | O_NONBLOCK);

    struct epoll_event ev;
    ev.events = EPOLLIN | EPOLLET;

    // Allocate a per-connection item holding the fd and its callback
    struct sockitem *si = (struct sockitem *)malloc(sizeof(struct sockitem));
    if (si == NULL) {
        close(cfd);
        return -1;
    }
    si->sockfd = cfd;
    si->callback = recv_cb;
    // Attach the item (and so the recv_cb callback) to the client in the listening tree
    ev.data.ptr = si;
    // Add the client to the listening tree!
    epoll_ctl(eventloop->epfd, EPOLL_CTL_ADD, cfd, &ev);
    return cfd;
}

int main(int argc, char *argv[]) {
    if (argc < 2) {
        return -1;
    }
    int port = atoi(argv[1]);
    int sfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sfd == -1) {
        perror("socket error!");
        exit(1);
    }
    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));

    addr.sin_addr.s_addr = INADDR_ANY;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);

    if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
        perror("bind error!");
        exit(1);
    }

    // Start listening
    if (listen(sfd, 5) < 0) {
        perror("listen error!");
        exit(1);
    }

    eventloop = (struct reactor *)malloc(sizeof(struct reactor));
    if (eventloop == NULL) {
        perror("malloc error!");
        exit(1);
    }

    struct epoll_event ev;

    // epoll operations
    // epoll_create(1) creates the epoll instance (the root node); the argument only needs to be > 0
    eventloop->epfd = epoll_create(1);

    // Add sfd to the epoll red-black tree
    // Use LT mode for the listening socket that accepts new connections!
    ev.events = EPOLLIN;
    // ev.data.ptr is used below, so ev.data.fd must not be set as well:
    // ev.data is a union!
    // ev.data.fd = sfd;

    struct sockitem *si = (struct sockitem *)malloc(sizeof(struct sockitem));
    si->sockfd = sfd;
    si->callback = accept_cb;
    // Put the address of the callback into the listener tree
    // This will trigger the event to execute the callback function
    ev.data.ptr = si;

    epoll_ctl(eventloop->epfd, EPOLL_CTL_ADD, sfd, &ev);

    // Event loop
    while (1) {
        // 1024 is the maximum number of events one epoll_wait call can return
        // (the size of the "courier's bag")
        int nread = epoll_wait(eventloop->epfd, eventloop->events, 1024, -1);
        if (nread == -1) {
            break;
        }

        for (int i = 0; i < nread; i++) {
            // Dispatch events by type. New connections and disconnections both show up as EPOLLIN

            if (eventloop->events[i].events & EPOLLIN) {
                printf("EPOLLIN\n");
                // Instead of a long chain of handling code, dispatch straight to the callback:
                // no need to tell lfd [new client] from cfd [existing client sending data]; each fd's own callback handles it
                struct sockitem *si = (struct sockitem *)eventloop->events[i].data.ptr;
                si->callback(si->sockfd, eventloop->events[i].events, si);
            }

            // EPOLLOUT fires when the socket becomes writable, e.g. right after
            // EPOLL_CTL_MOD switches it to EPOLLOUT, or when the send buffer was full
            // (send returned EAGAIN) and the peer has since read some data
            if (eventloop->events[i].events & EPOLLOUT) {
                printf("EPOLLOUT\n");
                struct sockitem *si = (struct sockitem *)eventloop->events[i].data.ptr;
                si->callback(si->sockfd, eventloop->events[i].events, si);
            }
        }
    }
    return 0;
}

Conclusion

Introducing epoll removes the need to block waiting for IO data: epoll, the "secretary", answers the phone. The Reactor pattern then organizes the code logic so it is easy to extend later. This is very practical engineering experience; many open-source projects that deal with networking rely on the Reactor + epoll combination.