Before talking about clusters, replicas and sentinels, it is worth covering single-node behavior as thoroughly as possible. We have already covered the Redis startup process, the event loop, and AOF/RDB; this time we will look at how Redis executes a client command.
In the event loop, the data read from a client connection is what drives command execution, so that is our entry point. The call chain is readQueryFromClient -> processInputBuffer -> processCommandAndResetClient -> processCommand.
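Before diving in, a minimal self-contained sketch (a toy parser, not Redis code) of what readQueryFromClient ultimately produces may help: the client sends a RESP array, and by the time processCommand() runs it has already been split into argc/argv.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

int main(void) {
    /* "SET foo bar" on the wire: an array of 3 bulk strings. */
    const char *req = "*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n";
    char buf[64];
    strcpy(buf, req);

    char *p = buf + 1;                 /* skip '*'                          */
    int argc = atoi(p);                /* "*3" -> 3 arguments               */
    char *argv[16];
    p = strstr(p, "\r\n") + 2;         /* move to the first "$M" header     */

    for (int i = 0; i < argc; i++) {
        int len = atoi(p + 1);         /* "$M" -> length of this argument   */
        p = strstr(p, "\r\n") + 2;     /* move to the payload               */
        argv[i] = p;
        char *end = p + len;           /* '\r' right after the payload      */
        p = end + 2;                   /* next token starts after "\r\n"    */
        *end = '\0';                   /* terminate the argument in place   */
    }
    for (int i = 0; i < argc; i++) printf("argv[%d] = %s\n", i, argv[i]);
    return 0;
}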
Take a look at what processCommand does
int processCommand(client *c) {
// Commands generally pass through a set of module filters before execution (e.g. deciding whether to accept the command at all); there are no default filters.
moduleCallCommandFilters(c);
/* The QUIT command is handled separately. Normal command procs will
* go through checking for replication and QUIT will cause trouble
* when FORCE_REPLICATION is enabled and would be implemented in
* a regular command proc.
* The flag below marks this client for disconnection; the networking module checks it and closes the connection once the reply is written.
* */
if (!strcasecmp(c->argv[0]->ptr,"quit")) {
addReply(c,shared.ok);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
return C_ERR;
}
/* Now lookup the command and check ASAP about trivial error conditions
* such as wrong arity, bad command name and so forth.
* Look the command up in the command table.
* */
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
// If the command is not found, return an error to the client.
if (!c->cmd) {
sds args = sdsempty();
int i;
for (i=1; i < c->argc && sdslen(args) < 128; i++)
args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
rejectCommandFormat(c,"unknown command `%s`, with args beginning with: %s",
(char*)c->argv[0]->ptr, args);
sdsfree(args);
return C_OK;
// If the argument count does not match the command's arity, return an error to the client.
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity)) {
rejectCommandFormat(c,"wrong number of arguments for '%s' command",
c->cmd->name);
return C_OK;
}
// Determine whether this is a write command; for EXEC, the queued transaction counts as a write if at least one queued command is a write.
int is_write_command = (c->cmd->flags & CMD_WRITE) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
// Whether this command must not run when we are out of memory.
int is_denyoom_command = (c->cmd->flags & CMD_DENYOOM) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_DENYOOM));
// Whether this command is denied on a stale node (a replica whose link to its master is broken).
int is_denystale_command = !(c->cmd->flags & CMD_STALE) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_STALE));
// Whether this command is denied while the dataset is still being loaded.
int is_denyloading_command = !(c->cmd->flags & CMD_LOADING) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_LOADING));
/* Check if the user is authenticated. This check is skipped in case
* the default user is flagged as "nopass" and is active.
* Determine whether the client still needs to authenticate:
* USER_FLAG_NOPASS means no password is required, so no auth check is needed;
* USER_FLAG_DISABLED means the default user is disabled;
* !c->authenticated means this client has not authenticated yet.
* */
int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) ||
(DefaultUser->flags & USER_FLAG_DISABLED)) &&
!c->authenticated;
// The client has not authenticated yet.
if (auth_required) {
/* AUTH and HELLO and no auth modules are valid even in
* non-authenticated state.
* Commands that do not themselves require auth (such as AUTH and HELLO) are not intercepted.
* */
if (!(c->cmd->flags & CMD_NO_AUTH)) {
rejectCommand(c,shared.noautherr);
return C_OK;
}
}
/* Check if the user can run this command according to the current
* ACLs. */
int acl_keypos;
// Successful authentication sets client->user; now validate the ACLs, since this user may still lack permission for this particular command.
int acl_retval = ACLCheckCommandPerm(c,&acl_keypos); // 1
// An ACL failure is treated as a client error; this call to processCommand still returns C_OK.
if (acl_retval != ACL_OK) {
addACLLogEntry(c,acl_retval,acl_keypos,NULL);
if (acl_retval == ACL_DENIED_CMD)
rejectCommandFormat(c,
"-NOPERM this user has no permissions to run "
"the '%s' command or its subcommand", c->cmd->name);
else
rejectCommandFormat(c,
"-NOPERM this user has no permissions to access "
"one of the keys used as arguments");
return C_OK;
}
/* If cluster is enabled perform the cluster redirection here.
* However we don't perform the redirection if:
* 1) The sender of this command is our master.
* 2) The command has no key arguments.
* TODO: this is request redirection within the cluster; cluster logic will be covered later.
* */
if (server.cluster_enabled &&
!(c->flags & CLIENT_MASTER) &&
!(c->flags & CLIENT_LUA &&
server.lua_caller->flags & CLIENT_MASTER) &&
!(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
c->cmd->proc != execCommand))
{
int hashslot;
int error_code;
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
&hashslot,&error_code);
if (n == NULL || n != server.cluster->myself) {
if (c->cmd->proc == execCommand) {
discardTransaction(c);
} else {
flagTransaction(c);
}
clusterRedirectClient(c,n,hashslot,error_code);
return C_OK;
}
}
/* Handle the maxmemory directive.
*
* Note that we do not want to reclaim memory if we are here re-entering
* the event loop since there is a busy Lua script running in timeout
* condition, to avoid mixing the propagation of scripts with the
* propagation of DELs due to eviction.
* server.maxmemory being non-zero means a memory limit is configured.
* */
if (server.maxmemory && !server.lua_timedout) {
// Check whether enough memory is available; freeing memory brings the eviction policy into play, and some commands must not proceed under OOM.
int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR; // 2
/* freeMemoryIfNeeded may flush slave output buffers. This may result
* into a slave, that may be the active client, to be freed.
* freeMemoryIfNeeded flushes the replica output buffers as far as possible, so that DEL
* commands propagate quickly and replicas do not run short of memory; in the process the
* current client's connection may be torn down indirectly (e.g. if an error occurs).
* unlinkClient() sets current_client to NULL, and then there is no point continuing with the command.
* */
if (server.current_client == NULL) return C_ERR;
// Whether this command is rejected under OOM; e.g. GET allocates no new memory and can still run normally.
int reject_cmd_on_oom = is_denyoom_command;
/* If client is in MULTI/EXEC context, queuing may consume an unlimited
* amount of memory, so we want to stop that.
* However, we never want to reject DISCARD, or even EXEC (unless it
* contains denied commands, in which case is_denyoom_command is already
* set.
* While queuing commands inside MULTI (i.e. before the terminating EXEC/DISCARD), memory use is unbounded, so reject the command just in case.
* */
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand &&
c->cmd->proc != discardCommand) {
reject_cmd_on_oom = 1;
}
// Out of memory and this command cannot run under OOM: return an error to the client.
if (out_of_memory && reject_cmd_on_oom) {
rejectCommand(c, shared.oomerr);
return C_OK;
}
}
/* Don't accept write commands if there are problems persisting on disk
* and if this is a master instance.
* Check whether the most recent AOF/RDB write succeeded.
* */
int deny_write_type = writeCommandsDeniedByDiskError();
// AOF/RDB persistence is failing. As an aside, AOF is off by default; Redis can rely solely on RDB snapshots triggered by the saveparams for durability (but that is not high availability: data can be lost).
// Additionally, this node is a master (so it cannot recover by syncing from a master), and this is a write command or a PING (PING doubles as a probe of whether the peer is working).
if (deny_write_type != DISK_ERROR_TYPE_NONE &&
server.masterhost == NULL &&
(is_write_command ||c->cmd->proc == pingCommand))
{
if (deny_write_type == DISK_ERROR_TYPE_RDB)
rejectCommand(c, shared.bgsaveerr);
else
rejectCommandFormat(c,
"-MISCONF Errors writing to the AOF file: %s",
strerror(server.aof_last_write_errno));
return C_OK;
}
/* Don't accept write commands if there are not enough good slaves and
* user configured the min-slaves-to-write option.
* The number of healthy replicas is below the configured minimum, so the command cannot run; requiring a minimum number of working replicas protects availability.
* TODO: cluster/replica/sharding logic will be sorted out later.
* */
if (server.masterhost == NULL &&
server.repl_min_slaves_to_write &&
server.repl_min_slaves_max_lag &&
server.repl_good_slaves_count < server.repl_min_slaves_to_write)
{
rejectCommand(c, shared.noreplicaserr);
return C_OK;
}
/* Don't accept write commands if this is a read only slave. But
* accept write commands if this is our master.
* repl_slave_ro means this replica is read-only and cannot execute write commands.
* */
if (server.masterhost && server.repl_slave_ro &&
!(c->flags & CLIENT_MASTER) &&
is_write_command)
{
rejectCommand(c, shared.roslaveerr);
return C_OK;
}
/* Only allow a subset of commands in the context of Pub/Sub if the
* connection is in RESP2 mode. With RESP3 there are no limits.
* When this client is a subscriber on this node (and speaks RESP2), only the following commands may be executed.
* */
if ((c->flags & CLIENT_PUBSUB && c->resp == 2) &&
c->cmd->proc != pingCommand &&
c->cmd->proc != subscribeCommand &&
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand) {
rejectCommandFormat(c,
"Can't execute '%s': only (P)SUBSCRIBE / "
"(P)UNSUBSCRIBE / PING / QUIT are allowed in this context",
c->cmd->name);
return C_OK;
}
/* Only allow commands with flag "t", such as INFO, SLAVEOF and so on,
* when slave-serve-stale-data is no and we are a slave with a broken
* link with master.
* Whether commands may run while this replica is stale (its data may be out of date because it has no master to sync from).
* */
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
// repl_serve_stale_data defaults to 1, i.e. stale replicas still serve commands; to guarantee consistency in production it should be set to 0.
server.repl_serve_stale_data == 0 &&
is_denystale_command)
{
rejectCommand(c, shared.masterdownerr);
return C_OK;
}
/* Loading DB? Return an error if the command has not the
* CMD_LOADING flag.
* The dataset is still loading, so certain commands are rejected.
* */
if (server.loading && is_denyloading_command) {
rejectCommand(c, shared.loadingerr);
return C_OK;
}
/* Exec the command
* Having passed the argument, ACL, memory and node-state checks, we finally execute the command.
* If we are inside a MULTI block and this is not a terminating command, queue it instead.
* */
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
// CMD_CALL_FULL includes CMD_CALL_PROPAGATE, which equals CMD_CALL_PROPAGATE_AOF|CMD_CALL_PROPAGATE_REPL,
// i.e. by default the command is propagated to both the AOF and the replicas.
call(c,CMD_CALL_FULL); // 3
// TODO: replication-related.
c->woff = server.master_repl_offset;
// Executing a command may add keys to ready_keys (dbAdd signals keys as ready);
// clients blocked waiting on those keys can then be woken. When do blocked clients arise in the first place?
if (listLength(server.ready_keys))
handleClientsBlockedOnKeys(); // 4
}
return C_OK;
}
The Lua-script and tracing code have been omitted. There are four points of interest, marked 1-4 in the code above: 1. the ACL permission check; 2. the memory check; 3. executing the command; 4. waking clients blocked on keys.
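One detail from the code worth noting in passing: a command's arity is positive for an exact argument count (including the command name itself) and negative for a minimum. A minimal standalone sketch of the check at the top of processCommand, using GET (arity 2) and SET (arity -3) as examples:
#include <stdio.h>

/* Positive arity: argc must equal it; negative arity: argc must be >= |arity|. */
static int arity_ok(int arity, int argc) {
    if (arity > 0) return argc == arity;
    return argc >= -arity;
}

int main(void) {
    printf("GET k        -> %d\n", arity_ok(2, 2));   /* 1: exact match     */
    printf("GET k v      -> %d\n", arity_ok(2, 3));   /* 0: too many args   */
    printf("SET k v EX 5 -> %d\n", arity_ok(-3, 5));  /* 1: at least 3 args */
    return 0;
}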
Before a client can run Redis commands it must first verify its user name and password; client->user is then set. The command that performs this verification is authCommand. Let's not dig into when the credentials were configured; instead, let's see what the permission check does.
int ACLCheckCommandPerm(client *c, int *keyidxptr) {
    // The user bound to this client.
    user *u = c->user;
    uint64_t id = c->cmd->id;
    /* If there is no associated user, the connection can run anything. */
    if (u == NULL) return ACL_OK;
    /* Check if the user can execute this command. */
    if (!(u->flags & USER_FLAG_ALLCOMMANDS) &&
        c->cmd->proc != authCommand)
    {
        /* If the bit is not set we have to check further, in case the
         * command is allowed just with that specific subcommand.
         * Each user has its own permission bitmap; what each user is allowed
         * to do is set in the configuration read when Redis starts. */
        if (ACLGetUserCommandBit(u,id) == 0) {
            /* Check if the subcommand matches. If there are too few
             * arguments, or the user has no allowed subcommands configured
             * for this command, the command permission check fails. */
            if (c->argc < 2 ||
                u->allowed_subcommands == NULL ||
                u->allowed_subcommands[id] == NULL)
            {
                return ACL_DENIED_CMD;
            }
            long subid = 0;
            while (1) {
                if (u->allowed_subcommands[id][subid] == NULL)
                    return ACL_DENIED_CMD;
                // The subcommand is explicitly allowed.
                if (!strcasecmp(c->argv[1]->ptr,
                                u->allowed_subcommands[id][subid]))
                    break; /* Subcommand match found. Stop here. */
                subid++;
            }
        }
    }
    /* Check if the user can execute commands explicitly touching the keys
     * mentioned in the command arguments.
     * getkeys_proc and firstkey are the two fields that describe where the
     * key arguments live. */
    if (!(c->user->flags & USER_FLAG_ALLKEYS) &&
        (c->cmd->getkeys_proc || c->cmd->firstkey))
    {
        getKeysResult result = GETKEYS_RESULT_INIT;
        // Extract the positions of the key arguments.
        int numkeys = getKeysFromCommand(c->cmd,c->argv,c->argc,&result);
        int *keyidx = result.keys;
        for (int j = 0; j < numkeys; j++) {
            listIter li;
            listNode *ln;
            listRewind(u->patterns,&li);
            /* Test this key against every pattern. */
            int match = 0;
            while((ln = listNext(&li))) {
                sds pattern = listNodeValue(ln);
                size_t plen = sdslen(pattern);
                int idx = keyidx[j];
                if (stringmatchlen(pattern,plen,c->argv[idx]->ptr,
                                   sdslen(c->argv[idx]->ptr),0))
                {
                    match = 1;
                    break;
                }
            }
            // This key matched none of the user's patterns: access denied.
            if (!match) {
                if (keyidxptr) *keyidxptr = keyidx[j];
                getKeysFreeResult(&result);
                return ACL_DENIED_KEY;
            }
        }
        getKeysFreeResult(&result);
    }
    /* If we survived all the above checks, the user can execute the
     * command. */
    return ACL_OK;
}
getKeysFromCommand() extracts the key arguments of the command being executed. The permission check mainly verifies that the current user may run the command and subcommand, and that every key argument matches one of the user's key patterns.
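The key-pattern test is glob-style. Redis implements it with its own stringmatchlen(); for simple patterns libc's fnmatch() behaves the same way, so the following standalone sketch (an approximation, not Redis code) illustrates the per-key loop in ACLCheckCommandPerm():
#include <fnmatch.h>
#include <stdio.h>

int main(void) {
    /* The user's ACL key patterns (as set with e.g. ~user:* in an ACL rule). */
    const char *patterns[] = { "user:*", "session:??" };
    const char *key = "user:1001";

    int match = 0;
    for (int i = 0; i < 2; i++) {
        if (fnmatch(patterns[i], key, 0) == 0) { match = 1; break; }
    }
    printf("%s -> %s\n", key, match ? "ACL_OK" : "ACL_DENIED_KEY");
    return 0;
}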
For operations that are not read-only, Redis must check whether enough memory is available; when memory is insufficient, the eviction policy kicks in.
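For reference, Redis 6 supports eight maxmemory-policy values: noeviction (reject writes rather than evict), allkeys-lru / volatile-lru, allkeys-lfu / volatile-lfu, allkeys-random / volatile-random, and volatile-ttl. The volatile-* variants sample only keys that carry a TTL, which is exactly the db->dict vs. db->expires switch visible in the code below.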
int freeMemoryIfNeeded(void) {
int keys_freed = 0;
/* By default replicas should ignore maxmemory
* and just be masters exact copies.
* This node is a replica and repl_slave_ignore_maxmemory is set: memory control is left entirely to the master, and replicas, as exact copies, stay within bounds as long as replication is healthy.
* */
if (server.masterhost && server.repl_slave_ignore_maxmemory) return C_OK;
size_t mem_reported, mem_tofree, mem_freed;
mstime_t latency, eviction_latency, lazyfree_latency;
long long delta;
int slaves = listLength(server.slaves);
int result = C_ERR;
/* When clients are paused the dataset should be static not just from the
* POV of clients not being able to write, but also from the POV of
* expires and evictions of keys not being performed.
* If all clients are paused no new writes can occur, so memory is treated as sufficient.
* TODO: the cluster module pauses clients in some situations; set aside for now.
* */
if (clientsArePaused()) return C_OK;
// If mem_reported < server.maxmemory there is enough space.
// mem_reported is the memory measured as currently in use;
// mem_tofree is the amount of memory that needs to be freed.
if (getMaxmemoryState(&mem_reported, NULL, &mem_tofree, NULL) == C_OK)
return C_OK;
// Track how much memory has been freed so far.
mem_freed = 0;
// At this point it is certain that memory must be freed.
latencyStartMonitor(latency);
// The eviction policy comes into play here; if the policy forbids eviction, jump to cant_free.
if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
goto cant_free; /* We need to free memory, but policy forbids. */
while (mem_freed < mem_tofree) {
int j, k, i;
// Iterate over all DBs and try to reclaim memory.
static unsigned int next_db = 0;
sds bestkey = NULL;
int bestdbid;
redisDb *db;
dict *dict;
dictEntry *de;
// When the policy is LRU/LFU or volatile-ttl:
if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU | MAXMEMORY_FLAG_LFU) ||
server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
// This pool orders eviction candidates by a score; only the highest-scoring object is evicted each round.
struct evictionPoolEntry *pool = EvictionPoolLRU;
// The algorithm below decides which key has the highest eviction priority.
while (bestkey == NULL) {
// total_keys: how many keys were eligible for sampling in total.
unsigned long total_keys = 0, keys;
/* We don't want to make local-db choices when expiring keys,
* so to start populate the eviction pool sampling keys from
* every DB.
* After sampling every DB, the pool holds up to EVPOOL_SIZE (16) of the best eviction candidates.
* */
for (i = 0; i < server.dbnum; i++) {
db = server.db + i;
// Depending on the eviction policy, evict only among keys with a TTL, or among all keys in the DB.
dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
db->dict : db->expires;
if ((keys = dictSize(dict)) != 0) {
// Sample keys, score them and populate the eviction pool.
evictionPoolPopulate(i, dict, db->dict, pool);
total_keys += keys;
}
}
// No keys in any DB (or none in expires), so there is nothing to evict.
if (!total_keys) break; /* No keys to evict. */
/* Go backward from best to worst element to evict.
* Scan from the back: entries with higher idle scores sit later in the pool.
* */
for (k = EVPOOL_SIZE - 1; k >= 0; k--) {
if (pool[k].key == NULL) continue;
bestdbid = pool[k].dbid;
// Look up the entry that was stored earlier.
if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
de = dictFind(server.db[pool[k].dbid].dict,
pool[k].key);
} else {
de = dictFind(server.db[pool[k].dbid].expires,
pool[k].key);
}
/* Remove the entry from the pool. */
if (pool[k].key != pool[k].cached)
sdsfree(pool[k].key);
pool[k].key = NULL;
pool[k].idle = 0;
/* If the key exists, is our pick. Otherwise it is
* a ghost and we need to try the next element. */
if (de) {
bestkey = dictGetKey(de);
break;
} else {
/* Ghost... Iterate again. */
}
}
}
}
/* volatile-random and allkeys-random policy
* Under the random policies a key is picked directly and we can return immediately.
* (One might wonder whether the scan above, touching N DBs just to evict a single key, is too slow.)
* */
else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM) {
/* When evicting a random key, we try to evict a key for
* each DB, so we use the static 'next_db' variable to
* incrementally visit all DBs. */
for (i = 0; i < server.dbnum; i++) {
j = (++next_db) % server.dbnum;
db = server.db + j;
dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ?
db->dict : db->expires;
if (dictSize(dict) != 0) {
de = dictGetRandomKey(dict);
bestkey = dictGetKey(de);
bestdbid = j;
break;
}
}
}
/* Finally remove the selected key.
* A suitable key was found this round.
* */
if (bestkey) {
db = server.db + bestdbid;
robj *keyobj = createStringObject(bestkey, sdslen(bestkey));
// The key is being forcibly evicted, so the deletion must be propagated to the AOF and replicas.
propagateExpire(db, keyobj, server.lazyfree_lazy_eviction);
/* We compute the amount of memory freed by db*Delete() alone.
* It is possible that actually the memory needed to propagate
* the DEL in AOF and replication link is greater than the one
* we are freeing removing the key, but we can't account for
* that otherwise we would never exit the loop.
*
* Same for CSC invalidation messages generated by signalModifiedKey.
*
* AOF and Output buffer memory will be freed eventually so
* we only care about memory used by the key space. */
delta = (long long) zmalloc_used_memory();
latencyStartMonitor(eviction_latency);
// Decide between lazy and synchronous deletion. Either way the key is removed
// from the db immediately, but the memory release may be deferred (dbAsyncDelete
// does not necessarily free via a bio thread; that depends on the size of the
// redisObject: only large, slow-to-free objects are handed to a background job).
if (server.lazyfree_lazy_eviction)
dbAsyncDelete(db, keyobj);
else
dbSyncDelete(db, keyobj);
latencyEndMonitor(eviction_latency);
latencyAddSampleIfNeeded("eviction-del", eviction_latency);
delta -= (long long) zmalloc_used_memory();
// Account for the memory freed so far; with lazy deletion the memory may not actually be released yet, and sampling continues.
mem_freed += delta;
server.stat_evictedkeys++;
signalModifiedKey(NULL, db, keyobj);
// Emit an eviction keyspace event.
notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
keyobj, db->id);
decrRefCount(keyobj);
keys_freed++;
/* When the memory to free starts to be big enough, we may
* start spending so much time here that is impossible to
* deliver data to the slaves fast enough, so we force the
* transmission here inside the loop.
* Flush the buffered data early, mainly the DEL commands replicated to the replicas, so that replicas have enough room for subsequent commands.
* */
if (slaves) flushSlavesOutputBuffers();
/* Normally our stop condition is the ability to release
* a fixed, pre-computed amount of memory. However when we
* are deleting objects in another thread, it's better to
* check, from time to time, if we already reached our target
* memory, since the "mem_freed" amount is computed only
* across the dbAsyncDelete() call, while the thread can
* release the memory all the time.
* With lazy deletion memory is released asynchronously, so every 16 freed keys re-check whether the memory target has already been reached.
* */
if (server.lazyfree_lazy_eviction && !(keys_freed % 16)) {
if (getMaxmemoryState(NULL, NULL, NULL, NULL) == C_OK) {
/* Let's satisfy our stop condition. */
mem_freed = mem_tofree;
}
}
} else {
// No evictable key was found this round.
goto cant_free; /* nothing to free... */
}
}
result = C_OK;
cant_free:
/* We are here if we are not able to reclaim memory. There is only one
* last thing we can try: check if the lazyfree thread has jobs in queue
* and wait...
* We get here when not enough memory could be reclaimed.
* */
if (result != C_OK) {
latencyStartMonitor(lazyfree_latency);
// Block while background lazy-free jobs remain; if memory becomes sufficient in the meantime, return OK. If memory is still short once no free jobs are left, there simply is not enough memory this time.
while (bioPendingJobsOfType(BIO_LAZY_FREE)) {
if (getMaxmemoryState(NULL, NULL, NULL, NULL) == C_OK) {
result = C_OK;
break;
}
// Sleep for 1 millisecond (usleep takes microseconds, so 1000us = 1ms, not 1000ms).
usleep(1000);
}
latencyEndMonitor(lazyfree_latency);
latencyAddSampleIfNeeded("eviction-lazyfree", lazyfree_latency);
}
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("eviction-cycle", latency);
return result;
}
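A side note: what processCommand() actually calls (marker 2) is freeMemoryIfNeededAndSafe(), a thin wrapper around the function above; in Redis 6.0's evict.c it is essentially the following (comments mine):
int freeMemoryIfNeededAndSafe(void) {
    /* Never attempt eviction while a Lua script has timed out or while the
     * dataset is still loading; in those states eviction is unsafe. */
    if (server.lua_timedout || server.loading) return C_OK;
    return freeMemoryIfNeeded();
}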
getMaxmemoryState() measures the current memory usage:
int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level) {
    size_t mem_reported, mem_used, mem_tofree;
    /* Check if we are over the memory usage limit. If we are not, no need
     * to subtract the slaves output buffers. We can just return ASAP.
     * Obtain the total memory used at this moment. */
    mem_reported = zmalloc_used_memory();
    if (total) *total = mem_reported;
    /* We may return ASAP if there is no need to compute the level. */
    int return_ok_asap = !server.maxmemory || mem_reported <= server.maxmemory;
    if (return_ok_asap && !level) return C_OK;
    /* Remove the size of slaves output buffers and AOF buffer from the
     * count of used memory: that overhead is not counted against the limit. */
    mem_used = mem_reported;
    size_t overhead = freeMemoryGetNotCountedMemory();
    mem_used = (mem_used > overhead) ? mem_used - overhead : 0;
    /* Compute the ratio of memory usage. */
    if (level) {
        if (!server.maxmemory) {
            *level = 0;
        } else {
            *level = (float)mem_used / (float)server.maxmemory;
        }
    }
    if (return_ok_asap) return C_OK;
    /* Check if we are still over the memory limit. */
    if (mem_used <= server.maxmemory) return C_OK;
    /* Compute how much memory we need to free. */
    mem_tofree = mem_used - server.maxmemory;
    if (logical) *logical = mem_used;
    if (tofree) *tofree = mem_tofree;
    return C_ERR;
}
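A worked example of the arithmetic above, with made-up numbers (maxmemory 100MB, 120MB in use, of which 15MB is replica/AOF buffer overhead):
#include <stdio.h>

int main(void) {
    size_t maxmemory    = 100UL * 1024 * 1024;
    size_t mem_reported = 120UL * 1024 * 1024;   /* zmalloc_used_memory()  */
    size_t overhead     =  15UL * 1024 * 1024;   /* buffers not counted    */

    size_t mem_used   = mem_reported - overhead;            /* 105MB */
    float  level      = (float)mem_used / (float)maxmemory; /* 1.05  */
    size_t mem_tofree = mem_used - maxmemory;                /* 5MB   */

    printf("level=%.2f tofree=%zu bytes\n", level, mem_tofree);
    return 0;
}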
evictionPoolPopulate() samples redisObjects from a DB and computes a score for each; the higher the score, the better the eviction candidate. The high-scoring samples populate a pool, and only one key is evicted at a time.
void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
    int j, k, count;
    dictEntry *samples[server.maxmemory_samples];
    // Randomly pick up to maxmemory_samples keys and fill them into samples.
    count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples);
    for (j = 0; j < count; j++) {
        unsigned long long idle;
        sds key;
        robj *o;
        dictEntry *de;
        // One of the randomly sampled keys.
        de = samples[j];
        key = dictGetKey(de);
        /* If the dictionary we are sampling from is not the main
         * dictionary (but the expires one) we need to lookup the key
         * again in the key dictionary to obtain the value object.
         * Note the value types differ: db->expires stores the expire time
         * (a long), while db->dict stores the redisObject. */
        if (server.maxmemory_policy != MAXMEMORY_VOLATILE_TTL) {
            if (sampledict != keydict) de = dictFind(keydict, key);
            o = dictGetVal(de);
        }
        /* Calculate the idle time according to the policy. This is called
         * idle just because the code initially handled LRU, but is in fact
         * just a score where a higher score means a better candidate. */
        if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) {
            idle = estimateObjectIdleTime(o);
        } else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
            /* When we use an LRU policy, we sort the keys by idle time
             * so that we expire keys starting from greater idle time.
             * However when the policy is an LFU one, we have a frequency
             * estimation, and we want to evict keys with lower frequency
             * first. So inside the pool we put objects using the inverted
             * frequency, subtracting the actual frequency from the maximum
             * frequency of 255. */
            idle = 255 - LFUDecrAndReturn(o);
        } else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
            /* In this case the sooner the expire the better. */
            idle = ULLONG_MAX - (long)dictGetVal(de);
        } else {
            serverPanic("Unknown eviction policy in evictionPoolPopulate()");
        }
        /* Insert the element inside the pool.
         * First, find the first empty bucket or the first populated
         * bucket that has an idle time smaller than our idle time. */
        k = 0;
        // Empty slots have idle = 0 and key = NULL.
        while (k < EVPOOL_SIZE &&
               pool[k].key &&
               pool[k].idle < idle) k++;
        if (k == 0 && pool[EVPOOL_SIZE-1].key != NULL) {
            /* Can't insert if the element is < the worst element we have
             * and there are no empty buckets. */
            continue;
        } else if (k < EVPOOL_SIZE && pool[k].key == NULL) {
            /* Inserting into empty position. No setup needed before insert. */
        } else {
            /* Inserting in the middle. Now k points to the first element
             * greater than the element to insert. */
            if (pool[EVPOOL_SIZE-1].key == NULL) {
                /* Free space on the right? Insert at k shifting
                 * all the elements from k to end to the right. */
                /* Save SDS before overwriting. */
                sds cached = pool[EVPOOL_SIZE-1].cached;
                memmove(pool+k+1,pool+k,
                    sizeof(pool[0])*(EVPOOL_SIZE-k-1));
                pool[k].cached = cached;
            } else {
                /* No free space on right? Insert at k-1. */
                k--;
                /* Shift all elements on the left of k (included) to the
                 * left, so we discard the element with smaller idle time. */
                sds cached = pool[0].cached; /* Save SDS before overwriting. */
                if (pool[0].key != pool[0].cached) sdsfree(pool[0].key);
                memmove(pool,pool+1,sizeof(pool[0])*k);
                pool[k].cached = cached;
            }
        }
        /* Try to reuse the cached SDS string allocated in the pool entry,
         * because allocating and deallocating this object is costly
         * (according to the profiler, not my fantasy. Remember:
         * premature optimization bla bla bla.)
         * (Author's note: the exact role of pool[k].cached is not entirely
         * clear, but it does not affect the ranking logic itself.) */
        int klen = sdslen(key);
        if (klen > EVPOOL_CACHED_SDS_SIZE) {
            pool[k].key = sdsdup(key);
        } else {
            memcpy(pool[k].cached,key,klen+1);
            sdssetlen(pool[k].cached,klen);
            pool[k].key = pool[k].cached;
        }
        pool[k].idle = idle;
        pool[k].dbid = dbid;
    }
}
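To make the insertion dance above concrete, here is a minimal standalone sketch (simplified: plain C strings, no SDS caching, a pool of four entries) of the same ordered-insert logic. The pool stays sorted by ascending idle score, so the best eviction candidate ends up at the highest index, which is why freeMemoryIfNeeded() scans from the tail:
#include <stdio.h>
#include <string.h>

#define POOL_SIZE 4

struct entry { const char *key; unsigned long long idle; };

static void pool_insert(struct entry *pool, const char *key, unsigned long long idle) {
    int k = 0;
    while (k < POOL_SIZE && pool[k].key && pool[k].idle < idle) k++;
    if (k == 0 && pool[POOL_SIZE-1].key != NULL) return;  /* worse than all, pool full */
    if (!(k < POOL_SIZE && pool[k].key == NULL)) {        /* not an empty slot */
        if (pool[POOL_SIZE-1].key == NULL) {
            /* Free space on the right: shift k..end right. */
            memmove(pool+k+1, pool+k, sizeof(pool[0])*(POOL_SIZE-k-1));
        } else {
            /* Pool full: drop the worst (leftmost) entry, insert at k-1. */
            k--;
            memmove(pool, pool+1, sizeof(pool[0])*k);
        }
    }
    pool[k].key = key;
    pool[k].idle = idle;
}

int main(void) {
    struct entry pool[POOL_SIZE] = {0};
    pool_insert(pool, "a", 30);
    pool_insert(pool, "b", 10);
    pool_insert(pool, "c", 50);
    pool_insert(pool, "d", 20);
    pool_insert(pool, "e", 40);   /* pool is full: "b" (idle 10) is dropped */
    for (int k = 0; k < POOL_SIZE; k++)
        printf("pool[%d] = %s (idle %llu)\n", k, pool[k].key, pool[k].idle);
    return 0;
}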
call() is what actually executes the command; qualifying commands are also written to the AOF and propagated to the replicas.
void call(client *c, int flags) {
long long dirty;
ustime_t start, duration;
int client_old_flags = c->flags;
struct redisCommand *real_cmd = c->cmd;
server.fixed_time_expire++;
/* Initialization: clear the flags that must be set by the command on
* demand, and initialize the array for additional commands propagation.
* Clear these flags so they do not affect the checks below.
* */
c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
// Reset also_propagate each time a new command executes; this is a separate mechanism from transactions.
redisOpArray prev_also_propagate = server.also_propagate;
redisOpArrayInit(&server.also_propagate);
/* Call the command.
* Record the current dirty counter first.
* */
dirty = server.dirty;
updateCachedTime(0);
start = server.ustime;
// Execute the command; it may operate on redisObjects and may write data into the client's reply buffer.
c->cmd->proc(c);
duration = ustime()-start;
// How many keys were affected.
dirty = server.dirty-dirty;
if (dirty < 0) dirty = 0;
/* After executing command, we will close the client after writing entire
* reply if it is set 'CLIENT_CLOSE_AFTER_COMMAND' flag.
* If CLIENT_CLOSE_AFTER_COMMAND is set, it must be converted to CLIENT_CLOSE_AFTER_REPLY.
* */
if (c->flags & CLIENT_CLOSE_AFTER_COMMAND) {
c->flags &= ~CLIENT_CLOSE_AFTER_COMMAND;
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
}
// TODO: statistics are skipped here.
if (flags & CMD_CALL_STATS) {
/* use the real command that was executed (cmd and lastamc) may be
* different, in case of MULTI-EXEC or re-written commands such as
* EXPIRE, GEOADD, etc. */
real_cmd->microseconds += duration;
real_cmd->calls++;
}
/* Propagate the command into the AOF and replication link
* Propagation to the AOF and replicas was requested, and the command did not suppress all propagation; in other words, we need to propagate.
* */
if (flags & CMD_CALL_PROPAGATE &&
(c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
{
// Whether propagation actually happens is decided below. MULTI itself is not propagated, for example; at EXEC time the whole transaction, MULTI included, is propagated (provided at least one inner command qualifies, i.e. is neither read-only nor admin-only).
int propagate_flags = PROPAGATE_NONE;
/* Check if the command operated changes in the data set. If so
* set for replication / AOF propagation.
* At least one piece of data changed, so the command must be propagated to the AOF/replicas.
* */
if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
/* If the client forced AOF / replication of the command, set
* the flags regardless of the command effects on the data set.
* Even if no data changed, client flags may force a particular kind of propagation.
* */
if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
/* However prevent AOF / replication propagation if the command
* implementation called preventCommandPropagation() or similar,
* or if we don't have the call() flags to do so.
* (So far only the set/stream code has been seen enabling this protection; there
* are situations where propagating to the AOF or replicas is inappropriate, not expanded on here.)
* */
if (c->flags & CLIENT_PREVENT_REPL_PROP ||
!(flags & CMD_CALL_PROPAGATE_REPL))
propagate_flags &= ~PROPAGATE_REPL;
if (c->flags & CLIENT_PREVENT_AOF_PROP ||
!(flags & CMD_CALL_PROPAGATE_AOF))
propagate_flags &= ~PROPAGATE_AOF;
/* Call propagate() only if at least one of AOF / replication
* propagation is needed. Note that modules commands handle replication
* in an explicit way, so we never replicate them automatically.
* Write this operation to the AOF and propagate it to the replicas.
* */
if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
}
/* Restore the old replication flags, since call() can be executed
* recursively. These two steps restore the flags that were in effect before the call. */
c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
c->flags |= client_old_flags &
(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
// At this point the command has executed and been written to the AOF and replicas. (The AOF write is not fsynced synchronously: writes are fast, but data can be lost; how this is addressed comes later. Replica writes are asynchronous.)
/* Handle the alsoPropagate() API to handle commands that want to propagate
* multiple separated commands. Note that alsoPropagate() is not affected
* by CLIENT_PREVENT_PROP flag.
* This is module-related; a module can be understood as a kind of plugin. The logic is skipped here (the set implementation has related logic).
* */
if (server.also_propagate.numops) {
int j;
redisOp *rop;
if (flags & CMD_CALL_PROPAGATE) {
int multi_emitted = 0;
/* Wrap the commands in server.also_propagate array,
* but don't wrap it if we are already in MULTI context,
* in case the nested MULTI/EXEC.
*
* And if the array contains only one command, no need to
* wrap it, since the single command is atomic.
* This can be loosely understood as propagating a transaction.
* */
if (server.also_propagate.numops > 1 &&
!(c->cmd->flags & CMD_MODULE) &&
!(c->flags & CLIENT_MULTI) &&
!(flags & CMD_CALL_NOWRAP))
{
execCommandPropagateMulti(c);
multi_emitted = 1;
}
for (j = 0; j < server.also_propagate.numops; j++) {
rop = &server.also_propagate.ops[j];
int target = rop->target;
/* Whatever the command wish is, we honor the call() flags. */
if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF;
if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL;
if (target)
propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
}
if (multi_emitted) {
execCommandPropagateExec(c);
}
}
redisOpArrayFree(&server.also_propagate);
}
// Restore the previous value; call() can evidently nest.
server.also_propagate = prev_also_propagate;
server.fixed_time_expire--;
server.stat_numcommands++;
/* Record peak memory after each command and before the eviction that runs
* before the next command. */
size_t zmalloc_used = zmalloc_used_memory();
if (zmalloc_used > server.stat_peak_memory)
server.stat_peak_memory = zmalloc_used;
}
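A minimal standalone sketch of the propagation-flag arithmetic in call(): a write command that dirtied the keyspace propagates to both the AOF and replicas, unless the caller or client flags mask a target out. The PROPAGATE_* values match server.h; the inputs are simplified stand-ins for the real checks.
#include <stdio.h>

#define PROPAGATE_NONE 0
#define PROPAGATE_AOF  (1<<0)
#define PROPAGATE_REPL (1<<1)

int main(void) {
    int dirty = 1;                                       /* command changed data */
    int call_flags = PROPAGATE_AOF | PROPAGATE_REPL;     /* CMD_CALL_FULL-like   */

    int propagate_flags = PROPAGATE_NONE;
    if (dirty) propagate_flags |= (PROPAGATE_AOF | PROPAGATE_REPL);

    /* Suppose the caller disallowed replica propagation: */
    call_flags &= ~PROPAGATE_REPL;
    if (!(call_flags & PROPAGATE_REPL)) propagate_flags &= ~PROPAGATE_REPL;

    printf("AOF:%d REPL:%d\n",
           !!(propagate_flags & PROPAGATE_AOF),
           !!(propagate_flags & PROPAGATE_REPL));
    return 0;
}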
The code above ignores the monitoring module and Lua scripts. The core logic is a single function call, c->cmd->proc(c). A command implementation locates a redisObject via the db and key (at the API layer these are the Redis types set, list, stream, zset and string; underneath sit the data structures ziplist, quicklist, intset, dict, and so on. I won't cover these data structures in isolation; the relatively involved ones are ziplist and zskiplist.) and performs the corresponding add/delete/update/query operation. These operations may insert a key, such as a new redisObject, while some client is blocked waiting for exactly that kind of key to appear; that client can then be unblocked and its previously blocked command executed. This corresponds to handleClientsBlockedOnKeys():
void handleClientsBlockedOnKeys(void) {
    while(listLength(server.ready_keys) != 0) {
        list *l;
        /* Point server.ready_keys to a fresh list and save the current one
         * locally. This way as we run the old list we are free to call
         * signalKeyAsReady() that may push new elements in server.ready_keys
         * when handling clients blocked into BRPOPLPUSH. */
        l = server.ready_keys;
        server.ready_keys = listCreate();
        while(listLength(l) != 0) {
            listNode *ln = listFirst(l);
            readyList *rl = ln->value;
            /* First of all remove this key from db->ready_keys so that
             * we can safely call signalKeyAsReady() against this key. */
            dictDelete(rl->db->ready_keys,rl->key);
            /* Even if we are not inside call(), increment the call depth
             * in order to make sure that keys are expired against a fixed
             * reference time, and not against the wallclock time. This
             * way we can lookup an object multiple times (BRPOPLPUSH does
             * that) without the risk of it being freed in the second
             * lookup, invalidating the first one.
             * See https://github.com/antirez/redis/pull/6554. */
            server.fixed_time_expire++;
            updateCachedTime(0);
            /* Serve clients blocked on the key, if it still exists. */
            robj *o = lookupKeyWrite(rl->db,rl->key);
            if (o != NULL) {
                if (o->type == OBJ_LIST)
                    serveClientsBlockedOnListKey(o,rl);
                else if (o->type == OBJ_ZSET)   // Clients blocked on a zset.
                    serveClientsBlockedOnSortedSetKey(o,rl);
                else if (o->type == OBJ_STREAM) // TODO: streams skipped.
                    serveClientsBlockedOnStreamKey(o,rl);
                /* We want to serve clients blocked on module keys
                 * regardless of the object type: we don't know what the
                 * module is trying to accomplish right now. */
                serveClientsBlockedOnKeyByModule(rl);
            }
            server.fixed_time_expire--;
            /* Free this item. */
            decrRefCount(rl->key);
            zfree(rl);
            listDelNode(l,ln);
        }
        listRelease(l); /* We have the new list on place at this point. */
    }
}
We will only look at the list logic here; the other types are similar.
/* Helper function for handleClientsBlockedOnKeys(). This function is called
 * when there may be clients blocked on a list key, and there may be new
 * data to fetch (the key is ready).
 * This call checks whether the now-ready key (described by rl) matches a key
 * in blocking_keys and unblocks the corresponding clients. */
void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
    /* We serve clients in the same order they blocked for
     * this key, from the first blocked to the last. */
    dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
    if (de) {
        // The list of clients blocked on this key.
        list *clients = dictGetVal(de);
        int numclients = listLength(clients);
        while(numclients--) {
            listNode *clientnode = listFirst(clients);
            client *receiver = clientnode->value;
            if (receiver->btype != BLOCKED_LIST) {
                /* Put at the tail, so that at the next call
                 * we'll not run into it again. */
                listRotateHeadToTail(clients);
                continue;
            }
            /* For BRPOPLPUSH, bpop.target is the destination key; otherwise NULL. */
            robj *dstkey = receiver->bpop.target;
            int where = (receiver->lastcmd &&
                         receiver->lastcmd->proc == blpopCommand) ?
                         LIST_HEAD : LIST_TAIL;
            robj *value = listTypePop(o,where);
            if (value) {
                /* Protect receiver->bpop.target, that will be
                 * freed by the next unblockClient() call. */
                if (dstkey) incrRefCount(dstkey);
                // unblockClient() removes the client from the blocking_keys list.
                unblockClient(receiver);
                if (serveClientBlockedOnList(receiver,
                    rl->key,dstkey,rl->db,value,
                    where) == C_ERR)
                {
                    /* If we failed serving the client we need
                     * to also undo the POP operation. */
                    listTypePush(o,value,where);
                }
                if (dstkey) decrRefCount(dstkey);
                decrRefCount(value);
            } else {
                // The list has been drained; the key is no longer ready for the remaining clients.
                break;
            }
        }
    }
    // If the list is now empty, delete the key from the db.
    if (listTypeLength(o) == 0) {
        dbDelete(rl->db,rl->key);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",rl->key,rl->db->id);
    }
    /* We don't call signalModifiedKey() as it was already called
     * when an element was pushed on the list. */
}
The result is written into the client's output buffer and flushed to the socket asynchronously by the event loop (via handleClientsWithPendingWrites in beforeSleep, rather than synchronously with the command).
While reading this code I ran into some open questions, mainly:
1. Under what conditions does a client become blocked (blockClient)?
2. In a cluster architecture, is it always the master node that receives commands? Related: in a master/replica structure, is the routing table maintained by all nodes or only by the master? If by all nodes, how is it kept in sync; if only by the master, what happens when the master goes down? And when does the routing table come into play?
3. How is an expired key synchronized to the other nodes, and what do those nodes do with it?
These questions all seem to be cluster-related (blocking less certainly so, but the blocking commands are rarely used in business scenarios, so a cluster-oriented purpose cannot be ruled out), which leads into reading the cluster/replica/sentinel code next.