
Mainstream I/O models

There are three mainstream models: blocking I/O (BIO), non-blocking I/O (NIO), and asynchronous I/O (AIO).

BIO model

A synchronous blocking model in which each client connection is handled by its own dedicated thread.

Disadvantages:

1. read is a blocking I/O call; if a connected client performs no operation, its handling thread blocks on read, wasting thread resources.

2. With many connections the server needs many threads, which puts it under heavy pressure (the classic C10K problem).

Application Scenarios:

BIO suits architectures with a small and fixed number of connections. It demands significant server resources (one thread per connection), but the program is simple and easy to understand.
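To make the thread-per-connection pattern concrete, here is a minimal BIO echo server sketch; the port and the echo behavior are illustrative, not from the original article:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class BioEchoServer {
    public static void main(String[] args) throws Exception {
        try (ServerSocket server = new ServerSocket(9000)) {
            while (true) {
                Socket socket = server.accept();          // blocks until a client connects
                new Thread(() -> handle(socket)).start(); // one thread per connection
            }
        }
    }

    private static void handle(Socket socket) {
        try (BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
             PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {
            String line;
            while ((line = in.readLine()) != null) { // blocks until the client sends a line
                out.println(line);                   // echo it back
            }
        } catch (Exception ignored) {
        }
    }
}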

NIO model

Synchronous non-blocking: the server implements a pattern in which one thread can handle multiple requests (connections). Connections created by clients are registered with a multiplexer (the selector), which polls them and processes only the connections that actually have pending I/O requests.

Application Scenarios:

NIO suits architectures with a large number of connections doing light work, such as chat servers, danmaku (live-comment) systems, and server-to-server communication; it is comparatively complex to program.
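A minimal sketch of the Java NIO model, assuming a simple echo workload on an illustrative port:

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NioEchoServer {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(9000));
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            selector.select(); // blocks until at least one channel is ready
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {
                    SocketChannel client = server.accept();
                    client.configureBlocking(false);
                    client.register(selector, SelectionKey.OP_READ); // register the new connection
                } else if (key.isReadable()) {
                    SocketChannel client = (SocketChannel) key.channel();
                    ByteBuffer buf = ByteBuffer.allocate(1024);
                    int n = client.read(buf);
                    if (n == -1) { key.cancel(); client.close(); continue; }
                    buf.flip();
                    client.write(buf); // echo back on the same thread
                }
            }
        }
    }
}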

Summary:

The three core components of NIO: channels, buffers, and selectors

1. A channel is similar to a stream; each channel has an associated buffer.

2. Channels are registered with a selector, which dispatches them to an idle thread according to each channel's read or write events.

3. Buffers and channels in NIO are both readable and writable.
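As a small illustration of point 3, here is a minimal ByteBuffer sketch (the literal "redis" is just placeholder data): writing fills the buffer, and flip() switches it into read mode so the same buffer can be read back.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BufferDemo {
    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.put("redis".getBytes(StandardCharsets.UTF_8)); // write mode
        buf.flip();                                        // switch to read mode
        byte[] out = new byte[buf.remaining()];
        buf.get(out);                                      // read back what was written
        System.out.println(new String(out, StandardCharsets.UTF_8)); // prints "redis"
    }
}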

NIO model implementation

In Linux, a socket is created through a system call into the kernel. The Selector corresponds to an epoll instance in the operating system: socket file descriptors are registered with the epoll file descriptor, and the kernel notifies the program asynchronously when events occur. A single thread can therefore serve many connections, large amounts of useless traversal are avoided, and event detection is delegated to the operating system kernel, which improves efficiency.

Redis threading model

Redis is a typical epoll-based NIO threading model: all events (connect and read events) registered with the epoll instance are handled by a single service thread.

The source code for Redis's underlying epoll support is in the src/ae_epoll.c file.

AIO model

Asynchronous non-blocking: the operating system notifies the program via a callback after the operation completes, and the program then starts a thread to process the result. It is generally suited to applications with a large number of long-lived connections.

Application Scenarios:

AIO suits architectures with a large number of long-lived connections doing heavy work, for example devices that report their status every two seconds.
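A minimal Java AIO sketch under the same illustrative echo workload: the kernel completes the accept and read, then calls back into the CompletionHandlers (only a single read/echo round per connection is shown):

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;

public class AioEchoServer {
    public static void main(String[] args) throws Exception {
        AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()
                .bind(new InetSocketAddress(9000));
        server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
            @Override
            public void completed(AsynchronousSocketChannel client, Void att) {
                server.accept(null, this); // keep accepting further connections
                ByteBuffer buf = ByteBuffer.allocate(1024);
                client.read(buf, buf, new CompletionHandler<Integer, ByteBuffer>() {
                    @Override
                    public void completed(Integer n, ByteBuffer b) {
                        if (n == -1) return;
                        b.flip();
                        client.write(b); // invoked by the OS after the read completes
                    }
                    @Override
                    public void failed(Throwable exc, ByteBuffer b) { }
                });
            }
            @Override
            public void failed(Throwable exc, Void att) { }
        });
        new CountDownLatch(1).await(); // keep the main thread alive
    }
}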

Comparison of three I/O models

                        BIO                    NIO                                       AIO
I/O model               Synchronous blocking   Synchronous non-blocking (multiplexing)   Asynchronous non-blocking
Programming difficulty  Simple                 Complex                                   Complex
Reliability             Poor                   Good                                      Good
Throughput              Low                    High                                      High

Redis threading model

1. Interaction model

2. Reactor model

Processing flow (a minimal Java sketch follows the list):

  • The main thread registers read events on the socket with the epoll kernel event table.
  • The main thread calls epoll_wait to wait for data to become readable on the socket.
  • epoll_wait notifies the main thread when the socket is readable, and the main thread puts the socket-readable event onto the request queue.
  • A worker thread sleeping on the request queue is woken up; it reads data from the socket, processes the user request, and then registers a write event for the socket in the epoll kernel event table.
  • The main thread calls epoll_wait to wait for the socket to become writable.
  • epoll_wait notifies the main thread when the socket is writable, and the main thread puts the socket-writable event onto the request queue.
  • A worker thread sleeping on the request queue is woken up; it writes the server's result for the client's request to the socket.
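The flow above can be sketched in Java with a selector-driven main thread and a worker pool standing in for the request queue. This is a simplified illustration (for instance, the worker writes the reply directly instead of registering a separate write event, and accept handling is omitted); all names are illustrative:

import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ReactorSketch {
    private final Selector selector;
    private final ExecutorService workers = Executors.newFixedThreadPool(4);

    ReactorSketch(Selector selector) { this.selector = selector; }

    void eventLoop() throws Exception {
        while (true) {
            selector.select(); // main thread waits, like epoll_wait
            for (SelectionKey key : selector.selectedKeys()) {
                if (key.isReadable()) {
                    key.interestOps(0);                    // park the key while a worker owns it
                    workers.submit(() -> handleRead(key)); // "enqueue" the readable event
                }
            }
            selector.selectedKeys().clear();
        }
    }

    private void handleRead(SelectionKey key) {
        SocketChannel ch = (SocketChannel) key.channel();
        try {
            ByteBuffer buf = ByteBuffer.allocate(1024);
            int n = ch.read(buf);                  // worker reads the request data
            if (n == -1) { key.cancel(); ch.close(); return; }
            buf.flip();
            ch.write(buf);                         // reply (simplified: written directly)
            key.interestOps(SelectionKey.OP_READ); // re-arm read interest
            key.selector().wakeup();               // let the blocked select() see the change
        } catch (Exception e) {
            key.cancel();
        }
    }
}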

Advantages and disadvantages:

  • advantages

    • Fast response: the reactor is not blocked by any single synchronous operation, and there are no concerns about sharing a file descriptor across threads.
    • Scalability: CPU resources can be utilized simply by increasing the number of reactor instances (e.g., a multi-reactor setup).
    • Reusability: the reactor itself is independent of the concrete event-processing logic, making it easy to reuse.
  • disadvantages

    • If a long read/write occupies a shared reactor, the reactor's responsiveness suffers; in that case a thread-per-connection design can be considered.

3. Reactor model example

Server (based on Netty):

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

EventLoopGroup bossGroup = new NioEventLoopGroup();   // accepts new connections (boss reactor)
EventLoopGroup workerGroup = new NioEventLoopGroup(); // handles connection I/O (worker reactors)
try {
  ServerBootstrap b = new ServerBootstrap();
  b.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .option(ChannelOption.SO_BACKLOG, 4096)
    .childHandler(new JkvServerInitalizer());

  ChannelFuture f = b.bind(SERVER_PORT).sync();
  f.channel().closeFuture().sync(); // block until the server channel is closed
} finally {
  bossGroup.shutdownGracefully();
  workerGroup.shutdownGracefully();
}

Client (based on Netty):

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.io.BufferedReader;
import java.io.InputStreamReader;

EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
  Bootstrap bootstrap = new Bootstrap();
  bootstrap.group(eventLoopGroup)
    .channel(NioSocketChannel.class)
    .handler(new MyChatClientInitializer());

  Channel channel = bootstrap.connect("localhost", SERVER_PORT).sync().channel();

  BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
  for (;;) {
    channel.writeAndFlush(br.readLine() + "\r\n"); // send each console line to the server
  }
} finally {
  eventLoopGroup.shutdownGracefully();
}

Redis network model

Redis uses a single-threaded Reactor. A single instance can reach roughly 100,000 QPS under load, because Redis operates mainly on memory, where reads and writes are very fast.

The Redis server is an event-driven program, and it needs to handle two types of events:

1. File events: the Redis server connects to clients (or other Redis servers) through sockets, and file events are the server's abstraction of socket operations. Communication between the server and a client (or another server) generates file events, and the server performs its network communication by listening for and handling these events.

2. Time events: some operations inside the Redis server (such as the serverCron function) need to run at a given point in time; time events are the server's abstraction of such timed operations.

File events

Redis developed its own network event handler based on the Reactor model: this handler is called the File Event Handler.

  • The file event handler uses the I/O multiplexing program to listen for multiple sockets at the same time and associate the socket with different event handlers based on the task it is currently performing.
  • When the socket being listened to is ready to perform accept, read, write, close, etc., file events corresponding to the operation are generated, and the file event handler invokes the event handler associated with the socket to handle these events.

The file event handler consists of four parts: sockets, the I/O multiplexing program, the file event dispatcher, and event handlers.

All of the multiplexing functionality is implemented by wrapping common I/O multiplexing libraries such as select, epoll, evport, and kqueue. Each I/O multiplexing library has its own separate file in the Redis source, such as src/ae_epoll.c, src/ae_evport.c, src/ae_kqueue.c, and src/ae_select.c.

Because each I/O multiplexing library implements the same API inside Redis, the underlying implementation of the I/O multiplexing program can be swapped freely.

The Redis I/O multiplexing source selects the corresponding library through #include macro definitions; at compile time the program automatically chooses the highest-performance I/O multiplexing library available on the system as the underlying implementation:

/* Include the best multiplexing layer supported by this system. * The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

Types of events

The I/O multiplexing program can listen for two kinds of events on multiple sockets, ae.h/AE_READABLE and ae.h/AE_WRITABLE. The mapping between these two event types and socket operations is as follows:

  • The socket generates an AE_READABLE event when it becomes readable (the client performs a write operation on the socket, or a close operation) or when a new acceptable socket appears (the client performs a connect operation on the server’s listening socket).
  • When a socket becomes writable (the client performs the read operation on the socket), the socket generates an AE_WRITABLE event.

If the socket is readable and writable at the same time, the server reads the socket first and writes the socket later.

File event handler

1. Connection reply handler

The networking.c/acceptTcpHandler function is Redis's connection reply handler; it is responsible for accepting (replying to) clients that connect to the server's listening socket, and is concretely implemented as a wrapper around the sys/socket.h accept function.

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    while (max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        anetCloexec(cfd);
        serverLog(LL_VERBOSE, "Accepted %s:%d", cip, cport);
        acceptCommonHandler(connCreateAcceptedSocket(cfd), 0, cip);
    }
}

2. Command request handler

The networking.c/readQueryFromClient function is Redis's command request handler; it is responsible for reading the command request content that the client sends over the socket, and is concretely implemented as a wrapper around the unistd.h read function.

void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread, readlen;
    size_t qblen;

    /* Check if we want to read from the client later when exiting from
     * the event loop. This is the case if threaded I/O is enabled. */
    if (postponeClientRead(c)) return;

    /* Update total number of reads on server */
    atomicIncr(server.stat_total_reads_processed, 1);

    readlen = PROTO_IOBUF_LEN;
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);

        /* Note that the 'remaining' variable may be zero in some edge case,
         * for example once we resume a blocked client after CLIENT PAUSE. */
        if (remaining > 0 && remaining < readlen) readlen = remaining;
    }

    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = connRead(c->conn, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (connGetState(conn) == CONN_STATE_CONNECTED) {
            return;
        } else {
            serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
            freeClientAsync(c);
            return;
        }
    } else if (nread == 0) {
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClientAsync(c);
        return;
    } else if (c->flags & CLIENT_MASTER) {
        /* Append the query buffer to the pending (not applied) buffer
         * of the master. We'll use this buffer later in order to have a
         * copy of the string applied by the last command executed. */
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }

    sdsIncrLen(c->querybuf,nread);
    c->lastinteraction = server.unixtime;
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
    atomicIncr(server.stat_net_input_bytes, nread);
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClientAsync(c);
        return;
    }

    /* There is more data in the client input buffer, continue parsing it
     * in case to check if there is a full command to execute. */
    processInputBuffer(c);
}

3. Command reply handler

The networking.c/sendReplyToClient function is Redis's command reply handler; it is responsible for returning, through the socket, the command replies that the server produces by executing commands, and is concretely implemented as a wrapper around the unistd.h write function.

/* Write event handler. Just send data to the client. */
void sendReplyToClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    writeToClient(c,1);
}

Time events

In practice Redis supports periodic time events, which are not deleted after execution but are re-inserted into the linked list.

Time events are managed in a linked list, and newly created time events are inserted at the head of the list.

if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
  serverPanic("Can't create event loop timers.");
  exit(1);
}

Periodic events are handled as follows:

/* Process time events */
static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te;
    long long maxId;

    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;
    monotime now = getMonotonicUs();

    // Walk the timer linked list
    while(te) {
        long long id;

        /* Remove events scheduled for deletion. */
        // Events marked in a previous round are freed here
        if (te->id == AE_DELETED_EVENT_ID) {
            aeTimeEvent *next = te->next;
            /* If a reference exists for this timer event,
             * don't free it. This is currently incremented
             * for recursive timerProc calls */
            if (te->refcount) {
                te = next;
                continue;
            }
            if (te->prev)
                te->prev->next = te->next;
            else
                eventLoop->timeEventHead = te->next;
            if (te->next)
                te->next->prev = te->prev;
            if (te->finalizerProc) {
                te->finalizerProc(eventLoop, te->clientData);
                now = getMonotonicUs();
            }
            zfree(te);
            te = next;
            continue;
        }

        /* Make sure we don't process time events created by time events in
         * this iteration. Note that this check is currently useless: we always
         * add new timers on the head, however if we change the implementation
         * detail, this check may be useful again: we keep it here for future
         * defense. */
        if (te->id > maxId) {
            te = te->next;
            continue;
        }

        if (te->when <= now) {
            int retval;

            id = te->id;
            te->refcount++;
            // timeProc returns retval, the number of milliseconds
            // until this event should fire again
            retval = te->timeProc(eventLoop, id, te->clientData);
            te->refcount--;
            processed++;
            now = getMonotonicUs();
            if (retval != AE_NOMORE) {
                te->when = now + retval * 1000;
            } else {
                // AE_NOMORE: mark the event for deletion in the next round
                te->id = AE_DELETED_EVENT_ID;
            }
        }
        te = te->next;
    }
    return processed;
}

References

  • Scalable IO in Java by Doug Lea

  • Redis Design and Implementation, Huang Jianhong