The client

The data structure
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061Copy the code
/* With multiplexing we need to take per-client state. * Clients are taken in a linked list. */typedef struct client {    uint64_t id;            /* Client incremental unique ID. */    int fd;                 /* Client socket. */    redisDb *db;            /* Pointer to currently SELECTed DB. */    robj *name;             /* As set by CLIENT SETNAME. */    sds querybuf;           /* Buffer we use to accumulate client queries. */    size_t qb_pos;          /* The position we have read in querybuf. */    sds pending_querybuf;   /* If this client is flagged as master, this buffer                               represents the yet not applied portion of the                               replication stream that we are receiving from                               the master. */    size_t querybuf_peak;   /* Recent (100ms or more) peak of querybuf size. */    int argc;               /* Num of arguments of current command. */    robj **argv;            /* Arguments of current command. */    struct redisCommand *cmd, *lastcmd;  /* Last command executed. */    int reqtype;            /* Request protocol type: PROTO_REQ_* */    int multibulklen;       /* Number of multi bulk arguments left to read. */    long bulklen;           /* Length of bulk argument in multi bulk request. */    list *reply;            /* List of reply objects to send to the client. */    unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */    size_t sentlen;         /* Amount of bytes already sent in the current                               buffer or object being sent. */    time_t ctime;           /* Client creation time. */    time_t lastinteraction; /* Time of the last interaction, used for timeout */    time_t obuf_soft_limit_reached_time;    int flags;              /* Client flags: CLIENT_* macros. */    int authenticated;      /* When requirepass is non-NULL. */    int replstate;          /* Replication state if this is a slave. 
*/    int repl_put_online_on_ack; /* Install slave write handler on ACK. */    int repldbfd;           /* Replication DB file descriptor. */    off_t repldboff;        /* Replication DB file offset. */    off_t repldbsize;       /* Replication DB file size. */    sds replpreamble;       /* Replication DB preamble. */    long long read_reploff; /* Read replication offset if this is a master. */    long long reploff;      /* Applied replication offset if this is a master. */    long long repl_ack_off; /* Replication ack offset, ifthis is a slave. */ long long repl_ack_time; /* Replication ack time,if this is a slave. */    long long psync_initial_offset; /* FULLRESYNC reply offset other slaves                                       copying this slave output buffer                                       should use. */    char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */    int slave_listening_port; /* As configured with: SLAVECONF listening-port */    char slave_ip[NET_IP_STR_LEN]; /* Optionally given by REPLCONF ip-address */    int slave_capa;         /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */    multiState mstate;      /* MULTI/EXEC state */    int btype;              /* Type of blocking op if CLIENT_BLOCKED. */    blockingState bpop;     /* blocking state */    long long woff;         /* Last write global replication offset. */    list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */    dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */    list *pubsub_patterns;  /* patterns a client is interested in (SUBSCRIBE) */    sds peerid;             /* Cached peer ID. */    listNode *client_list_node; /* list node inclient list */ /* Response buffer */ int bufpos; char buf[PROTO_REPLY_CHUNK_BYTES]; } client;Copy the code

At first glance, the client data structure has a great many fields, which can be distracting. That's fine: you don't have to remember them all, just the few important ones.

The configuration file

Client command
12345678910111213Copy the code
"id                     -- Return the ID of the current connection.",
"getname                -- Return the name of the current connection.",
"kill <ip:port>         -- Kill connection made from <ip:port>.",
"kill <option> <value> [option value ...] -- Kill connections. Options are:",
"     addr <ip:port>                      -- Kill connection made from <ip:port>",
"     type (normal|master|replica|pubsub) -- Kill connections by type.",
"     skipme (yes|no)   -- Skip killing current connection (default: yes).",
"list [options ...]     -- Return information about client connections. Options:",
"     type (normal|master|replica|pubsub) -- Return clients of specified type.",
"pause <timeout>        -- Suspend all Redis clients for <timeout> milliseconds.",
"reply (on|off|skip)    -- Control the replies sent to the current connection.",
"setname <name>         -- Assign the name <name> to the current connection.",
"unblock <clientid> [TIMEOUT|ERROR] -- Unblock the specified blocked client."
  1. client list -> view information about the connected clients

Among them

  • flags indicates the client type: N means a normal client, M means a master, and so on
1234567891011121314151617Copy the code
/* Map a client type name to its CLIENT_TYPE_* constant.
 *
 * The comparison is case-insensitive, and "slave" and "replica" are accepted
 * as synonyms for CLIENT_TYPE_SLAVE. Returns -1 when the name is unknown. */
int getClientTypeByName(char *name) {
    if (!strcasecmp(name,"normal")) return CLIENT_TYPE_NORMAL;
    else if (!strcasecmp(name,"slave")) return CLIENT_TYPE_SLAVE;
    else if (!strcasecmp(name,"replica")) return CLIENT_TYPE_SLAVE;
    else if (!strcasecmp(name,"pubsub")) return CLIENT_TYPE_PUBSUB;
    else if (!strcasecmp(name,"master")) return CLIENT_TYPE_MASTER;
    else return -1;
}

/* Map a CLIENT_TYPE_* constant back to its canonical name.
 *
 * Returns a pointer to a string literal (the caller must not free it), or
 * NULL when 'class' is not one of the known client types. */
char *getClientTypeName(int class) {
    switch(class) {
    case CLIENT_TYPE_NORMAL: return "normal";
    case CLIENT_TYPE_SLAVE:  return "slave";
    case CLIENT_TYPE_PUBSUB: return "pubsub";
    case CLIENT_TYPE_MASTER: return "master";
    default:                 return NULL;
    }
}
  • obl is the length of the fixed output buffer
  • oll is the length of the dynamic output buffer list
  • omem is the number of bytes used by the output buffers
  • events is the file event type (r/w)
  • cmd is the last command executed

See function for details

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253Copy the code
/* Concatenate a string representing the state of a client in a human
 * readable format, into the sds string 's'.
 *
 * The returned value is the (possibly reallocated) sds string, as returned
 * by sdscatfmt(). */
sds catClientInfoString(sds s, client *client) {
    char flags[16], events[3], *p;
    int emask;

    /* Build the "flags" field: one letter per client flag that is set. */
    p = flags;
    if (client->flags & CLIENT_SLAVE) {
        /* A MONITOR client is flagged as a slave too, but reported as 'O'. */
        if (client->flags & CLIENT_MONITOR)
            *p++ = 'O';
        else
            *p++ = 'S';
    }
    if (client->flags & CLIENT_MASTER) *p++ = 'M';
    if (client->flags & CLIENT_PUBSUB) *p++ = 'P';
    if (client->flags & CLIENT_MULTI) *p++ = 'x';
    if (client->flags & CLIENT_BLOCKED) *p++ = 'b';
    if (client->flags & CLIENT_DIRTY_CAS) *p++ = 'd';
    if (client->flags & CLIENT_CLOSE_AFTER_REPLY) *p++ = 'c';
    if (client->flags & CLIENT_UNBLOCKED) *p++ = 'u';
    if (client->flags & CLIENT_CLOSE_ASAP) *p++ = 'A';
    if (client->flags & CLIENT_UNIX_SOCKET) *p++ = 'U';
    if (client->flags & CLIENT_READONLY) *p++ = 'r';
    /* No flag letter emitted at all: report a normal client. */
    if (p == flags) *p++ = 'N';
    *p++ = '\0';

    /* Build the "events" field; a client without a socket (fd == -1) has
     * no file events registered. */
    emask = client->fd == -1 ? 0 : aeGetFileEvents(server.el,client->fd);
    p = events;
    if (emask & AE_READABLE) *p++ = 'r';
    if (emask & AE_WRITABLE) *p++ = 'w';
    *p = '\0';

    return sdscatfmt(s,
        "id=%U addr=%s fd=%i name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U obl=%U oll=%U omem=%U events=%s cmd=%s",
        (unsigned long long) client->id,
        getClientPeerId(client),
        client->fd,
        client->name ? (char*)client->name->ptr : "",
        (long long)(server.unixtime - client->ctime),
        (long long)(server.unixtime - client->lastinteraction),
        flags,
        client->db->id,
        (int) dictSize(client->pubsub_channels),
        (int) listLength(client->pubsub_patterns),
        (client->flags & CLIENT_MULTI) ? client->mstate.count : -1,
        (unsigned long long) sdslen(client->querybuf),
        (unsigned long long) sdsavail(client->querybuf),
        (unsigned long long) client->bufpos,
        (unsigned long long) listLength(client->reply),
        (unsigned long long) getClientOutputBufferMemoryUsage(client),
        events,
        client->lastcmd ? client->lastcmd->name : "NULL");
}
  1. info clients -> view statistics for all clients

  • connected_clients: the number of client connections on the current Redis node. This value needs to be monitored carefully: once it exceeds maxclients, new client connections are rejected.
  • client_longest_output_list: the largest number of queued objects in any client's output buffer.
  • client_biggest_input_buf: the largest size of any client's input buffer.
  • blocked_clients: the number of clients blocked on a blocking command such as BLPOP, BRPOP, or BRPOPLPUSH.
Client Shutdown
  1. Invoke the client kill command
  2. The command does not conform to the protocol format
  3. Client Timeout
  4. The input buffer exceeds the 1 GB threshold
  5. The output buffer exceeds its threshold (there is a soft limit and a hard limit: the client is closed immediately when the hard limit is exceeded, and closed when the soft limit has been exceeded continuously for the specified number of seconds)

The format of client-output-buffer-limit is:

client-output-buffer-limit <class> <hard limit> <soft limit> <soft seconds>

client-output-buffer-limit normal 0 0 0

client-output-buffer-limit replica 256mb 64mb 60

client-output-buffer-limit pubsub 32mb 8mb 60

Here, class is the client type. A client that exceeds the hard limit is closed immediately; a client that exceeds the soft limit continuously for soft seconds is closed as well.

The class values:

normal -> normal clients including MONITOR clients

slave -> slave clients

pubsub -> clients subscribed to at least one pubsub channel or pattern

Note: Client checks are performed in the serverCron timing function

12345678910111213141516171819202122232425262728293031323334353637Copy the code
#define CLIENTS_CRON_MIN_ITERATIONS 5void clientsCron(void) { /* Try to process at least numclients/server.hz of clients  * per call. Since normally (if there are no big latency events) this * function is called server.hz times per second, in the average case we * process all the clients in 1 second. */ int numclients = listLength(server.clients); Int iterations = numclients/server.hz; // You can go through more iterations as follows. mstime_t now = mstime(); /* Process at least a few clients while we are at it, even if we need * to process less than CLIENTS_CRON_MIN_ITERATIONS to meet our contract * of processing each client once  per second. */ if (iterations < CLIENTS_CRON_MIN_ITERATIONS) iterations = (numclients < CLIENTS_CRON_MIN_ITERATIONS) ? numclients : CLIENTS_CRON_MIN_ITERATIONS; while(listLength(server.clients) && iterations--) { client *c; listNode *head; /* Rotate the list, take the current head, process. * This way if the client must be removed from the list it's the * first element and we don't incur into O(N) computation. */ listRotate(server.clients); head = listFirst(server.clients); c = listNodeValue(head); /* The following functions do different service checks on the client. * The protocol is that they return non-zero if the  client was * terminated. */ if (clientsCronHandleTimeout(c,now)) continue; if (clientsCronResizeQueryBuffer(c)) continue; if (clientsCronTrackExpansiveClients(c)) continue; }}Copy the code

Because Redis is single-threaded, it cannot spend all of its time looping over clients. Client timeout detection is performed in clientsCron, which is called from serverCron, a periodic function that by default runs every 100 ms. server.hz is the frequency at which serverCron is called; the default value is 10. To visit every client about once per second, clientsCron sets iterations to numclients/server.hz. If there are too many clients, this can block the Redis main thread, so Redis 5.0 introduced dynamic hz (see dynamic-hz in the configuration file), which is turned on by default.

The service side

The database

Each database is stored in a redisDb structure, while the server's redisServer structure holds the redisDb objects and their count; the count can be changed in the configuration file (see dbnum in the configuration file).

The data structure
12345678910Copy the code
/* Redis database representation. There are multiple databases, each
 * identified by an integer ID stored in the 'id' field. */
typedef struct redisDb {
    dict *dict;                 /* The keyspace for this DB */
    dict *expires;              /* Timeout of keys with a timeout set */
    dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP)*/
    dict *ready_keys;           /* Blocked keys that received a PUSH */
    dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
    int id;                     /* Database ID */
    long long avg_ttl;          /* Average TTL, just for stats */
    list *defrag_later;         /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;

RedisServer contains redisDb data structures. Indicates which DB is currently in use.

12345678910111213141516Copy the code
/* Global server state (excerpt: only the leading "General" fields are
 * shown here). */
struct redisServer {
    /* General */
    pid_t pid;                  /* Main process pid. */
    char *configfile;           /* Absolute config file path, or NULL */
    char *executable;           /* Absolute executable file path. */
    char **exec_argv;           /* Executable argv vector (copy). */
    int dynamic_hz;             /* Change hz value depending on # of clients. */
    int config_hz;              /* Configured HZ value. May be different than
                                   the actual 'hz' field value if dynamic-hz
                                   is enabled. */
    int hz;                     /* serverCron() calls frequency in Hertz */
    /* ... remaining fields omitted in this excerpt ... */
};
Database switching

Database switching is performed with SELECT. In essence it just executes c->db = &server.db[id];

123456Copy the code
/* Make the database with index 'id' the current database of client 'c'.
 *
 * Returns C_OK on success, or C_ERR (leaving the client untouched) when
 * 'id' is outside the range [0, server.dbnum). */
int selectDb(client *c, int id) {
    if (id < 0 || id >= server.dbnum)
        return C_ERR;
    c->db = &server.db[id];
    return C_OK;
}
Key space

Redis can have multiple databases, and different databases do not affect each other. How to do that? Key space can be understood as a C++ namespace for isolation. The data is stored in a dict object in redisDb. So you’re basically operating on keys based on key space.

In essence, the key space is the dict object in each redisDb.

12345678910Copy the code
/* High level set operation: store 'val' under 'key' in 'db' whether or not
 * the key already exists.
 *
 * 1) The reference count of the value object is incremented.
 * 2) Any expire associated with the key is removed.
 * 3) The key is signalled as modified. */
void setKey(redisDb *db, robj *key, robj *val) {
    if (lookupKeyWrite(db,key) == NULL) {
        dbAdd(db,key,val);
    } else {
        dbOverwrite(db,key,val);
    }
    incrRefCount(val);
    removeExpire(db,key);
    signalModifiedKey(db,key);
}

Expired keys
  1. In Redis, expiration keys are stored in the redisDb expires variable, which is a dict pointer type.

  2. storage

  • Key holds the Key object of the database

  • Value holds the expiration time of the database key object, which is a long integer Unix time.

12345Copy the code
 /* Excerpt from setExpire(): the entry in db->expires reuses the key of the
  * main dict entry 'kde', and stores the expire time 'when' as its value. */
 de = dictAddOrFind(db->expires,dictGetKey(kde));
 dictSetSignedIntegerVal(de,when);

/* Store a signed 64-bit integer directly in the value slot of a dict entry. */
#define dictSetSignedIntegerVal(entry, _val_) \
    do { (entry)->v.s64 = _val_; } while(0)
  1. Setting expiration Time

Relative mode: expire, in seconds; pexpire, in milliseconds

Absolute mode: expireat <timestamp>, in seconds; pexpireat <timestamp>, in milliseconds. All the other commands are converted to pexpireat.

expire command: setExpire(c,c->db,key,when) -> de = dictFind(db->dict,key->ptr) -> dictAddOrFind(db->expires,dictGetKey(de))

  1. Deleting expiration Time

persist(c->db,c->argv[1]) -> dictDelete(db->expires,key->ptr)

  1. Check the remaining time of the key

View expiration time:

ttl, in seconds; pttl, in milliseconds

lookupKeyReadWithFlags -> getExpire -> ttl = expire - mstime();

  1. Key expiration policy

Redis uses periodic deletion and lazy deletion

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163Copy the code
void activeExpireCycle(int type) {    /* This function has some global state in order to continuethe work * incrementally across calls. */ static unsigned int current_db = 0; Static int timelimit_exit = 0; /* Timelimit hit inprevious call? */ static long long last_fast_cycle = 0; /* When last fast cycle ran. */ int j, iteration = 0; int dbs_per_call = CRON_DBS_PER_CALL; long long start = ustime(), timelimit, elapsed; /* When clients are paused the dataset should be static not just from the * POV of clients not being able to write, but also from the POV of * expires and evictions of keys not being performed. */if (clientsArePaused()) return;    if (type == ACTIVE_EXPIRE_CYCLE_FAST) {        /* Don't start a fast cycle if the previous cycle did not exit * for time limit. Also don't repeat a fast cycle for the same period         * as the fast cycle total duration itself. */        if(! timelimit_exit)return;        if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) return;        last_fast_cycle = start;    }    /* We usually should test CRON_DBS_PER_CALL per iteration, with     * two exceptions:     *     * 1) Don't test more DBs than we have. * 2) If last time we hit the time limit, we want to scan all DBs * in this iteration, as there is work to do in some DB and we don't want     * expired keys to use memory for too much time. */    if (dbs_per_call > server.dbnum || timelimit_exit)        dbs_per_call = server.dbnum;    /* We can use at max ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC percentage of CPU time     * per iteration. Since this function gets called with a frequency of     * server.hz times per second, the following is the max amount of     * microseconds we can spend inThis function. */ / A maximum of 25% of the CPU time can be used to clean expired keys. // A maximum of 250ms of activeExpireCycle can be performed at a time. 
A maximum of 25ms timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100 can be executed once for the activeExpireCycle service. timelimit_exit = 0;if(timelimit <= 0) timelimit = 1; // If ACTIVE_EXPIRE_CYCLE_FAST is used, the time limit is ACTIVE_EXPIRE_CYCLE_FAST_DURATIONif (type== ACTIVE_EXPIRE_CYCLE_FAST) timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; / *inmicroseconds. */ /* Accumulate some global stats as we expire keys, to have some idea * about the number of keys that are already logically expired, but still * existing inside the database. */ long total_sampled = 0; long total_expired = 0; // if timelimit_exit times out, we won't loop through the next one eitherfor (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {        int expired;        redisDb *db = server.db+(current_db % server.dbnum);        /* Increment the DB now so we are sure if we run out of time         * in the current DB we'll restart from the next. This allows to * distribute the time evenly across DBs. */ current_db++; /* Continue to expire if at the end of the cycle more than 25% * of the keys were expired. */ do { unsigned long num, slots; long long now, ttl_sum; int ttl_samples; iteration++; /* If there is nothing to expire try next DB ASAP. */ if ((num = dictSize(db->expires)) == 0) { db->avg_ttl = 0; break; } slots = dictSlots(db->expires); now = mstime(); /* When there are less than 1% filled slots getting random * keys is expensive, so stop here waiting for better times... * The dictionary will be resized asap. */ if (num && slots > DICT_HT_INITIAL_SIZE && (num*100/slots < 1)) break; /* The main collection cycle. Sample random keys among keys * with an expire set, checking for expired ones. 
*/ expired = 0; ttl_sum = 0; ttl_samples = 0; // Obtain ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP keys at a time, If (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP) num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP; while (num--) { dictEntry *de; long long ttl; // Select a random key from the expiring table and get its TTL if ((de = dictGetRandomKey(db-> Expires)) == NULL) break; ttl = dictGetSignedIntegerVal(de)-now; / / whether he expired, if overdue to delete, as to be synchronous or asynchronous, then whatever you if (activeExpireCycleTryExpire (db, DE, now)) expired++; if (ttl > 0) { /* We want the average TTL of keys yet not expired. */ ttl_sum += ttl; ttl_samples++; } total_sampled++; } total_expired += expired; /* Update the average TTL stats for this database. */ if (ttl_samples) { long long avg_ttl = ttl_sum/ttl_samples; /* Do a simple running average with a few samples. * We just use the current estimate with a weight of 2% * and the previous estimate with a weight of 98%. */ if (db->avg_ttl == 0) db->avg_ttl = avg_ttl; db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50); } /* We can't block forever here even if there are many keys to             * expire. So after a given amount of milliseconds return to the             * caller waiting forThe other active expire cycle. * * Counts the time that the function has run every 16 iterations. If the time exceeds timelimit_exit, * sets the timelimit_exit flag to 1, indicating that the program has timed out. Need to force exit * */if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */                elapsed = ustime()-start;                if (elapsed > timelimit) {                    timelimit_exit = 1;                    server.stat_expired_time_cap_reached_count++;                    break;                }            }            /* We don't repeat the cycle if there are less than 25% of keys * found expired in the current DB. 
*/ /** * ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP is the number of expired keys we want to find in each loop, and if the number of expired keys cleared exceeds the desired number by a quarter after each loop, we continue the loop, If the number of expired keys in the database does not reach the desired quarter, we break out of the while loop. Traverse the next database * */} while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4); } elapsed = ustime()-start; latencyAddSampleIfNeeded("expire-cycle",elapsed/1000); /* Update our estimate of keys existing but yet to be expired. * Running average with this sample accounting for 5%. */ double current_perc; if (total_sampled) { current_perc = (double)total_expired/total_sampled; } else current_perc = 0; For server stat_expired_stale_perc = (current_perc * 0.05) + (server. Stat_expired_stale_perc * 0.95); }Copy the code

  • Periodic deletion is divided into two modes, which are called in different scenarios. So the ending judgment is different.

    When called from databasesCron inside the scheduled task serverCron, the type is ACTIVE_EXPIRE_CYCLE_SLOW, which means more time can be spent on processing. When called from the beforeSleep function before the event loop, the type is ACTIVE_EXPIRE_CYCLE_FAST, because event processing must not be delayed. For that case the following guard is applied as an optimization:

    12345678Copy the code
    /* Guard executed at the start of activeExpireCycle() for fast cycles. */
    if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
        /* Don't start a fast cycle if the previous cycle did not exit
         * for time limit. Also don't repeat a fast cycle for the same period
         * as the fast cycle total duration itself. */
        if (!timelimit_exit) return;
        if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) return;
        last_fast_cycle = start;
    }
  • REDIS_EXPIRELOOKUPS_TIME_PERC (named ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC in the code above) is the percentage of CPU time allocated to the activeExpireCycle function per unit of time. The default value is 25.
  • At most 16 databases are scanned per call.
  • In each database, sampling is repeated only while more than 5 expired keys are found per round (note that because keys are picked at random, the data set should not be too small; with too little data the random sampling works poorly).
  • In each database, 20 keys are randomly sampled per round. Each key is checked for expiration, and every expired key is counted. Every 16 rounds the elapsed time is checked against the time limit; if it is exceeded, the loop exits and no further databases are scanned.
The effect of expired keys on RDB/AOF/ master/slave will be covered in a later module

Database notification

Database notifications take advantage of the publish-subscribe model supported by Redis, which lets clients observe key changes in the database by subscribing to a given channel or pattern (subscriptions are stored in pubsub_channels and pubsub_patterns of both the client and the server, although they are stored differently). There are two types of notification: key-space notifications and key-event notifications. To enable this feature you need to set a parameter in the configuration file; see notify-keyspace-events.

Key-space notification – which commands were executed on a given key

__keyspace@<db>__:<key> <event>

Key-event notification – which keys a given command was executed on

__keyevent@<db>__:<event> <key>

123456789101112131415161718192021222324252627282930313233343536373839404142Copy the code
/* Publish keyspace / keyevent notifications for an operation on a key.
 *
 * 'type'  is the notification class, tested against
 *         server.notify_keyspace_events.
 * 'event' is a C string naming the event.
 * 'key'   is the Redis object holding the key name.
 * 'dbid'  is the ID of the database the key lives in.
 *
 * Modules are always notified first, regardless of the configuration. */
void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) {
    sds chan;
    robj *chanobj, *eventobj;
    int len = -1;   /* Length of the dbid string in 'buf', -1 if not built yet. */
    char buf[24];   /* Decimal representation of 'dbid'. */

    /* If any modules are interested in events, notify the module system now.
     * This bypasses the notifications configuration, but the module engine
     * will only call event subscribers if the event type matches the types
     * they are interested in. */
    moduleNotifyKeyspaceEvent(type, event, key, dbid);

    /* If notifications for this class of events are off, return ASAP. */
    if (!(server.notify_keyspace_events & type)) return;

    eventobj = createStringObject(event,strlen(event));

    /* __keyspace@<db>__:<key> <event> notifications. */
    if (server.notify_keyspace_events & NOTIFY_KEYSPACE) {
        chan = sdsnewlen("__keyspace@",11);
        len = ll2string(buf,sizeof(buf),dbid);
        chan = sdscatlen(chan, buf, len);
        chan = sdscatlen(chan, "__:", 3);
        chan = sdscatsds(chan, key->ptr);
        chanobj = createObject(OBJ_STRING, chan);
        pubsubPublishMessage(chanobj, eventobj);
        decrRefCount(chanobj);
    }

    /* __keyevent@<db>__:<event> <key> notifications. */
    if (server.notify_keyspace_events & NOTIFY_KEYEVENT) {
        chan = sdsnewlen("__keyevent@",11);
        /* Reuse the dbid string if the keyspace branch already built it. */
        if (len == -1) len = ll2string(buf,sizeof(buf),dbid);
        chan = sdscatlen(chan, buf, len);
        chan = sdscatlen(chan, "__:", 3);
        chan = sdscatsds(chan, eventobj->ptr);
        chanobj = createObject(OBJ_STRING, chan);
        pubsubPublishMessage(chanobj, key);
        decrRefCount(chanobj);
    }
    decrRefCount(eventobj);
}

Parameters that

To enable the feature, set it in the configuration file, for example: notify-keyspace-events Elg. At least one of K and E must be enabled.