preface

In the previous article we saw how the Redis server start-up process initializes the eventLoop, calls epoll, creates the event handlers that accept client connections, and registers read events once a client connects. This article analyzes how Redis handles commands from a multithreading perspective.

Main text

Redis 6.0 added multithreading, but as the earlier source-code analysis showed, Redis's underlying data structures take no locks around insertion and deletion. We can therefore conclude that the multithreading is not for concurrent data access, but for I/O.

The I/O threads read data from clients into each client's input buffer; in between, the main thread executes the command logic and stores the result into the client's output buffer, which the I/O threads then send.

The benefit is that the main thread spends less time blocked on network I/O.
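To make the division of labor concrete, here is a minimal single-threaded sketch (illustrative only: none of these names are Redis code) of the three phases a request goes through; in Redis 6.0 the first and third phases can run on I/O threads, while the middle one always runs on the main thread.

#include <stdio.h>
#include <string.h>

// Made-up miniature client: one input buffer, one output buffer
struct client {
    char querybuf[64];  // filled by the read phase (I/O threads)
    char buf[64];       // filled by the execute phase (main thread)
};

static void read_phase(struct client *c) { strcpy(c->querybuf, "PING"); }

static void execute_phase(struct client *c) {  // main thread only
    if (strcmp(c->querybuf, "PING") == 0) strcpy(c->buf, "+PONG\r\n");
}

static void write_phase(struct client *c) { fputs(c->buf, stdout); }

int main(void) {
    struct client c = {{0}, {0}};
    read_phase(&c);     // parallelized across I/O threads in Redis 6.0
    execute_phase(&c);  // always serial, on the main thread
    write_phase(&c);    // parallelized across I/O threads again
    return 0;
}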

Multithread initialization

Bio initialization

void InitServerLast() {
    bioInit();
    initThreadedIO();
    set_jemalloc_bg_thread(server.jemalloc_bg_thread);
    server.initial_memory_usage = zmalloc_used_memory();
}

After initServer, the server calls InitServerLast, which initializes both the BIO threads and the I/O threads.

void bioInit(void) {
    pthread_attr_t attr;
    pthread_t thread;
    size_t stacksize;
    int j;

   // Initialize locks, etc
    for (j = 0; j < BIO_NUM_OPS; j++) {
        pthread_mutex_init(&bio_mutex[j],NULL);
        pthread_cond_init(&bio_newjob_cond[j],NULL);
        pthread_cond_init(&bio_step_cond[j],NULL);
        bio_jobs[j] = listCreate();
        bio_pending[j] = 0;
    }

    pthread_attr_init(&attr);
    pthread_attr_getstacksize(&attr,&stacksize);
    if (!stacksize) stacksize = 1;
    while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
    pthread_attr_setstacksize(&attr, stacksize);

    // Create a thread
    for (j = 0; j < BIO_NUM_OPS; j++) {
        void *arg = (void*)(unsigned long) j;
        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
            exit(1);
        }
        bio_threads[j] = thread;
    }
}

bioInit creates three threads, each running bioProcessBackgroundJobs with its type index (0, 1, or 2) as the argument
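One detail worth noting is how each thread learns its type: the loop index is passed through pthread_create's single void* argument and cast back on the other side, which is what the (void*)(unsigned long) j cast does. A standalone sketch of the trick (hypothetical names; compile with gcc -pthread):

#include <pthread.h>
#include <stdio.h>

#define NUM_OPS 3

static void *worker(void *arg) {
    unsigned long type = (unsigned long) arg;  // recover the index
    printf("thread for op type %lu started\n", type);
    return NULL;
}

int main(void) {
    pthread_t threads[NUM_OPS];
    for (unsigned long j = 0; j < NUM_OPS; j++)
        pthread_create(&threads[j], NULL, worker, (void*)j);
    for (int j = 0; j < NUM_OPS; j++)
        pthread_join(threads[j], NULL);
    return 0;
}

Back in the real code, bioProcessBackgroundJobs recovers that index as its job type: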

void *bioProcessBackgroundJobs(void *arg) {
    struct bio_job *job;
    unsigned long type = (unsigned long) arg;
    sigset_t sigset;
    
    if (type >= BIO_NUM_OPS) {
        serverLog(LL_WARNING,
            "Warning: bio thread started with wrong type %lu",type);
        return NULL;
    }

    switch (type) {
    case BIO_CLOSE_FILE:
        redis_set_thread_title("bio_close_file");
        break;
    case BIO_AOF_FSYNC:
        redis_set_thread_title("bio_aof_fsync");
        break;
    case BIO_LAZY_FREE:
        redis_set_thread_title("bio_lazy_free");
        break;
    }

You can see that the three threads are: the file-closing thread, the AOF fsync thread, and the lazy-free (deferred deletion) thread

while(1) {
        listNode *ln;

        /* The loop always starts with the lock hold. */
        if (listLength(bio_jobs[type]) == 0) {
            pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
            continue;
        }
        // Retrieve a job
        ln = listFirst(bio_jobs[type]);
        job = ln->value;
        pthread_mutex_unlock(&bio_mutex[type]);

        // Actually handle events
        if (type == BIO_CLOSE_FILE) {
            close((long)job->arg1);
        } else if (type == BIO_AOF_FSYNC) {
            redis_fsync((long)job->arg1);
        } else if (type == BIO_LAZY_FREE) {
            if (job->arg1)
                lazyfreeFreeObjectFromBioThread(job->arg1);
            else if (job->arg2 && job->arg3)
                lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
            else if (job->arg3)
                lazyfreeFreeSlotsMapFromBioThread(job->arg3);
        } else {
            serverPanic("Wrong job type in bioProcessBackgroundJobs().");
        }
        zfree(job);


        pthread_mutex_lock(&bio_mutex[type]);
        listDelNode(bio_jobs[type],ln);
        bio_pending[type]--;

        pthread_cond_broadcast(&bio_step_cond[type]);
    }

Each thread takes nodes from its bio_jobs linked list and handles them according to its type. The AOF source shows how a job is submitted:

void aof_background_fsync(int fd) {
    bioCreateBackgroundJob(BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
}

void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
    struct bio_job *job = zmalloc(sizeof(*job));

    job->time = time(NULL);
    job->arg1 = arg1;
    job->arg2 = arg2;
    job->arg3 = arg3;
    pthread_mutex_lock(&bio_mutex[type]);
    listAddNodeTail(bio_jobs[type],job);
    bio_pending[type]++;
    pthread_cond_signal(&bio_newjob_cond[type]);
    pthread_mutex_unlock(&bio_mutex[type]);
}

A task is submitted by appending a node to the linked list and signaling the condition variable to wake the worker
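The shape here is a classic producer/consumer queue guarded by a mutex and a condition variable. Below is a minimal self-contained sketch of the same pattern, with the job list simplified to a counter (the real bio.c keeps one list, mutex, and condition variable per job type); compile with gcc -pthread:

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  newjob = PTHREAD_COND_INITIALIZER;
static int jobs = 0;                 // stand-in for the bio_jobs list

static void *background_worker(void *arg) {
    (void)arg;
    pthread_mutex_lock(&mutex);
    while (1) {
        while (jobs == 0)                       // nothing queued: sleep
            pthread_cond_wait(&newjob, &mutex); // releases mutex while waiting
        jobs--;
        pthread_mutex_unlock(&mutex);
        printf("processing one job\n");         // do the work outside the lock
        pthread_mutex_lock(&mutex);
    }
    return NULL;
}

static void submit_job(void) {                  // bioCreateBackgroundJob analogue
    pthread_mutex_lock(&mutex);
    jobs++;
    pthread_cond_signal(&newjob);               // wake the worker
    pthread_mutex_unlock(&mutex);
}

int main(void) {
    pthread_t t;
    pthread_create(&t, NULL, background_worker, NULL);
    for (int i = 0; i < 3; i++) submit_job();
    sleep(1);                                   // let the worker drain the queue
    return 0;
}

pthread_cond_wait atomically releases the mutex while sleeping and re-acquires it on wakeup, which is why the worker can hold the lock around the whole loop.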

To recap: BIO initialization creates three threads that handle file closing, AOF fsync, and lazy free respectively. Work that does not touch the main service logic is moved onto these threads to improve overall efficiency

IO thread initialization

void initThreadedIO(void) {
    io_threads_active = 0; /* We start with threads not active. */

    // io_threads_num == 1 means multithreading is disabled
    if (server.io_threads_num == 1) return;

    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }
    
    for (int i = 0; i < server.io_threads_num; i++) {
        /* Things we do for all the threads including the main thread. */
        io_threads_list[i] = listCreate();
        if (i == 0) continue; /* Thread 0 is the main thread. */

        pthread_t tid;
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        io_threads_pending[i] = 0;
        // Lock the thread
        pthread_mutex_lock(&io_threads_mutex[i]);
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}

io_threads_num threads are created, and each one's mutex is locked immediately so the thread starts parked; the core logic is IOThreadMain

void *IOThreadMain(void *myid) {
    long id = (unsigned long)myid;
    char thdname[16];

    // Set the thread name to print
    snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
    redis_set_thread_title(thdname);
    redisSetCpuAffinity(server.server_cpulist);

    while(1) {
        // Busy-spin for a while, waiting for work to be assigned
        for (int j = 0; j < 1000000; j++) {
            if (io_threads_pending[id] != 0) break;
        }

        // Idle: the mutex is used as a start/stop gate
        if (io_threads_pending[id] == 0) {
            // The mutex was locked before this thread started, so locking
            // here blocks until startThreadedIO unlocks it
            pthread_mutex_lock(&io_threads_mutex[id]);
            // Unlock right away so that stopThreadedIO can lock it again
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }

        serverAssert(io_threads_pending[id] != 0);

        if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));

        /* Process: note that the main thread will never touch our list
         * before we drop the pending count to 0. */
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        io_threads_pending[id] = 0;

        if (tio_debug) printf("[%ld] Done\n", id); }}Copy the code

Each I/O thread spins on io_threads_pending[id]; if nothing arrives it blocks on its own mutex (which was locked at creation) until startThreadedIO unlocks it. Once started, it walks io_threads_list[id] and, depending on io_threads_op, calls writeToClient for write events or readQueryFromClient for read events
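That lock/unlock pair in the idle branch is the whole start/stop mechanism: initThreadedIO locks each thread's mutex up front, so an idle thread blocks inside pthread_mutex_lock until startThreadedIO unlocks it, and stopThreadedIO re-locks it to park the thread again. A runnable sketch of this gate pattern (hypothetical names, one worker, a plain volatile counter instead of real work; compile with gcc -pthread):

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static pthread_mutex_t gate = PTHREAD_MUTEX_INITIALIZER;
static volatile int pending = 0;   // volatile keeps the demo simple; real code
                                   // should use atomics for this flag

static void *io_thread(void *arg) {
    (void)arg;
    while (1) {
        if (pending == 0) {
            pthread_mutex_lock(&gate);    // blocks while the controller holds it
            pthread_mutex_unlock(&gate);  // release immediately, like IOThreadMain
            continue;
        }
        printf("doing %d units of I/O work\n", pending);
        pending = 0;
    }
    return NULL;
}

int main(void) {
    pthread_t t;
    pthread_mutex_lock(&gate);            // start paused, as initThreadedIO does
    pthread_create(&t, NULL, io_thread, NULL);

    pending = 3;
    pthread_mutex_unlock(&gate);          // startThreadedIO analogue: resume
    sleep(1);
    pthread_mutex_lock(&gate);            // stopThreadedIO analogue: park again
    printf("thread parked again\n");
    return 0;
}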

Multithreaded core

Like the BIO threads, the I/O threads wait for work to appear in their lists. So when is that work added? Start from the event loop:

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}

The server's main function ends with a call to aeMain, which runs the event loop

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    // Nothing to do, return ASAP
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;

            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            // Calculate how long to wait until the nearest timer fires
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;

            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                tvp = NULL; /* wait forever */
            }
        }

        if (eventLoop->flags & AE_DONT_WAIT) {
            tv.tv_sec = tv.tv_usec = 0;
            tvp = &tv;
        }

This first part computes how long the poll call may block, based on the nearest timer event (serverCron is registered as such a timer); the timers themselves fire later in the function

        if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop);

        // Poll for arrived events
        numevents = aeApiPoll(eventLoop, tvp);

        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

The order here is beforesleep -> aeApiPoll -> aftersleep, and aeApiPoll blocks for up to the computed timeout waiting for incoming events. Set beforesleep and aftersleep aside for now; what matters is that aeApiPoll returns the number of fired events
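On Linux, aeApiPoll is a thin wrapper around epoll_wait, and the tvp computed above becomes its timeout. A minimal standalone sketch of the same call shape (Linux-only; it watches stdin for readability with a 100 ms timeout):

#include <stdio.h>
#include <sys/epoll.h>
#include <unistd.h>

int main(void) {
    int epfd = epoll_create1(0);
    struct epoll_event ev = { .events = EPOLLIN, .data.fd = STDIN_FILENO };
    epoll_ctl(epfd, EPOLL_CTL_ADD, STDIN_FILENO, &ev);

    struct epoll_event fired[16];
    // Block at most 100 ms: the analogue of the tvp timeout derived from
    // the nearest timer event in aeProcessEvents
    int numevents = epoll_wait(epfd, fired, 16, 100);
    printf("%d event(s) fired\n", numevents);

    for (int j = 0; j < numevents; j++)
        printf("fd %d is readable\n", fired[j].data.fd);
    close(epfd);
    return 0;
}

Back in aeProcessEvents, the fired events are dispatched next: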

        for (j = 0; j < numevents; j++) {
            // Look up the event via the fired array
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0;

            // AE_BARRIER inverts the usual read-then-write order to write-then-read
            int invert = fe->mask & AE_BARRIER;

            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
            }

            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            if (invert) {
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc))
                {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
            processed++;
        }
    }

    // Fire timer events
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

When an event fires, rfileProc or wfileProc is invoked according to the event type; for a client connection these end up in readQueryFromClient and writeToClient. Both the I/O threads and the main thread call readQueryFromClient, so let's see what's inside

void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread, readlen;
    size_t qblen;

    // Check if we want to postpone the read until we exit the event loop;
    // this is the case when threaded I/O is enabled
    if (postponeClientRead(c)) return;
    ...
}

int postponeClientRead(client *c) {
    if (io_threads_active && server.io_threads_do_reads &&
        !ProcessingEventsWhileBlocked &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
    {
        c->flags |= CLIENT_PENDING_READ;
        listAddNodeHead(server.clients_pending_read,c);
        return 1;
    } else {
        return 0;
    }
}

The first step in readQueryFromClient checks whether threaded reads are enabled; if so, the current client is added to the clients_pending_read list and the read is postponed

Note: with multithreading enabled, the client's data is not read in the event loop iteration that detected the event. The client sits in the clients_pending_read list, and the actual read happens in the next iteration, driven by the beforeSleep method
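A tiny sketch of that two-iteration dance (purely illustrative, made-up names): the read event only queues the client, and the real read happens at the top of the next loop pass.

#include <stdio.h>

#define MAX_PENDING 8
static int pending[MAX_PENDING];
static int npending = 0;

// Iteration N: the read event fires, but we only queue the client
static void on_readable(int client_fd) {
    pending[npending++] = client_fd;   // postponeClientRead analogue
}

// Iteration N+1: beforeSleep drains the queue and does the real reads
static void before_sleep(void) {
    for (int i = 0; i < npending; i++)
        printf("now really reading from fd %d\n", pending[i]);
    npending = 0;
}

int main(void) {
    on_readable(7);          // loop iteration 1: event detected, postponed
    on_readable(9);
    before_sleep();          // loop iteration 2: actual read happens here
    return 0;
}

The real beforeSleep looks like this: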

void beforeSleep(struct aeEventLoop *eventLoop) {
    UNUSED(eventLoop);

    /* Just call a subset of vital functions in case we are re-entering
     * the event loop from processEventsWhileBlocked(). Note that in this
     * case we keep track of the number of events we are processing, since
     * processEventsWhileBlocked() wants to stop ASAP if there are no longer
     * events to handle. */
    if (ProcessingEventsWhileBlocked) {
        uint64_t processed = 0;
        processed += handleClientsWithPendingReadsUsingThreads();
        processed += tlsProcessPendingData();
        processed += handleClientsWithPendingWrites();
        processed += freeClientsInAsyncFreeQueue();
        server.events_processed_while_blocked += processed;
        return;
    }

    // Unblock clients whose blocking timeout has expired
    handleBlockedClientsTimeout();

    // Assign read tasks and execute them
    handleClientsWithPendingReadsUsingThreads();

    tlsProcessPendingData();
    aeSetDontWait(server.el, tlsHasPendingData());

    // Cluster housekeeping
    if (server.cluster_enabled) clusterBeforeSleep();

    // Run a fast cycle of active key expiration
    if (server.active_expire_enabled && server.masterhost == NULL)
        activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);

    // Unlock the client that invoked wait
    if (listLength(server.clients_waiting_acks))
        processClientsWaitingReplicas();

    /* Check if there are clients unblocked by modules that implement
     * blocking commands. */
    if (moduleCount()) moduleHandleBlockedClients();

    /* Try to process pending commands of clients that were just unblocked. */
    if (listLength(server.unblocked_clients))
        processUnblockedClients();

    /* Send all the slaves an ACK request if at least one client blocked
     * during the previous event loop iteration. Note that we do this after
     * processUnblockedClients(), so if there are multiple pipelined WAITs
     * and the just unblocked WAIT gets blocked again, we don't have to wait
     * a server cron cycle in absence of other event loop events. See #6623. */
    if (server.get_ack_from_slaves) {
        robj *argv[3];

        argv[0] = createStringObject("REPLCONF",8);
        argv[1] = createStringObject("GETACK",6);
        argv[2] = createStringObject("*",1); /* Not used argument. */
        replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
        decrRefCount(argv[0]);
        decrRefCount(argv[1]);
        decrRefCount(argv[2]);
        server.get_ack_from_slaves = 0;
    }

    /* Send the invalidation messages to clients participating to the
     * client side caching protocol in broadcasting (BCAST) mode. */
    trackingBroadcastInvalidationMessages();

    // Execute aof flush
    flushAppendOnlyFile(0);

    // Assign write tasks to the I/O threads and execute them
    handleClientsWithPendingWritesUsingThreads();

    /* Close clients that need to be closed asynchronous */
    freeClientsInAsyncFreeQueue();

    /* Before we are going to sleep, let the threads access the dataset by
     * releasing the GIL. Redis main thread will not touch anything at this
     * time. */
    if (moduleCount()) moduleReleaseGIL();
}

This method does the following:

  • handleBlockedClientsTimeout: unblocks clients whose blocking timeout has expired
  • handleClientsWithPendingReadsUsingThreads: assigns read tasks to the I/O threads and executes them
  • activeExpireCycle: runs a fast cycle of active key expiration
  • processClientsWaitingReplicas: unblocks clients that called the WAIT command
  • flushAppendOnlyFile: flushes the AOF buffer
  • handleClientsWithPendingWritesUsingThreads: assigns write tasks to the I/O threads

flushAppendOnlyFile performs the AOF flush and is not expanded here; the function to focus on is handleClientsWithPendingReadsUsingThreads:

int handleClientsWithPendingReadsUsingThreads(void) {
    if (!io_threads_active || !server.io_threads_do_reads) return 0;
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;

    if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);

    // Allocate the connection waiting for processing to the I/O thread
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    // Set io_threads_pending to start the I/O thread
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    // The main thread processes list index 0 itself
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);

    // Wait for other threads to finish processing
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O READ All threads finshed\n");

    // After reading, proceed with processing
    while(listLength(server.clients_pending_read)) {
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_READ;
        listDelNode(server.clients_pending_read,ln);

        if (c->flags & CLIENT_PENDING_COMMAND) {
            c->flags &= ~CLIENT_PENDING_COMMAND;
            if (processCommandAndResetClient(c) == C_ERR) {
                continue;
            }
        }
        processInputBuffer(c);
    }
    return processed;
}

The clients in clients_pending_read are distributed round-robin across the I/O threads; the main thread takes list 0 and reads too, and finally spin-waits until all other threads have finished reading
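The completion handshake is deliberately primitive: no condition variables, just per-thread pending counters that the workers clear and the main thread spin-waits on. A runnable sketch of the same fan-out/spin-join scheme (hypothetical names, GCC/Clang atomic builtins; compile with gcc -pthread):

#include <pthread.h>
#include <stdio.h>

#define IO_THREADS 4
#define CLIENTS    10

static int pending_counts[IO_THREADS];     // io_threads_pending analogue
static int assigned[IO_THREADS][CLIENTS];  // io_threads_list analogue

static void *io_thread(void *arg) {
    long id = (long)arg;
    int n;
    while ((n = __atomic_load_n(&pending_counts[id], __ATOMIC_ACQUIRE)) == 0)
        ;                                  // spin until work is assigned
    for (int i = 0; i < n; i++)
        printf("thread %ld reads client %d\n", id, assigned[id][i]);
    __atomic_store_n(&pending_counts[id], 0, __ATOMIC_RELEASE);
    return NULL;
}

int main(void) {
    pthread_t tids[IO_THREADS];
    int counts[IO_THREADS] = {0};

    // Round-robin distribution, like handleClientsWithPendingReadsUsingThreads
    for (int c = 0; c < CLIENTS; c++) {
        int target = c % IO_THREADS;
        assigned[target][counts[target]++] = c;
    }
    for (long j = 1; j < IO_THREADS; j++)
        pthread_create(&tids[j], NULL, io_thread, (void*)j);
    for (int j = 1; j < IO_THREADS; j++)
        __atomic_store_n(&pending_counts[j], counts[j], __ATOMIC_RELEASE);

    // The main thread handles list 0 itself...
    for (int i = 0; i < counts[0]; i++)
        printf("main thread reads client %d\n", assigned[0][i]);

    // ...then spin-waits until every pending counter drops to zero
    while (1) {
        int pending = 0;
        for (int j = 1; j < IO_THREADS; j++)
            pending += __atomic_load_n(&pending_counts[j], __ATOMIC_ACQUIRE);
        if (pending == 0) break;
    }
    printf("all reads done; main thread can now execute commands\n");
    for (int j = 1; j < IO_THREADS; j++) pthread_join(tids[j], NULL);
    return 0;
}

With all input read, processInputBuffer parses it and, on the main thread, executes it: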

void processInputBuffer(client *c) {
    /* Keep processing while there is something in the input buffer */
    while(c->qb_pos < sdslen(c->querybuf)) {
        // Clients are paused
        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;

        // The client is blocked
        if (c->flags & CLIENT_BLOCKED) break;

        // A command is already pending; don't parse further
        if (c->flags & CLIENT_PENDING_COMMAND) break;

        /* Don't process input from the master while there is a busy script
         * condition on the slave. We want just to accumulate the replication
         * stream (instead of replying -BUSY like we do with other clients)
         * and later resume the processing. */
        if (server.lua_timedout && c->flags & CLIENT_MASTER) break;

        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;

        /* Determine request type when unknown. */
        if (!c->reqtype) {
            if (c->querybuf[c->qb_pos] == '*') {
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
                c->reqtype = PROTO_REQ_INLINE;
            }
        }

        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;
            /* If the Gopher mode and we got zero or one argument, process
             * the request in Gopher mode. */
            if (server.gopher_enabled &&
                ((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') ||
                  c->argc == 0))
            {
                processGopherRequest(c);
                resetClient(c);
                c->flags |= CLIENT_CLOSE_AFTER_REPLY;
                break;
            }
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }

        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            // In an I/O thread we must not execute the command here:
            // flag it so the main thread executes it later
            if (c->flags & CLIENT_PENDING_READ) {
                c->flags |= CLIENT_PENDING_COMMAND;
                break;
            }

            // The core method that executes the command
            if (processCommandAndResetClient(c) == C_ERR) {
                /* If the client is no longer valid, we avoid exiting this
                 * loop and trimming the client buffer later. So we return
                 * ASAP in that case. */
                return;
            }
        }
    }

    // Trim the already-processed part of the buffer
    if (c->qb_pos) {
        sdsrange(c->querybuf,c->qb_pos,-1);
        c->qb_pos = 0;
    }
}

readQueryFromClient reads commands into querybuf and calls processInputBuffer to parse and execute them. With threaded reads enabled, the I/O thread does not actually execute the command; it sets the CLIENT_PENDING_COMMAND flag so the command is handled on the main thread. That brings us back to the last step of handleClientsWithPendingReadsUsingThreads, which calls processCommandAndResetClient to execute the command

int processCommandAndResetClient(client *c) {
    int deadclient = 0;
    server.current_client = c;
    if (processCommand(c) == C_OK) {
        commandProcessed(c);
    }
    if (server.current_client == NULL) deadclient = 1;
    server.current_client = NULL;
    /* freeMemoryIfNeeded may flush slave output buffers. This may
     * result into a slave, that may be the active client, to be
     * freed. */
    return deadclient ? C_ERR : C_OK;
}

processCommand then enters the command-execution flow analyzed in the previous article

int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */

    // With few pending writes it is cheaper to write synchronously,
    // without threads
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }

    // Start the I/O threads if they are not running yet
    if (!io_threads_active) startThreadedIO();

    if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);

    // Assign clients_pending_write clients to multiple threads
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }


    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    // The main thread also takes parameters
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);

    // Wait for all threads to finish their work
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O WRITE All threads finshed\n");

    // If there is still some left to send
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        /* Install the write handler if there are pending writes in some
         * of the clients. */
        if (clientHasPendingReplies(c) &&
                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);
    return processed;
}

Handling write events is much the same as handling read events, except that stopThreadedIOIfNeeded first decides whether to turn multithreading off: with 4 I/O threads, for example, threading is suspended whenever fewer than 8 clients have pending writes

int stopThreadedIOIfNeeded(void) {
    int pending = listLength(server.clients_pending_write);

    if (server.io_threads_num == 1) return 1;

    // Stop multithreading if the data to be sent is less than twice the number of threads
    if (pending < (server.io_threads_num*2)) {
        if (io_threads_active) stopThreadedIO();
        return 1;
    } else {
        return 0;
    }
}

void stopThreadedIO(void) {
    // Call a read before stopping to complete the multithreaded read call
    handleClientsWithPendingReadsUsingThreads();
    if (tio_debug) { printf("E"); fflush(stdout); }
    if (tio_debug) printf("--- STOPPING THREADED IO [R%d] [W%d] ---\n",
        (int) listLength(server.clients_pending_read),
        (int) listLength(server.clients_pending_write));
    serverAssert(io_threads_active == 1);
    for (int j = 1; j < server.io_threads_num; j++)
        pthread_mutex_lock(&io_threads_mutex[j]);
    io_threads_active = 0;
}

Stopping simply re-acquires each thread's mutex with pthread_mutex_lock, leaving the I/O threads parked in IOThreadMain

int handleClientsWithPendingWrites(void) {
    listIter li;
    listNode *ln;
    int processed = listLength(server.clients_pending_write);

    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        listDelNode(server.clients_pending_write,ln);

        // The client is protected
        if (c->flags & CLIENT_PROTECTED) continue;

        /* Try to write buffers to the client socket. */
        if (writeToClient(c,0) == C_ERR) continue;

        // If there is still data after writing
        if (clientHasPendingReplies(c)) {
            int ae_barrier = 0;
            // With AOF appendfsync=always, set AE_BARRIER so the fd is not
            // served for reading and writing in the same iteration: the
            // fsync in beforeSleep must happen before the reply is sent
            if (server.aof_state == AOF_ON &&
                server.aof_fsync == AOF_FSYNC_ALWAYS)
            {
                ae_barrier = 1;
            }
            if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient,
                                               ae_barrier) == C_ERR)
            {
                freeClientAsync(c);
            }
        }
    }
    return processed;
}

Execution results are returned to the client by writeToClient, which sends whatever is queued in the client's output buffer; the details are not expanded here
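As a rough idea of what writeToClient does, here is a minimal sketch (not the real function, which also handles the reply list, write limits, and event-handler bookkeeping): drain the buffer with write(2), tracking how many bytes have been sent.

#include <string.h>
#include <unistd.h>

// Hypothetical miniature of a client output buffer
struct mini_client {
    int fd;
    char buf[1024];   // reply bytes queued by the main thread
    size_t bufpos;    // how many bytes are queued
    size_t sentlen;   // how many bytes were already written
};

// Keep writing until the buffer is drained or the socket would block
static int mini_write_to_client(struct mini_client *c) {
    while (c->sentlen < c->bufpos) {
        ssize_t nwritten = write(c->fd, c->buf + c->sentlen,
                                 c->bufpos - c->sentlen);
        if (nwritten <= 0) return -1;   // would block or error: retry later
        c->sentlen += (size_t)nwritten;
    }
    c->bufpos = c->sentlen = 0;         // buffer fully flushed
    return 0;
}

int main(void) {
    struct mini_client c = { .fd = STDOUT_FILENO };
    memcpy(c.buf, "+PONG\r\n", 7);
    c.bufpos = 7;
    return mini_write_to_client(&c);
}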

conclusion

Redis creates three fixed background-task threads plus a set of I/O threads for reading from and writing to clients. At the start of each event loop iteration, the events queued during the previous iteration are processed: read tasks are handed to the I/O threads, and once reading completes the main thread alone executes the database operations. This cuts the main thread's network I/O overhead, but the threads bring coordination costs of their own, so the improvement is bounded