In serverCron, and in the Redis startup process, you can see a lot of AOF-related logic. Let's now walk through the relevant AOF procedures.
The first thing to understand is that AOF writing runs along two paths. One writes and flushes AOF data in the main thread; the other generates a slimmed-down (rewritten) AOF file in a background child process. The two paths can run simultaneously or independently. When the child process has finished its task, its AOF file replaces (overwrites) the previous AOF file. The AOF data-recovery process that runs during Redis startup is not examined at the code level here. There are two buffers for storing AOF data. The first is aof_buf, the ordinary AOF data buffer. Every time a command is executed, propagate() is triggered according to the command type; it is the entry point to the whole AOF flow and indirectly calls feedAppendOnlyFile, whose main job is to append data to aof_buf.
/**
 * Entry point of every AOF write: serialize one executed command and append
 * it to the AOF buffers.
 *
 * The command is appended to server.aof_buf when AOF is enabled, and also to
 * the rewrite buffer when an AOF rewrite child exists.
 *
 * @param cmd    the command that was executed
 * @param dictid id of the DB the command targeted
 * @param argv   argument vector of the command
 * @param argc   number of entries in argv
 */
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    sds buf = sdsempty();
    robj *tmpargv[3];

    /* The DB this command was targeting is not the same as the last command
     * we appended. To issue a SELECT command is needed. */
    if (dictid != server.aof_selected_db) {
        char seldb[64];

        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.aof_selected_db = dictid;
    }

    /* Serialize the command; commands carrying a relative expire are
     * translated so that an absolute PEXPIREAT is what gets logged. */
    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
        cmd->proc == expireatCommand) {
        /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
        /* Translate SETEX/PSETEX to SET and PEXPIREAT */
        tmpargv[0] = createStringObject("SET",3);
        tmpargv[1] = argv[1];
        tmpargv[2] = argv[3];
        buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
        decrRefCount(tmpargv[0]);
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else if (cmd->proc == setCommand && argc > 3) {
        int i;
        robj *exarg = NULL, *pxarg = NULL;

        for (i = 3; i < argc; i++) {
            /* Never read past the end of argv when the option is the last
             * argument. */
            robj *next = (i + 1 < argc) ? argv[i+1] : NULL;
            if (!strcasecmp(argv[i]->ptr, "ex")) exarg = next;
            if (!strcasecmp(argv[i]->ptr, "px")) pxarg = next;
        }
        serverAssert(!(exarg && pxarg));

        if (exarg || pxarg) {
            /* Translate SET [EX seconds][PX milliseconds] to SET and PEXPIREAT */
            buf = catAppendOnlyGenericCommand(buf,3,argv);
            if (exarg)
                buf = catAppendOnlyExpireAtCommand(buf,server.expireCommand,argv[1],
                                                   exarg);
            if (pxarg)
                buf = catAppendOnlyExpireAtCommand(buf,server.pexpireCommand,argv[1],
                                                   pxarg);
        } else {
            buf = catAppendOnlyGenericCommand(buf,argc,argv);
        }
    } else {
        /* All the other commands don't need translation or need the
         * same translation already operated in the command vector
         * for the replication itself. */
        buf = catAppendOnlyGenericCommand(buf,argc,argv);
    }

    /* Append to the AOF buffer. This will be flushed on disk just before
     * of re-entering the event loop, so before the client will get a
     * positive reply about the operation performed. */
    if (server.aof_state == AOF_ON)
        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));

    /* If a background append only file rewriting is in progress we want to
     * accumulate the differences between the child DB and the current one
     * in a buffer, so that when the child process will do its work we
     * can append the differences to the new append only file. */
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));

    sdsfree(buf);
}
At the end we see that, if an AOF rewrite child process exists, aofRewriteBufferAppend is called as well. What does that function do?
/* Append data to the AOF rewrite buffer, allocating new blocks if needed. * Allocating new blocks if needed. If a new command is executed, the new aOF data will be written to aof_rewrite_buf_blocks * this pipe will be used to ensure secure communication between processes * */ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { listNode *ln = listLast(server.aof_rewrite_buf_blocks); aofrwblock *block = ln ? ln->value : NULL; while(len) { /* If we already got at least an allocated block, Try appending * at least some piece into it. * * If (block) {unsigned long thislen = (block->free < len) ? block->free : len; if (thislen) { /* The current block is not already full. */ memcpy(block->buf+block->used, s, thislen); block->used += thislen; block->free -= thislen; s += thislen; len -= thislen; If (len) {/* First block to allocate, or need another block. */ int numblocks; block = zmalloc(sizeof(*block)); block->free = AOF_RW_BUF_BLOCK_SIZE; block->used = 0; listAddNodeTail(server.aof_rewrite_buf_blocks,block); /* Log every time we cross more 10 or 100 blocks, respectively * as a notice or warning. */ numblocks = listLength(server.aof_rewrite_buf_blocks); if (((numblocks+1) % 10) == 0) { int level = ((numblocks+1) % 100) == 0 ? LL_WARNING : LL_NOTICE; serverLog(level,"Background AOF buffer size: %lu MB", aofRewriteBufferSize()/(1024*1024)); /* Install a file event to send data to the rewrite child if there is * not one already Register write events for the write channel. After writing data to the kernel shared buffer through the write channel, the child process can read data from the kernel shared buffer (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) { aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL); }}Copy the code
It writes the data to a special buffer called aof_rewrite_buf_blocks. Setting the child's AOF writes aside for now, let's see how the main process handles the data in aof_buf. You will find the following lines in beforeSleep:
/* Send the invalidation messages to clients participating to the * client side caching protocol in broadcasting (BCAST) Mode. * TODO ignore link related * * / trackingBroadcastInvalidationMessages (); FlushAppendOnlyFile (0); /* flushAppendOnlyFile(0); /* Handle index with pending output buffers. * */ handleClientsWithPendingWritesUsingThreads(); / * Close clients that need to be closed in the asynchronous * processing to_close queue client * * / freeClientsInAsyncFreeQueue ();Copy the code
And serverCron
/* Excerpt from serverCron().
 *
 * AOF write errors: in this case we have a buffer to flush as well and
 * clear the AOF error in case of success to make the DB writable again,
 * however to try every second is enough in case of 'hz' is set to
 * a higher frequency. */
run_with_period(1000) {
    if (server.aof_last_write_status == C_ERR)
        flushAppendOnlyFile(0);
}
and
/* Excerpt from serverCron().
 *
 * AOF postponed flush: try at every cron cycle if the slow fsync
 * completed. A non-zero aof_flush_postponed_start means an earlier flush
 * was postponed. */
if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
Let's see how this function handles the data in aof_buf:
/**
 * Write the AOF buffer (server.aof_buf) to the AOF file and fsync it
 * according to the configured policy.
 *
 * Called from beforeSleep() and, after a failed or postponed flush, from
 * serverCron(). NOTE(review): the original author states it is also called
 * with force set when the server is about to shut down - confirm against
 * the callers.
 *
 * Because the write may be postponed or fail, a single node can lose the
 * most recent writes.
 *
 * @param force when non-zero, the write is not postponed even if a
 *              background fsync is still in progress (EVERYSEC policy).
 */
void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;
    int write_errno = 0;
    mstime_t latency;

    /* Nothing was buffered since the last call: the only thing left to
     * decide is whether an fsync is still due. */
    if (sdslen(server.aof_buf) == 0) {
        /* Check if we need to do fsync even the aof buffer is empty,
         * because previously in AOF_FSYNC_EVERYSEC mode, fsync is
         * called only when aof buffer is not empty, so if users
         * stop write commands before fsync called in one second,
         * the data in page cache cannot be flushed in time.
         *
         * Conditions: the file grew past the last fsynced offset, the clock
         * (one second resolution) advanced since the last fsync, and no
         * background fsync is in progress. */
        if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
            server.aof_fsync_offset != server.aof_current_size &&
            server.unixtime > server.aof_last_fsync &&
            !(sync_in_progress = aofFsyncInProgress())) {
            goto try_fsync;
        } else {
            /* No pending data and no fsync due. */
            return;
        }
    }

    /* EVERYSEC fsyncs in the background: find out whether one is still
     * pending. */
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        sync_in_progress = aofFsyncInProgress();

    /* Non-forced flush. */
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
        /* With this append fsync policy we do background fsyncing.
         * If the fsync is still in progress we can try to delay
         * the write for a couple of seconds. */
        if (sync_in_progress) {
            if (server.aof_flush_postponed_start == 0) {
                /* No previous write postponing, remember that we are
                 * postponing the flush and return. */
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                /* We were already waiting for fsync to finish, but for less
                 * than two seconds this is still ok. Postpone again. */
                return;
            }
            /* Otherwise fall trough, and go write since we can't wait
             * over two seconds. */
            server.aof_delayed_fsync++;
            serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
        }
    }

    /* We want to perform a single write. This should be guaranteed atomic
     * at least if the filesystem we are writing is a real physical one.
     * While this will save us against the server being killed I don't think
     * there is much to do about the whole server stopping for power problems
     * or alike */

    /* Optional configured delay before the write. */
    if (server.aof_flush_sleep && sdslen(server.aof_buf)) {
        usleep(server.aof_flush_sleep);
    }

    latencyStartMonitor(latency);
    nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
    /* Capture errno right away: the latency/logging calls below may
     * overwrite it before the error is recorded. */
    write_errno = errno;
    latencyEndMonitor(latency);

    /* We want to capture different events for delayed writes:
     * when the delay happens with a pending fsync, or with a saving child
     * active, and when the above two conditions are missing.
     * We also use an additional event name to save all samples which is
     * useful for graphing / monitoring purposes. */
    if (sync_in_progress) {
        latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
    } else if (hasActiveChildProcess()) {
        latencyAddSampleIfNeeded("aof-write-active-child",latency);
    } else {
        latencyAddSampleIfNeeded("aof-write-alone",latency);
    }
    latencyAddSampleIfNeeded("aof-write",latency);

    /* We performed the write so reset the postponed flush sentinel to zero;
     * serverCron no longer needs to retry on its behalf. */
    server.aof_flush_postponed_start = 0;

    /* Error or short write. */
    if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
        static time_t last_write_error_log = 0;
        int can_log = 0;

        /* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
        if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
            can_log = 1;
            last_write_error_log = server.unixtime;
        }

        /* Log the AOF write error and record the error code. */
        if (nwritten == -1) {
            if (can_log) {
                serverLog(LL_WARNING,"Error writing to the AOF file: %s",
                    strerror(write_errno));
            }
            /* Record the error code even when the log line is rate
             * limited, so the stored errno always matches the failure. */
            server.aof_last_write_errno = write_errno;
        } else {
            if (can_log) {
                serverLog(LL_WARNING,"Short write while writing to "
                                       "the AOF file: (nwritten=%lld, "
                                       "expected=%lld)",
                                       (long long)nwritten,
                                       (long long)sdslen(server.aof_buf));
            }

            if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
                if (can_log) {
                    serverLog(LL_WARNING, "Could not remove short write "
                             "from the append-only file.  Redis may refuse "
                             "to load the AOF the next time it starts.  "
                             "ftruncate: %s", strerror(errno));
                }
            } else {
                /* If the ftruncate() succeeded we can set nwritten to
                 * -1 since there is no longer partial data into the AOF. */
                nwritten = -1;
            }
            server.aof_last_write_errno = ENOSPC;
        }

        /* Handle the AOF write error. */
        if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
            /* We can't recover when the fsync policy is ALWAYS since the
             * reply for the client is already in the output buffers, and we
             * have the contract with the user that on acknowledged write data
             * is synced on disk. */
            serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
            exit(1);
        } else {
            /* Recover from failed write leaving data into the buffer. However
             * set an error to stop accepting writes as long as the error
             * condition is not cleared. */
            server.aof_last_write_status = C_ERR;

            /* Trim the sds buffer if there was a partial write, and there
             * was no way to undo it with ftruncate(2). */
            if (nwritten > 0) {
                server.aof_current_size += nwritten;
                sdsrange(server.aof_buf,nwritten,-1);
            }
            return; /* We'll try again on the next call... */
        }
    } else {
        /* Successful write(2). If AOF was in error state, restore the
         * OK state and log the event. */
        if (server.aof_last_write_status == C_ERR) {
            serverLog(LL_WARNING,
                "AOF write error looks solved, Redis can write again.");
            server.aof_last_write_status = C_OK;
        }
    }
    server.aof_current_size += nwritten;

    /* Re-use AOF buffer when it is small enough. The maximum comes from the
     * arena size of 4k minus some overhead (but is otherwise arbitrary). */
    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
        sdsclear(server.aof_buf);
    } else {
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    }

try_fsync:
    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
     * children doing I/O in the background. */
    if (server.aof_no_fsync_on_rewrite && hasActiveChildProcess())
        return;

    /* Perform the fsync if needed. */
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        /* redis_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
        latencyStartMonitor(latency);
        redis_fsync(server.aof_fd); /* Let's try to get this data on the disk */
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-fsync-always",latency);
        server.aof_fsync_offset = server.aof_current_size;
        server.aof_last_fsync = server.unixtime;
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.unixtime > server.aof_last_fsync)) {
        /* EVERYSEC: hand the fsync to the background job unless one is
         * already pending. */
        if (!sync_in_progress) {
            aof_background_fsync(server.aof_fd);
            server.aof_fsync_offset = server.aof_current_size;
        }
        server.aof_last_fsync = server.unixtime;
    }
}
Copy the code
In the main thread, the data in aof_buf is written to the AOF file and a flush to disk is attempted. That is how the main thread handles AOF. In some cases the main thread also triggers a background child process to generate a new AOF file. There are currently two trigger points: the first is serverCron detecting that the AOF file has grown large enough to be worth slimming down (rewriting); the other is an explicit external command that forces the task.
serverCron
/* Excerpt from serverCron().
 *
 * Trigger an AOF rewrite if needed. The file must exceed
 * aof_rewrite_min_size: rewriting an AOF that holds very little data is
 * not worthwhile. */
if (server.aof_state == AOF_ON &&
    !hasActiveChildProcess() &&
    server.aof_rewrite_perc &&
    server.aof_current_size > server.aof_rewrite_min_size)
{
    /* Growth, in percent, relative to the size recorded as the rewrite
     * base size (1 is used when that size is 0). */
    long long base = server.aof_rewrite_base_size ?
        server.aof_rewrite_base_size : 1;
    long long growth = (server.aof_current_size*100/base) - 100;
    if (growth >= server.aof_rewrite_perc) {
        serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
        rewriteAppendOnlyFileBackground();
    }
}
External command
* @param c */ void bgrewriteaofCommand(client *c) {// There is already a working child if (server.aof_child_pid ! = -1) { addReplyError(c,"Background append only file rewriting already in progress"); } else if (hasActiveChildProcess()) {server.aof_rewrite_scheduled = 1; addReplyStatus(c,"Background append only file rewriting scheduled"); / / create the child to perform a task The main thread will be returned directly here} else if (rewriteAppendOnlyFileBackground () = = C_OK) {addReplyStatus (c, "Background append only file rewriting started"); } else { addReplyError(c,"Can't execute an AOF background rewriting. " "Please check the server logs for more information."); }}Copy the code
The child process’s processing logic looks like this
/* Write a sequence of commands able to fully rebuild the dataset into * "filename". Used both by REWRITEAOF and BGREWRITEAOF. * * In order to minimize the number of commands needed in the rewritten * log Redis uses variadic commands when possible, such as RPUSH, SADD * and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time * are inserted using a single command. * * */ int rewriteAppendOnlyFile(char *filename) {Rio aof; FILE *fp = NULL; char tmpfile[256]; char byte; /* Note that we have to use a different temp name here compared to the * one used by rewriteAppendOnlyFileBackground() * */ snprintf(tmpFile,256,"temp-rewriteaof-% d.of ", (int) getpid()); fp = fopen(tmpfile,"w"); if (! fp) { serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno)); return C_ERR; } server.aof_child_diff = sdsempty(); // Wrap a temporary file as a Rio stream and write data to rioInitWithFile(& aOF,fp) via the Rio API; If (server.aof_rewrite_incremental_fsync) rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES); // Notify module listeners of the event startSaving(RDBFLAGS_AOF_PREAMBLE); If (server.aof_use_rdb_preamble) {int error; if (server.aof_use_rdb_preamble) {int error; When generating RDB data as AOF, rDBFlags is RDBFLAGS_AOF_PREAMBLE, so that RDBFLAGS_AOF_PREAMBLE can be read from the parent process while generating RDB data (rdbSaveRio(&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) { errno = error; goto werr; }} else {// Generate the simplest command directly based on the current DB data equivalent to a slimmdown aof if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr; } /* Do an initial slow fsync here while the parent is still sending * data, If (fflush(fp) == EOF) goto werr; if (fflush(fp) == EOF) goto werr; if (fsync(fileno(fp)) == -1) goto werr; /* Read again a few times to get more data from the parent. 
* We can't read forever (the server may receive data from clients * faster than it is able to send data to the child), so we try to read * some more data in a loop as soon as there is a good chance more data * will come. If it looks like we are wasting time, Int nodata = 0; we abort (this * happens after 20 ms without new data). mstime_t start = mstime(); / / not in 20 milliseconds read to the end of the new data or processing time more than 1000 milliseconds processing while (mstime () - start < 1000 && nodata < 20) {if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0) { nodata++; continue; } nodata = 0; /* Start counting from zero, we stop on N *contiguous* timeouts. */ aofReadDiffFromParent(); } /* if (write(server.aof_pipe_write_ACK_to_parent,"!" 1)! = 1) goto werr; if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) ! = ANET_OK) goto werr; /* We read the ACK from the server using a 10 seconds timeout. Normally * it should reply ASAP, but just in case we lose its reply, We are sure * the child will eventually get terminated. * block wait for the parent process to actually receive information * */ if (syncRead (server. Aof_pipe_read_ack_from_parent, & byte, 1500). = 1 || byte ! = '! ') goto werr; serverLog(LL_NOTICE,"Parent agreed to stop sending diffs. Finalizing AOF..." ); /* Read the final diff if any. * There is a time difference between the child sending the terminating signal and the parent receiving the terminating signal If you just use the main thread to generate aOF data, you can make sure that all commands are written to the AOF * */ aofReadDiffFromParent(); /* Write the received diff to the file. 
*/ serverLog(LL_NOTICE, "Concatenating %.2f MB of AOF diff received from parent.", (double) sdslen(server.aof_child_diff) / (1024*1024)); // Write all commands captured from the parent process to the aOF file to ensure that the aOF data is minimal (slimming command + new commands generated during this period) if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0) goto werr; /* Make sure data will not remain on the OS's output buffers * * If (fflush(fp)) goto werr; if (fsync(fileno(fp))) goto werr; if (fclose(fp)) { fp = NULL; goto werr; } fp = NULL; /* Use RENAME to make sure the DB file is changed atomically only * if the generate DB file is ok. */ if (rename(tmpfile,filename) == -1) { serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno)); unlink(tmpfile); stopSaving(0); return C_ERR; } serverLog(LL_NOTICE,"SYNC append only file rewrite performed"); // Issue an event to the Module listener stopSaving(1); return C_OK; werr: serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno)); if (fp) fclose(fp); unlink(tmpfile); stopSaving(0); return C_ERR; }Copy the code
The main thread may keep processing new commands while the child process is generating the rewritten AOF data. Those commands are written to aof_buf as well as to aof_rewrite_buf_blocks. Once the child has generated the slimmed-down commands, the data accumulated in aof_rewrite_buf_blocks is read and appended to the new AOF file. After the terminating symbol "!" has been sent to the parent process, the parent stops transferring data to the child; if the server goes down at that point, data will be lost. Let's think a little further: once the slimming is complete, the data still held in aof_buf (assuming it has not yet been written to the AOF file) is no longer needed, because the slimmed-down AOF file has higher priority and the previous data must be discarded. This is part of the logic in backgroundRewriteDoneHandler, which is triggered from checkChildrenDone, called in serverCron.
Void checkChildrenDone(void) {int statloc; void checkChildrenDone(void); pid_t pid; /* If we have a diskless rdb child (note that we support only one concurrent * child), we want to avoid collecting it's exit status and acting on it * as long as we didn't finish to drain the pipe, since then we're at risk * of starting a new fork and a new pipe before we're done with the previous * one. * * */ if (server.rdb_child_pid! = -1 && server.rdb_pipe_conns) return; // Wait for the child process to be completely shut down // exit just turns the child into a zombie process (maybe auto-clean? WNOHANG stands for return immediately when there are no child processes, If (pid = wait3(&statloc,WNOHANG, null)) if (pid = wait3(&statloc,WNOHANG, null))! Int exitCode = WEXITSTATUS(statloc); int bysignal = 0; Wait signaled (Statloc) from their wait and become signaled (Statloc). /* sigKillChildHandler catches the signal and calls exit(), but we * must make sure not to flag lastbgsave_status, etc incorrectly. * We could directly terminate the child process via SIGUSR1 * without handling it, But in this case Valgrind will log an * annoying error. * Indicates that this sub-process is being terminated * */ if (exitcode ==) SERVER_CHILD_NOERROR_RETVAL) { bysignal = SIGUSR1; exitcode = 1; If (pid == -1) {serverLog(LL_WARNING,"wait3() returned an error: %s. " "rdb_child_pid = %d, aof_child_pid = %d, module_child_pid = %d", strerror(errno), (int) server.rdb_child_pid, (int) server.aof_child_pid, (int) server.module_child_pid); Else if (pid == server.rdb_child_pid) {// When the child process completes its task, some logic will be triggered backgroundSaveDoneHandler(exitcode,bysignal); If (! bysignal && exitcode == 0) receiveChildInfo(); / / close aof the child to do some cleaning resources} else if (pid = = server. Aof_child_pid) {backgroundRewriteDoneHandler (exitcode bysignal); if (! 
bysignal && exitcode == 0) receiveChildInfo(); } else if (pid == server.module_child_pid) { ModuleForkDoneHandler(exitcode,bysignal); if (! bysignal && exitcode == 0) receiveChildInfo(); } else {TODO LDB is used to parse lua scripts. If (! ldbRemoveChild(pid)) { serverLog(LL_WARNING, "Warning, detected child with unmatched pid: %ld", (long)pid); }} // Dict can be resized because the child process is reclaimed; CloseChildInfoPipe (); }}Copy the code
/* A background append only file rewriting (BGREWRITEAOF) terminated its work.
* Handle this.
*
* Run once the AOF rewrite child has exited. On success the parent's
* buffered diff is appended to the child's temporary file, which is then
* renamed over the configured AOF file.
*
* exitcode: exit code of the child; 0 is treated as success.
* bysignal: non-zero when the child was terminated by a signal (the value is
*           logged as the signal number).
* */
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
// The child exited normally with exit code 0.
if (!bysignal && exitcode == 0) {
int newfd, oldfd;
char tmpfile[256];
long long now = ustime();
mstime_t latency;
serverLog(LL_NOTICE,
"Background AOF rewrite terminated with success");
/* Flush the differences accumulated by the parent to the
* rewritten AOF. */
latencyStartMonitor(latency);
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
(int)server.aof_child_pid);
newfd = open(tmpfile,O_WRONLY|O_APPEND);
if (newfd == -1) {
serverLog(LL_WARNING,
"Unable to open the temporary AOF produced by the child: %s", strerror(errno));
goto cleanup;
}
// The child has finished writing the rewritten AOF. Append the diff still
// buffered in the parent to the new file; aof_buf is discarded further below.
if (aofRewriteBufferWrite(newfd) == -1) {
serverLog(LL_WARNING,
"Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
close(newfd);
goto cleanup;
}
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-rewrite-diff-write",latency);
serverLog(LL_NOTICE,
"Residual parent diff successfully flushed to the rewritten AOF (%.2f MB)", (double) aofRewriteBufferSize() / (1024*1024));
/* The only remaining thing to do is to rename the temporary file to
* the configured file and switch the file descriptor used to do AOF
* writes. We don't want close(2) or rename(2) calls to block the
* server on old file deletion.
*
* There are two possible scenarios:
*
* 1) AOF is DISABLED and this was a one time rewrite. The temporary
* file will be renamed to the configured file. When this file already
* exists, it will be unlinked, which may block the server.
*
* 2) AOF is ENABLED and the rewritten AOF will immediately start
* receiving writes. After the temporary file is renamed to the
* configured file, the original AOF file descriptor will be closed.
* Since this will be the last reference to that file, closing it
* causes the underlying file to be unlinked, which may block the
* server.
*
* To mitigate the blocking effect of the unlink operation (either
* caused by rename(2) in scenario 1, or by close(2) in scenario 2), we
* use a background thread to take care of this. First, we
* make scenario 1 identical to scenario 2 by opening the target file
* when it exists. The unlink operation after the rename(2) will then
* be executed upon calling close(2) for its descriptor. Everything to
* guarantee atomicity for this switch has already happened by then, so
* we don't care what the outcome or duration of that close operation
* is, as long as the file descriptor is released again. */
if (server.aof_fd == -1) {
/* AOF disabled */
/* Don't care if this fails: oldfd will be -1 and we handle that.
* One notable case of -1 return is if the old file does
* not exist. */
oldfd = open(server.aof_filename,O_RDONLY|O_NONBLOCK);
} else {
/* AOF enabled */
oldfd = -1; /* We'll set this to the current AOF filedes later. */
}
/* Rename the temporary file. This will not unlink the target file if
* it exists, because we reference it with "oldfd". */
latencyStartMonitor(latency);
// Make the file holding the rewritten data the new AOF, replacing the old one.
// NOTE(review): the original author states this runs in the main thread from
// serverCron and so cannot race with AOF writes - confirm against the callers.
if (rename(tmpfile,server.aof_filename) == -1) {
serverLog(LL_WARNING,
"Error trying to rename the temporary AOF file %s into %s: %s",
tmpfile,
server.aof_filename,
strerror(errno));
close(newfd);
if (oldfd != -1) close(oldfd);
goto cleanup;
}
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-rename",latency);
// With AOF disabled the new file only had to be put in place, so close it.
if (server.aof_fd == -1) {
/* AOF disabled, we don't need to set the AOF file descriptor
* to this new file, so we can close it. */
close(newfd);
} else {
/* AOF enabled, replace the old fd with the new one.
* The server's AOF file descriptor now refers to the new file.
* */
oldfd = server.aof_fd;
server.aof_fd = newfd;
// Sync the new file according to the configured fsync policy.
if (server.aof_fsync == AOF_FSYNC_ALWAYS)
redis_fsync(newfd);
else if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
aof_background_fsync(newfd);
server.aof_selected_db = -1; /* Make sure SELECT is re-issued */
aofUpdateCurrentSize();
server.aof_rewrite_base_size = server.aof_current_size;
server.aof_fsync_offset = server.aof_current_size;
/* Clear regular AOF buffer since its contents was just written to
* the new AOF from the background rewrite buffer. */
sdsfree(server.aof_buf);
// aof_buf starts out empty again.
server.aof_buf = sdsempty();
}
// Record that the most recent background rewrite succeeded.
server.aof_lastbgrewrite_status = C_OK;
serverLog(LL_NOTICE, "Background AOF rewrite finished successfully");
/* Change state from WAIT_REWRITE to ON if needed
* The rewrite as a whole is complete.
* */
if (server.aof_state == AOF_WAIT_REWRITE)
server.aof_state = AOF_ON;
/* Asynchronously close the overwritten AOF.
* The old file descriptor is closed by a background job.
* */
if (oldfd != -1) bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);
serverLog(LL_VERBOSE,
"Background AOF rewrite signal handler took %lldus", ustime()-now);
// The child exited on its own with a non-zero exit code.
} else if (!bysignal && exitcode != 0) {
server.aof_lastbgrewrite_status = C_ERR;
serverLog(LL_WARNING,
"Background AOF rewrite terminated with error");
} else {
/* SIGUSR1 is whitelisted, so we have a way to kill a child without
* triggering an error condition. */
if (bysignal != SIGUSR1)
server.aof_lastbgrewrite_status = C_ERR;
serverLog(LL_WARNING,
"Background AOF rewrite terminated by signal %d", bysignal);
}
// Common exit path, reached on success as well as on every failure above.
cleanup:
aofClosePipes();
aofRewriteBufferReset();
aofRemoveTempFile(server.aof_child_pid);
server.aof_child_pid = -1;
server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;
server.aof_rewrite_time_start = -1;
/* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */
if (server.aof_state == AOF_WAIT_REWRITE)
server.aof_rewrite_scheduled = 1;
}
Copy the code
Here, the original AOF file is replaced with the slimmed-down temporary AOF file produced by the child process.
This completes the analysis of AOF.