preface
The previous article covered how the Redis server initializes its eventLoop at startup and calls epoll (and related functions) to create the event handlers that accept client connections and register a read event for each connected client. This article analyzes how Redis processes commands from the perspective of multithreading.
Main text
Redis 6.0 added multithreading. However, as the earlier source analysis showed, Redis's underlying data structures take no locks when entries are added or deleted, so we can conclude that the new threads do not process data concurrently; the multithreading is I/O multithreading.
The I/O threads read data from clients into the input buffer and send replies back; in between, the main thread executes the command logic and stores the result in the output buffer.
The advantage is that the main thread spends less time blocked on network I/O.
Multithread initialization
Bio initialization
/* Last stage of server initialization: start the bio threads and the
 * threaded-I/O workers, pass server.jemalloc_bg_thread to
 * set_jemalloc_bg_thread(), and store the current zmalloc_used_memory()
 * value in server.initial_memory_usage. Takes no arguments. */
void InitServerLast(void) {
    bioInit();
    initThreadedIO();
    set_jemalloc_bg_thread(server.jemalloc_bg_thread);
    server.initial_memory_usage = zmalloc_used_memory();
}
Copy the code
After initServer returns, InitServerLast is called; this is where the bio threads and the I/O threads are initialized.
/* Initialize the background job system: set up the per-type mutex,
 * condition variables, job list and pending counter, then spawn one worker
 * thread per job type. Exits the process if a thread cannot be created. */
void bioInit(void) {
    pthread_attr_t attr;
    pthread_t thread;
    size_t stacksize;
    int j;

    /* Initialize the synchronization objects and the queue of every type. */
    for (j = 0; j < BIO_NUM_OPS; j++) {
        pthread_mutex_init(&bio_mutex[j],NULL);
        pthread_cond_init(&bio_newjob_cond[j],NULL);
        pthread_cond_init(&bio_step_cond[j],NULL);
        bio_jobs[j] = listCreate();
        bio_pending[j] = 0;
    }

    /* Raise the thread stack size to at least REDIS_THREAD_STACK_SIZE by
     * doubling the default; a reported default of 0 is bumped to 1 first so
     * that the doubling loop can make progress. */
    pthread_attr_init(&attr);
    pthread_attr_getstacksize(&attr,&stacksize);
    if (!stacksize) stacksize = 1;
    while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
    pthread_attr_setstacksize(&attr, stacksize);

    /* Spawn the threads. The single argument a thread function accepts is
     * used to pass the job type the thread is responsible for, encoded as
     * an integer cast to a pointer. */
    for (j = 0; j < BIO_NUM_OPS; j++) {
        void *arg = (void*)(unsigned long) j;
        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
            exit(1);
        }
        bio_threads[j] = thread;
    }
}
bioInit creates three threads, each running bioProcessBackgroundJobs and identified by the job type (0, 1, or 2) passed as its argument.
/* Entry point of a bio worker thread (excerpt). `arg` carries the job type
 * this thread serves, passed as an integer cast to a pointer. */
void *bioProcessBackgroundJobs(void *arg) {
struct bio_job *job;
unsigned long type = (unsigned long) arg;
sigset_t sigset;
/* Refuse to run with a type outside the known range: log and end the thread. */
if (type >= BIO_NUM_OPS) {
serverLog(LL_WARNING,
"Warning: bio thread started with wrong type %lu",type);
return NULL;
}
/* Give the thread a title that matches the kind of job it handles. */
switch (type) {
case BIO_CLOSE_FILE:
redis_set_thread_title("bio_close_file");
break;
case BIO_AOF_FSYNC:
redis_set_thread_title("bio_aof_fsync");
break;
case BIO_LAZY_FREE:
redis_set_thread_title("bio_lazy_free");
break;
}
Copy the code
You can see that the three threads handle closing files, AOF fsync, and lazy (deferred) freeing, respectively.
/* Worker loop (excerpt): take the first job of this thread's type, run it
 * without holding the mutex, then remove it from the queue. */
while(1) {
listNode *ln;
/* The loop always starts with the lock held. */
if (listLength(bio_jobs[type]) == 0) {
/* Nothing queued: wait on the new-job condition (the mutex is released
 * while waiting) and re-check the queue afterwards. */
pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
continue;
}
/* Fetch the first job; the node stays in the list until the job is done. */
ln = listFirst(bio_jobs[type]);
job = ln->value;
/* Release the mutex while the job itself is being executed. */
pthread_mutex_unlock(&bio_mutex[type]);
/* Run the job according to its type. */
if (type == BIO_CLOSE_FILE) {
close((long)job->arg1);
} else if (type == BIO_AOF_FSYNC) {
redis_fsync((long)job->arg1);
} else if (type == BIO_LAZY_FREE) {
/* Which arguments are set selects what gets freed:
 * arg1 -> an object, arg2 and arg3 -> a database, only arg3 -> a slots map. */
if (job->arg1)
lazyfreeFreeObjectFromBioThread(job->arg1);
else if (job->arg2 && job->arg3)
lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
else if (job->arg3)
lazyfreeFreeSlotsMapFromBioThread(job->arg3);
} else {
serverPanic("Wrong job type in bioProcessBackgroundJobs().");
}
zfree(job);
/* Take the mutex again before touching the queue; it stays held for the
 * next iteration of the loop. */
pthread_mutex_lock(&bio_mutex[type]);
listDelNode(bio_jobs[type],ln);
bio_pending[type]--;
/* Wake up any thread waiting on the step condition of this type. */
pthread_cond_broadcast(&bio_step_cond[type]);
}
Copy the code
These three threads take nodes from the bio_jobs lists and process them according to their type. Jobs are submitted as in this example from the AOF source code:
/* Queue an fsync of `fd` as a BIO_AOF_FSYNC background job. The descriptor
 * travels in the job's first argument as an integer cast to a pointer; the
 * other two arguments are unused. */
void aof_background_fsync(int fd) {
    bioCreateBackgroundJob(BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
}
/* Create a job of the given `type` carrying three opaque arguments and
 * append it to the queue of that type.
 *
 * The queue and the pending counter are updated while holding the mutex of
 * the type, and the new-job condition is signalled before it is released. */
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
    struct bio_job *entry = zmalloc(sizeof(struct bio_job));

    /* Fill in the job before it becomes visible in the queue. */
    entry->arg1 = arg1;
    entry->arg2 = arg2;
    entry->arg3 = arg3;
    entry->time = time(NULL);

    pthread_mutex_lock(&bio_mutex[type]);
    listAddNodeTail(bio_jobs[type],entry);
    bio_pending[type]++;
    pthread_cond_signal(&bio_newjob_cond[type]);
    pthread_mutex_unlock(&bio_mutex[type]);
}
Copy the code
Tasks are submitted by appending nodes to the linked list.
To summarize: the bio subsystem is initialized first, creating three threads that handle file closing, fsync, and lazy-free tasks respectively. This work does not touch the main command-serving logic, so it is moved to separate threads to improve efficiency.
Multithread initialization
/* Initialize the data structures needed for threaded I/O and spawn the I/O
 * threads. Exits the process if too many threads are configured or if a
 * thread cannot be created. */
void initThreadedIO(void) {
    io_threads_active = 0; /* We start with threads not active. */

    /* A configured value of 1 means threaded I/O is not used: nothing to
     * spawn. */
    if (server.io_threads_num == 1) return;

    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }

    for (int i = 0; i < server.io_threads_num; i++) {
        /* Things we do for all the threads including the main thread. */
        io_threads_list[i] = listCreate();
        if (i == 0) continue; /* Thread 0 is the main thread. */

        /* Things we do only for the additional threads. */
        pthread_t tid;
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        io_threads_pending[i] = 0;
        /* Lock the thread's mutex before creating it, so that the thread
         * blocks when it tries to take the same mutex in IOThreadMain. */
        pthread_mutex_lock(&io_threads_mutex[i]);
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}
Threads are created according to io_threads_num, and each thread's mutex is locked before the thread is started. The core logic is in IOThreadMain.
/* Main function of an I/O thread. `myid` is the thread index, passed as an
 * integer cast to a pointer. The thread loops forever: it waits until its
 * pending counter becomes non-zero, performs the read or write selected by
 * io_threads_op on every client in its list, then resets the counter to 0. */
void *IOThreadMain(void *myid) {
    long id = (unsigned long)myid;
    char thdname[16];

    /* Name the thread after its index and apply the configured CPU list. */
    snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
    redis_set_thread_title(thdname);
    redisSetCpuAffinity(server.server_cpulist);

    while(1) {
        /* Busy-wait for a bounded number of iterations for work to show up. */
        for (int j = 0; j < 1000000; j++) {
            if (io_threads_pending[id] != 0) break;
        }

        /* Still nothing to do: block on the mutex. It is locked by the main
         * thread before this thread is created and again in stopThreadedIO,
         * so the lock call parks this thread until the main thread releases
         * the mutex. It is released again right away so that the main
         * thread can re-acquire it later without deadlocking. */
        if (io_threads_pending[id] == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }

        serverAssert(io_threads_pending[id] != 0);

        if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));

        /* Process: note that the main thread will never touch our list
         * before we drop the pending count to 0. */
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        /* Dropping the counter to 0 is what tells the main thread that this
         * batch is finished. */
        io_threads_pending[id] = 0;

        if (tio_debug) printf("[%ld] Done\n", id);
    }
}
The thread first spins, waiting for io_threads_pending[id] to become non-zero. If nothing arrives, it blocks on its mutex until another function unlocks it. Once there is pending work, it enters the main logic and walks the clients in io_threads_list[id]; depending on io_threads_op, it calls writeToClient for writes or readQueryFromClient for reads.
Multithreaded core
Like the bio threads, the I/O threads wait for entries to appear in a list and then process them. So when are those entries added?
/* Run the event loop: clear the stop flag, then keep processing all event
 * types, with the before-sleep and after-sleep hooks enabled, until the
 * stop flag is set. */
void aeMain(aeEventLoop *eventLoop) {
    const int loop_flags = AE_ALL_EVENTS |
                           AE_CALL_BEFORE_SLEEP |
                           AE_CALL_AFTER_SLEEP;

    for (eventLoop->stop = 0; eventLoop->stop == 0; ) {
        aeProcessEvents(eventLoop, loop_flags);
    }
}
Copy the code
The server main method ends with a call to the aeMain method for an event loop
/* Process pending events (first part of the excerpt): decide whether to
 * poll at all and compute the timeout `tvp` to pass to the poll call. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Neither time events nor file events were requested: nothing to do. */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Poll when there are registered descriptors, or when time events are
     * requested and waiting is allowed (so we can sleep until the next
     * timer is due). */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;

            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            /* Milliseconds left until the nearest timer is due. */
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;

            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                /* Already due: do not wait at all. */
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            if (flags & AE_DONT_WAIT) {
                /* The caller asked not to wait: use a zero timeout. */
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* No timer and waiting is allowed: no timeout. */
                tvp = NULL;
            }
        }

        /* The event loop's own AE_DONT_WAIT flag forces a zero timeout. */
        if (eventLoop->flags & AE_DONT_WAIT) {
            tv.tv_sec = tv.tv_usec = 0;
            tvp = &tv;
        }
Copy the code
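Time events are considered first: the code works out how long it may wait before the nearest timer is due. We won't go into detail here; this timer mechanism is where serverCron gets called.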
/* Run the before-sleep hook when one is installed and the caller asked for it. */
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
    eventLoop->beforesleep(eventLoop);

/* Poll with the timeout computed above; the result is the number of fired
 * events. */
numevents = aeApiPoll(eventLoop, tvp);

/* Run the after-sleep hook when one is installed and the caller asked for it. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
    eventLoop->aftersleep(eventLoop);
Copy the code
Here beforesleep, aeApiPoll, and aftersleep are called in that order; aeApiPoll blocks for a while to collect incoming events. Setting beforesleep and aftersleep aside for now, aeApiPoll returns the number of events that have arrived.
/* Dispatch every fired file event to its read and/or write handler. */
for (j = 0; j < numevents; j++) {
    /* Look up the registered event through the fired descriptor. */
    aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
    int mask = eventLoop->fired[j].mask;
    int fd = eventLoop->fired[j].fd;
    int fired = 0; /* Number of handlers called for this fd. */

    /* When AE_BARRIER is set on the event, the readable handler runs
     * after the writable one instead of before it. */
    int invert = fe->mask & AE_BARRIER;

    /* Normal order: readable handler first. */
    if (!invert && fe->mask & mask & AE_READABLE) {
        fe->rfileProc(eventLoop,fd,fe->clientData,mask);
        fired++;
        fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
    }

    /* Writable handler; skipped when the same handler already ran for
     * the readable event. */
    if (fe->mask & mask & AE_WRITABLE) {
        if (!fired || fe->wfileProc != fe->rfileProc) {
            fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            fired++;
        }
    }

    /* Inverted order: readable handler after the writable one. */
    if (invert) {
        fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
        if ((fe->mask & mask & AE_READABLE) &&
            (!fired || fe->wfileProc != fe->rfileProc))
        {
            fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            fired++;
        }
    }

    processed++;
}
} /* End of the enclosing "if (eventLoop->maxfd != -1 || ...)" block. */

/* Fire the time events when requested. */
if (flags & AE_TIME_EVENTS)
    processed += processTimeEvents(eventLoop);
Copy the code
When an event arrives, rfileProc or wfileProc is called depending on the event type. For client connections these lead to readQueryFromClient and writeToClient. Both the I/O threads and the main thread call readQueryFromClient, so let's see what it does.
void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
int nread, readlen;
size_t qblen;
// Check if we want to read the event loop from the client on exit later. This happens if thread I/O is enabled.
if (postponeClientRead(c)) return; .int postponeClientRead(client *c) {
if(io_threads_active && server.io_threads_do_reads && ! ProcessingEventsWhileBlocked && ! (c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ))) { c->flags |= CLIENT_PENDING_READ; listAddNodeHead(server.clients_pending_read,c);return 1;
} else {
return 0; }}Copy the code
The first step in readQueryFromClient is to check whether multithreading is enabled; if so, the current client is added to the clients_pending_read list.
If multithreading is enabled, the client data is not read in the event-loop iteration in which the read event fires; the client is only queued in the clients_pending_read list. The actual read happens in the next iteration, triggered from the beforeSleep function.
/* Called by the event loop right before it polls for events. It runs the
 * per-iteration housekeeping, including handing pending client reads and
 * writes to the I/O threads. The `eventLoop` argument is unused. */
void beforeSleep(struct aeEventLoop *eventLoop) {
    UNUSED(eventLoop);

    /* Just call a subset of vital functions in case we are re-entering
     * the event loop from processEventsWhileBlocked(). Note that in this
     * case we keep track of the number of events we are processing, since
     * processEventsWhileBlocked() wants to stop ASAP if there are no longer
     * events to handle. */
    if (ProcessingEventsWhileBlocked) {
        uint64_t processed = 0;
        processed += handleClientsWithPendingReadsUsingThreads();
        processed += tlsProcessPendingData();
        processed += handleClientsWithPendingWrites();
        processed += freeClientsInAsyncFreeQueue();
        server.events_processed_while_blocked += processed;
        return;
    }

    /* Handle blocked clients whose timeout has been reached. */
    handleBlockedClientsTimeout();

    /* Distribute the pending reads to the I/O threads and run them. */
    handleClientsWithPendingReadsUsingThreads();

    /* Process pending TLS data; ask the event loop not to wait while some
     * is still pending. */
    tlsProcessPendingData();
    aeSetDontWait(server.el, tlsHasPendingData());

    /* Cluster housekeeping, only in cluster mode. */
    if (server.cluster_enabled) clusterBeforeSleep();

    /* Run a fast expire cycle when active expiry is enabled and no master
     * host is configured. */
    if (server.active_expire_enabled && server.masterhost == NULL)
        activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);

    /* Handle clients that are waiting for replica acknowledgements. */
    if (listLength(server.clients_waiting_acks))
        processClientsWaitingReplicas();

    /* Check if there are clients unblocked by modules that implement
     * blocking commands. */
    if (moduleCount()) moduleHandleBlockedClients();

    /* Process the clients that were just unblocked, if any. */
    if (listLength(server.unblocked_clients))
        processUnblockedClients();

    /* Send all the slaves an ACK request if at least one client blocked
     * during the previous event loop iteration. Note that we do this after
     * processUnblockedClients(), so if there are multiple pipelined WAITs
     * and the just unblocked WAIT gets blocked again, we don't have to wait
     * a server cron cycle in absence of other event loop events. See #6623. */
    if (server.get_ack_from_slaves) {
        robj *argv[3];

        argv[0] = createStringObject("REPLCONF",8);
        argv[1] = createStringObject("GETACK",6);
        argv[2] = createStringObject("*",1); /* Not used argument. */
        replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
        decrRefCount(argv[0]);
        decrRefCount(argv[1]);
        decrRefCount(argv[2]);
        server.get_ack_from_slaves = 0;
    }

    /* Send the invalidation messages to clients participating to the
     * client side caching protocol in broadcasting (BCAST) mode. */
    trackingBroadcastInvalidationMessages();

    /* Flush the AOF buffer. */
    flushAppendOnlyFile(0);

    /* Distribute the pending writes to the I/O threads and run them. */
    handleClientsWithPendingWritesUsingThreads();

    /* Close clients that need to be closed asynchronous */
    freeClientsInAsyncFreeQueue();

    /* Before we are going to sleep, let the threads access the dataset by
     * releasing the GIL. Redis main thread will not touch anything at this
     * time. */
    if (moduleCount()) moduleReleaseGIL();
}
Copy the code
This method does the following:
- handleBlockedClientsTimeout: unblocks clients whose blocking timeout has expired
- handleClientsWithPendingReadsUsingThreads: assigns the read tasks and executes them
- activeExpireCycle: runs a fast cycle that expires keys
- processClientsWaitingReplicas: unblocks clients that called the WAIT command
- flushAppendOnlyFile: flushes the AOF buffer
- handleClientsWithPendingWritesUsingThreads: assigns the write tasks
flushAppendOnlyFile and the other steps are not the focus of this article; let's look at how the read tasks are handled in handleClientsWithPendingReadsUsingThreads:
/* Read from all the clients queued in server.clients_pending_read, spreading
 * the work across the I/O threads and the main thread, then execute the
 * parsed commands on the main thread.
 *
 * Returns the number of clients that were queued, or 0 when threaded reads
 * are not active or the queue is empty. */
int handleClientsWithPendingReadsUsingThreads(void) {
if(! io_threads_active || ! server.io_threads_do_reads)return 0;
int processed = listLength(server.clients_pending_read);
if (processed == 0) return 0;
if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);
/* Distribute the waiting clients across the per-thread lists, round robin. */
listIter li;
listNode *ln;
listRewind(server.clients_pending_read,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
/* Select the read operation, then publish the per-thread counts: a
 * non-zero io_threads_pending entry is what an I/O thread waits for. */
io_threads_op = IO_THREADS_OP_READ;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
/* List 0 belongs to the main thread, which does its share of the reads. */
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
readQueryFromClient(c->conn);
}
listEmpty(io_threads_list[0]);
/* Busy-wait until every I/O thread has reset its pending count to 0. */
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
if (tio_debug) printf("I/O READ All threads finshed\n");
/* All reads are done: back on the main thread only, drain the queue and
 * run what was read. */
while(listLength(server.clients_pending_read)) {
ln = listFirst(server.clients_pending_read);
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_READ;
listDelNode(server.clients_pending_read,ln);
/* A command was parsed during the read but its execution was deferred:
 * execute it now. */
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~CLIENT_PENDING_COMMAND;
if (processCommandAndResetClient(c) == C_ERR) {
/* The client is no longer valid: do not touch it again. */
continue;
}
}
/* Process whatever is left in the client's input buffer. */
processInputBuffer(c);
}
return processed;
}
Copy the code
The clients in clients_pending_read are distributed evenly across the threads, and the main thread also takes part in reading. Finally, the main thread waits for all the other threads to finish reading and then processes the commands.
/* Parse and execute the commands found in the client's query buffer,
 * starting at c->qb_pos. Stops early when the client must not be served
 * right now, when a request is still incomplete, or when the command has to
 * be executed later by the main thread. On exit, the consumed prefix of the
 * buffer is trimmed, unless the client became invalid. */
void processInputBuffer(client *c) {
    /* Keep processing while there is something in the input buffer */
    while(c->qb_pos < sdslen(c->querybuf)) {
        /* Stop while clients are paused (replicas are exempt). */
        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;

        /* Stop if the client is blocked. */
        if (c->flags & CLIENT_BLOCKED) break;

        /* Stop if the client already has a parsed command waiting to be
         * executed. */
        if (c->flags & CLIENT_PENDING_COMMAND) break;

        /* Don't process input from the master while there is a busy script
         * condition on the slave. We want just to accumulate the replication
         * stream (instead of replying -BUSY like we do with other clients) and
         * later resume the processing. */
        if (server.lua_timedout && c->flags & CLIENT_MASTER) break;

        /* Stop if the client is flagged to be closed. */
        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;

        /* Determine request type when unknown. */
        if (!c->reqtype) {
            if (c->querybuf[c->qb_pos] == '*') {
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
                c->reqtype = PROTO_REQ_INLINE;
            }
        }

        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;
            /* If the Gopher mode and we got zero or one argument, process
             * the request in Gopher mode. */
            if (server.gopher_enabled &&
                ((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') ||
                  c->argc == 0))
            {
                processGopherRequest(c);
                resetClient(c);
                c->flags |= CLIENT_CLOSE_AFTER_REPLY;
                break;
            }
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }

        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* CLIENT_PENDING_READ means the read was handed out as a
             * threaded read: do not execute the command here, just flag
             * the client so that the command is executed later. */
            if (c->flags & CLIENT_PENDING_READ) {
                c->flags |= CLIENT_PENDING_COMMAND;
                break;
            }

            /* Execute the command. */
            if (processCommandAndResetClient(c) == C_ERR) {
                /* If the client is no longer valid, we avoid exiting this
                 * loop and trimming the client buffer later. So we return
                 * ASAP in that case. */
                return;
            }
        }
    }

    /* Trim the part of the buffer that has already been consumed. */
    if (c->qb_pos) {
        sdsrange(c->querybuf,c->qb_pos,-1);
        c->qb_pos = 0;
    }
}
readQueryFromClient reads the command into querybuf and calls processInputBuffer to parse and execute it. If multithreaded reads are enabled, the I/O thread does not actually execute the command; it sets the CLIENT_PENDING_COMMAND flag so that the command is executed on the main thread. That brings us back to the last step of handleClientsWithPendingReadsUsingThreads, which calls processCommandAndResetClient to execute the command.
/* Execute the command currently parsed for client `c`, and run the
 * post-command step when processCommand() reports C_OK.
 *
 * Returns C_ERR when server.current_client was cleared while the command
 * ran, which signals that the client is gone and must not be used by the
 * caller; returns C_OK otherwise. */
int processCommandAndResetClient(client *c) {
    server.current_client = c;
    if (processCommand(c) == C_OK) {
        commandProcessed(c);
    }

    /* freeMemoryIfNeeded may flush slave output buffers. This may
     * result into a slave, that may be the active client, to be
     * freed. A NULL current_client at this point is how that shows up. */
    int status = (server.current_client == NULL) ? C_ERR : C_OK;
    server.current_client = NULL;
    return status;
}
Copy the code
Call the processCommand method to enter the command execution process that was analyzed in the previous article
/* Write the pending replies of all the clients queued in
 * server.clients_pending_write, spreading the work across the I/O threads
 * and the main thread. Falls back to the non-threaded path when only one
 * thread is configured or when stopThreadedIOIfNeeded() returns non-zero.
 *
 * Returns the number of clients that were queued (or the fallback's result). */
int handleClientsWithPendingWritesUsingThreads(void) {
int processed = listLength(server.clients_pending_write);
if (processed == 0) return 0; /* Return ASAP if there are no clients. */
/* Write from the main thread alone when threading is off or not needed. */
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
return handleClientsWithPendingWrites();
}
/* Make sure the I/O threads are running. */
if(! io_threads_active) startThreadedIO();if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);
/* Distribute the queued clients across the per-thread lists, round robin. */
listIter li;
listNode *ln;
listRewind(server.clients_pending_write,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
/* Select the write operation, then publish the per-thread counts: a
 * non-zero io_threads_pending entry is what an I/O thread waits for. */
io_threads_op = IO_THREADS_OP_WRITE;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
/* List 0 belongs to the main thread, which does its share of the writes. */
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
writeToClient(c,0);
}
listEmpty(io_threads_list[0]);
/* Busy-wait until every I/O thread has reset its pending count to 0. */
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
if (tio_debug) printf("I/O WRITE All threads finshed\n");
/* Some clients may still have data left to send after this round. */
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
/* Install the write handler if there are pending writes in some
 * of the clients; free the client asynchronously if that fails. */
if (clientHasPendingReplies(c) &&
connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
{
freeClientAsync(c);
}
}
listEmpty(server.clients_pending_write);
return processed;
}
Copy the code
Write events are handled in much the same way as read events, except that stopThreadedIOIfNeeded is used to decide whether multithreading should be turned off.
/* Decide whether the pending writes should be served without the I/O
 * threads.
 *
 * Returns 1 when only one thread is configured, or when fewer clients are
 * waiting for a write than twice the number of threads (in which case the
 * I/O threads are also stopped if they were active). Returns 0 otherwise. */
int stopThreadedIOIfNeeded(void) {
    /* A single configured thread: threaded I/O is never used. */
    if (server.io_threads_num == 1) return 1;

    int queued = listLength(server.clients_pending_write);
    if (queued >= (server.io_threads_num*2)) return 0;

    /* Too little work to be worth spreading across threads. */
    if (io_threads_active) stopThreadedIO();
    return 1;
}

/* Stop the I/O threads: first serve any reads still queued for them, then
 * lock the mutex of every I/O thread and mark threaded I/O as inactive.
 * Must only be called while threaded I/O is active. */
void stopThreadedIO(void) {
    /* Reads may still be queued for the threads: serve them before stopping. */
    handleClientsWithPendingReadsUsingThreads();

    if (tio_debug) {
        printf("E");
        fflush(stdout);
        printf("--- STOPPING THREADED IO [R%d] [W%d] ---\n",
            (int) listLength(server.clients_pending_read),
            (int) listLength(server.clients_pending_write));
    }
    serverAssert(io_threads_active == 1);

    /* Index 0 is the main thread, so start from 1. */
    int j = 1;
    while (j < server.io_threads_num) {
        pthread_mutex_lock(&io_threads_mutex[j]);
        j++;
    }
    io_threads_active = 0;
}
Copy the code
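The I/O threads are stopped with pthread_mutex_lock: once the main thread holds a thread's mutex, that thread blocks the next time it tries to acquire it.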
/* Write the pending replies of every client queued in
 * server.clients_pending_write from the calling thread, removing each client
 * from the queue as it is visited. A write handler is installed for clients
 * that still have data left after the write.
 *
 * Returns the number of clients that were queued on entry. */
int handleClientsWithPendingWrites(void) {
    int processed = listLength(server.clients_pending_write);
    listIter iter;
    listNode *node;

    listRewind(server.clients_pending_write,&iter);
    while ((node = listNext(&iter)) != NULL) {
        client *c = listNodeValue(node);
        c->flags &= ~CLIENT_PENDING_WRITE;
        listDelNode(server.clients_pending_write,node);

        /* Protected clients are skipped. */
        if (c->flags & CLIENT_PROTECTED) continue;

        /* Try to write buffers to the client socket. */
        if (writeToClient(c,0) == C_ERR) continue;

        /* Everything was sent: no handler needed. */
        if (!clientHasPendingReplies(c)) continue;

        /* Ask for the barrier when the AOF is on with the fsync-always
         * policy. */
        int ae_barrier = (server.aof_state == AOF_ON &&
                          server.aof_fsync == AOF_FSYNC_ALWAYS);
        if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) {
            freeClientAsync(c);
        }
    }
    return processed;
}
Copy the code
In this non-threaded path the main thread sends the results to the clients itself; the core is the writeToClient function, which sends the reply held in the output buffer. The details are not covered here.
conclusion
Redis creates three fixed background-job threads plus multiple I/O threads that read and send client data. Before each event-loop iteration goes to sleep, the events collected in the previous iteration are processed: the read tasks are handed out to the I/O threads, and once the reads are done the main thread executes the database operations. This reduces the network I/O cost borne by the main thread, but the threads add overhead of their own, so the improvement is limited.