• Is Redis a single-threaded program?
  • Initialization of multiple IO threads
  • The IO thread runs the function IOThreadMain
    • How do I postpone a client read operation?
    • How do I defer client write operations?
    • How do I assign clients to read to IO threads?
    • How do I assign clients to write to IO threads for execution?
  • conclusion
  • Refer to the link
  • Redis source code concise analysis series

Is Redis a single-threaded program?

Redis is single-threaded only when handling “client requests”; The entire Redis server is not single threaded, there are background threads to assist in processing tasks.

Redis chooses single thread processing request, because Redis operates on “memory”, coupled with the design of “efficient” data structure, so the operation speed is very fast, using IO multiplexing mechanism, single thread can still have very high performance.

Redis does not allow the main thread to perform time-consuming operations such as synchronous writes and deletes. Instead, it allows the background thread to do them asynchronously, thus avoiding blocking the main thread.

Redis 6.0 version launched in May 2020, will also use multithreading to deal with IO tasks, can make full use of the multi-core characteristics of the server, the use of multi-core running multithreading, let multithreading help accelerate data reading, command parsing and data write back speed, improve the overall performance of Redis.

Initialization of multiple IO threads

In the main function, will call the InitServerLast function, Redis 6.0 source:

void InitServerLast(a) {
    bioInit();
    // Initializes the IO thread
    initThreadedIO();
    set_jemalloc_bg_thread(server.jemalloc_bg_thread);
    server.initial_memory_usage = zmalloc_used_memory();
}
Copy the code

After the bioInit function is called, the initThreadedIO function is called to initialize the multiIO thread. The initThreadedIO function is in the networking. C file.

void initThreadedIO(void) {
    // I/O thread activation flag: set to "inactive"
    server.io_threads_active = 0;

    // There is only one IO thread, so it returns directly to the main thread
    if (server.io_threads_num == 1) return;

    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }

    /* Spawn and initialize the I/O threads. */
    for (int i = 0; i < server.io_threads_num; i++) {
        io_threads_list[i] = listCreate();
        // Thread 0 is the main Thread
        if (i == 0) continue;

        /* Things we do only for the additional threads. */
        pthread_t tid;
        // initialize io_threads_mutex
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        setIOPendingCount(i, 0);
        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
        // pthread_create creates an IO thread that runs IOThreadMain
        if (pthread_create(&tid,NULL,IOThreadMain,(void(*)long)i) ! =0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        // Initializes the IO_threads array and sets the value to the thread idio_threads[i] = tid; }}Copy the code

Io_threads_num = io_threads_num

  • Io_threads_num = 1, which indicates that the main thread is processed directly
  • Io_threads_num > IO_THREADS_MAX_NUM: indicates the number of I/O threads > the value defined by the macro (the default value is 128)

The initThreadedIO function initializes the following four arrays:

  • io_threads_listArray: holds the clients to be processed by each IO thread, initializing each element of the array as a List of type List
  • io_threads_pendingArray: Holds the number of clients waiting for each IO thread to process
  • io_threads_mutexArray: Holds thread mutex
  • io_threadsArray: Holds descriptors for each IO thread

The four arrays are defined in the network.c file:


pthread_t io_threads[IO_THREADS_MAX_NUM];   // An array of thread descriptors
pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];  // An array of thread mutexes
_Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];  // Record the number of clients to be processed by the thread
list *io_threads_list[IO_THREADS_MAX_NUM];  // Record the client that the thread corresponds to processing
Copy the code

The initThreadedIO function creates a thread by calling the pthread_create function in the for loop. Pthread_create For details, see pthread_CREATE (3) – Linux manual Page.

The function that the created thread runs is IOThreadMain, and the *arg argument is the number of the current created thread (starting with 1, 0 being the main I/O thread).

/* Spawn and initialize the I/O threads. */
for (int i = 0; i < server.io_threads_num; i++) {
    io_threads_list[i] = listCreate();
    // Thread 0 is the main Thread
    if (i == 0) continue;

    /* Things we do only for the additional threads. */
    pthread_t tid;
    // initialize io_threads_mutex
    pthread_mutex_init(&io_threads_mutex[i],NULL);
    setIOPendingCount(i, 0);
    pthread_mutex_lock(&io_threads_mutex[i]);
    // pthread_create creates an IO thread that runs IOThreadMain
    if (pthread_create(&tid,NULL,IOThreadMain,(void(*)long)i) ! =0) {
        serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
        exit(1);
    }
    // Initializes the IO_threads array and sets the value to the thread id
    io_threads[i] = tid;
}
Copy the code

The IO thread runs the function IOThreadMain

The main logic is a while(1) loop that will fetch the io_threads_list element corresponding to the thread, judge and process it.

void *IOThreadMain(void *myid) {...while(1) {
        /* Wait for start */
        for (int j = 0; j < 1000000; j++) {
            if(getIOPendingCount(id) ! =0) break; }...// Get the list of clients to be processed by the IO thread
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            // Get a client from the client list
            client *c = listNodeValue(ln);
            // Threads are "write operations" that call writeToClient to write data back to the client
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            // If it is a "read operation", call readQueryFromClient to read data from the client
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } 
            ……
        }
        // Clear the list of clients for this thread after all clients are processed
        listEmpty(io_threads_list[id]);
        // Set the number of pending tasks for this thread to 0
        setIOPendingCount(id, 0); }}Copy the code

Note: The above code io_threads_op variable is in and handleClientsWithPendingReadsUsingThreads handleClientsWithPendingWritesUsingThreads function Function.

Question: How do I add clients to the IO_threads_list array that I/O threads are dealing with?

In the redisServer global, there are two member variables of type List:

  • clients_pending_write: Indicates the client whose data is to be written back
  • clients_pending_read: Indicates the client from which data is to be read

struct redisServer {.// The client whose data is to be written back
    list *clients_pending_write;  
    // The client to read data from
    list*clients_pending_read; . }Copy the code

When Redis Server receives the request from the client and returns the data to the client, it will postpone the read and write operations of the client according to certain conditions and save the clients to be read and write in the two lists respectively. After that, Redis Server will add the clients in the list to the IO_threads_list array before entering the event loop, and hand it to the IO thread for processing.

How do I postpone a client read operation?

The callback function that handles readable events is readQueryFromClient.

void readQueryFromClient(connection *conn) {
    // Get the client from the connection structureclient *c = connGetPrivateData(conn); ...// Whether to defer reading data from the client (when using multithreaded IO)
    if (postponeClientRead(c)) return; ... }Copy the code

Let’s focus on postponeClientRead.

int postponeClientRead(client *c) {
    if(server.io_threads_active && server.io_threads_do_reads && ! ProcessingEventsWhileBlocked && ! (c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ|CLIENT_BLOCKED))) {// Client flag Adds the CLIENT_PENDING_READ flag to delay client read operations
        c->flags |= CLIENT_PENDING_READ;
        // Add the client to the server's clients_pending_read list
        listAddNodeHead(server.clients_pending_read,c);
        return 1;
    } else {
        return 0; }}Copy the code

If: Whether the read operation of the current client can be postponed; Execution logic in if block: Add client to clientS_pending_read list. The following is the judgment condition:

  1. server.io_threads_active = 1: Multiple I/O threads are activated.
  2. server.io_threads_do_reads = 1: Multiple IO threads can be used to handle client read operations that are delayed in the Redis configuration file redis.conf through the configuration item. IO -threads-do-reads set. The default value is no.
  3. ProcessingEventsWhileBlocked = 0: ProcessingEventsWhileBlocked function not in execution when Redis, while reading RDB file or AOF, will call this function is used to handle the event-driven framework to capture events, avoid Redis obstruction caused by reading RDB or AOF files.
  4. The client cannot have an existing idCLIENT_MASTER,CLIENT_SLAVECLIENT_PENDING_READ
    • CLIENT_MASTER: The client is used for primary/secondary replication
    • CLIENT_SLAVE: The client is used for master/slave replication
    • CLIENT_PENDING_READ: The client is set to defer read operations

How do I defer client write operations?

When Redis executes a client command and wants to return the result to the client, it calls the addReply function to write the result to the output buffer. The addReply function starts by calling the prepareClientToWrite function.

/* ----------------------------------------------------------------------------- * Higher level functions to queue data on the client output buffer. * The following functions are the ones that commands implementations will call. * -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- - * /

/* Add the object 'obj' string representation to the client output buffer. */
void addReply(client *c, robj *obj) {
    if(prepareClientToWrite(c) ! = C_OK)return; ... }Copy the code

The prepareClientToWrite function is commented as follows:

/* This function is called every time we are going to transmit new data
 * to the client. The behavior is the following:
 *
 * If the client should receive new data (normal clients will) the function
 * returns C_OK, and make sure to install the write handler in our event
 * loop so that when the socket is writable new data gets written.
 *
 * If the client should not receive new data, because it is a fake client
 * (used to load AOF in memory), a master or because the setup of the write
 * handler failed, the function returns C_ERR.
 *
 * The function may return C_OK without actually installing the write
 * event handler in the following cases:
 *
 * 1) The event handler should already be installed since the output buffer
 *    already contains something.
 * 2) The client is a slave but not yet online, so we want to just accumulate
 *    writes in the buffer but not actually sending them yet.
 *
 * Typically gets called every time a reply is built, before adding more
 * data to the clients output buffers. If the function returns C_ERR no
 * data should be appended to the output buffers. */
Copy the code
int prepareClientToWrite(client *c) {...// The current client has no data to write back && flag Does not contain CLIENT_PENDING_READ
    if(! clientHasPendingReplies(c) && ! (c->flags & CLIENT_PENDING_READ)) clientInstallWriteHandler(c);return C_OK;
}
Copy the code

ClientInstallWriteHandler as follows, judging the if condition is not here.

void clientInstallWriteHandler(client *c) {

    if(! (c->flags & CLIENT_PENDING_WRITE) && (c->replstate == REPL_STATE_NONE || (c->replstate == SLAVE_STATE_ONLINE && ! c->repl_put_online_on_ack))) {// Set the client identity to CLIENT_PENDING_WRITE (to be written back)
        c->flags |= CLIENT_PENDING_WRITE;
        // Add client to the server's clients_pending_write listlistAddNodeHead(server.clients_pending_write,c); }}Copy the code

How does Redis allocate clients that defer read and write operations to multiple I/O threads? By:

  • HandleClientsWithPendingReadsUsingThreads function: Assigns clients from the clientS_pending_read list to the IO thread
  • HandleClientsWithPendingWritesUsingThreads function: Assigns clients in the ClientS_pending_write list to IO threads

How do I assign clients to read to IO threads?

BeforeSleep function call handleClientsWithPendingReadsUsingThreads function:

/* We should handle pending reads clients ASAP after event loop. */
handleClientsWithPendingReadsUsingThreads();
Copy the code

HandleClientsWithPendingReadsUsingThreads function as follows, logic in the comments:

/* When threaded I/O is also enabled for the reading + parsing side, the * readable handler will just put normal clients into a queue of clients to * process (instead of serving them synchronously). This function runs * the queue using the I/O threads, and process them in order to accumulate * the reads in the buffers, and also parse the first command available * rendering it in the client structures. */
int handleClientsWithPendingReadsUsingThreads(void) {
    // Check whether io_threads_active is activated and whether io_threads_do_reads can process clients with I/O threads
    if(! server.io_threads_active || ! server.io_threads_do_reads)return 0;

    // Determine the clients_pending_read length
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;

    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    // Get the client list of clients_pending_read
    listRewind(server.clients_pending_read,&li);
    // Assign clients to IO threads in polling mode
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    // Set the operation identifier of the IO thread to "read operation".
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 1; j < server.io_threads_num; j++) {
        // The number of clients waiting for processing per thread → io_threads_pending array
        int count = listLength(io_threads_list[j]);
        setIOPendingCount(j, count);
    }

    // Process the client to read for thread 0 (main thread)
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    // Clear the 0 list
    listEmpty(io_threads_list[0]);

    // loop, waiting for all other IO threads to finish processing clients
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    }

    /* Run the list of clients again to process the new buffers. */
    // Retrieve the clients_pending_read list
    while(listLength(server.clients_pending_read)) {
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        // Check if the client identifier has CLIENT_PENDING_READ, if it has been parsed by the IO thread
        c->flags &= ~CLIENT_PENDING_READ;
        // Remove the client from the clients_pending_read listlistDelNode(server.clients_pending_read,ln); serverAssert(! (c->flags & CLIENT_BLOCKED));if (processPendingCommandsAndResetClient(c) == C_ERR) {
            /* If the client is no longer valid, we avoid * processing the client later. So we just go * to the next. */
            continue;
        }

        // Parse and execute all client commands
        processInputBuffer(c);

        /* We may have pending replies if a thread readQueryFromClient() produced * replies and did not install a write handler (it can't). */
        if(! (c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c)) clientInstallWriteHandler(c); }/* Update processed count on server */
    server.stat_io_reads_processed += processed;

    return processed;
}
Copy the code

How do I assign clients to write to IO threads for execution?

To write the client distribution processing is done by handleClientsWithPendingWritesUsingThreads function, the function is invoked in beforeSleep function. Logic and handleClientsWithPendingReadsUsingThreads function.

int handleClientsWithPendingWritesUsingThreads(void) {

    // Determine the number of clients_pending_write lists
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0;

    / / only master IO thread | | threads do not use the IO
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }

    /* Start threads if needed. */
    if(! server.io_threads_active) startThreadedIO();/* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    // Assign the client to the IO thread according to the polling mode
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;

        if (c->flags & CLIENT_CLOSE_ASAP) {
            listDelNode(server.clients_pending_write, ln);
            continue;
        }

        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    // Set the operation identifier of the IO thread to "write operation"
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        // The number of clients waiting for processing per thread → io_threads_pending array
        int count = listLength(io_threads_list[j]);
        setIOPendingCount(j, count);
    }

    /* Also use the main thread to process a slice of clients. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);

    // loop, waiting for all other IO thread clients to finish processing
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    }

    /* Run the list of clients again to install the write handler where * needed. */
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        // Check again to see if the client is waiting to be written
        if (clientHasPendingReplies(c) &&
                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);

    /* Update processed count on server */
    server.stat_io_writes_processed += processed;

    return processed;
}
Copy the code

Note that stopThreadedIOIfNeeded will determine the number of clients to be written if the number of I/O threads * 2, then will return directly, directly use the primary I/O thread to process the clients to be written. This is because there are not many clients to write to, and the efficiency of using multithreading decreases.

int stopThreadedIOIfNeeded(void) {
    int pending = listLength(server.clients_pending_write);

    /* Return ASAP if IO threads are disabled (single threaded mode). */
    if (server.io_threads_num == 1) return 1;

    if (pending < (server.io_threads_num*2)) {
        if (server.io_threads_active) stopThreadedIO();
        return 1;
    } else {
        return 0; }}Copy the code

conclusion

The multi-IO thread mechanism implemented by Redis 6.0 mainly uses multiple IO threads to concurrently process client data reading, parsing commands and writing back data, making full use of the multi-core features of the server and improving IO efficiency.

Redis Server calls the postponeClientRead function based on the readQueryFromClient function to decide whether to postpone the client operation. The prepareClientToWrite function in the addReply function determines whether to defer client writes. The client to be read is added to the clientS_pending_read list, and the client to be written is added to the clientS_pending_write list.

After the IO thread is created, it checks the IO_threads_list list and calls readQueryFromClient or writeToClient if a client is waiting to read or write.

However, the multi-IO thread does not execute the command, the execution command is still in the main I/O thread.

Refer to the link

  • Geek time: 12 | Redis really is single thread?
  • Geek: 13 | Redis 6.0 multiple threads of IO’s efficiency?
  • Pthread_create (3) – Linux manual Page.

Redis source code concise analysis series

The most concise Redis source code analysis series of articles

Java programming ideas – the most complete mind map -GitHub download link, need partners can take ~

Original is not easy, I hope you reprint when please contact me, and mark the original link.