Channel data structure

Redis keeps all channel subscriptions in the pubsub_channels dictionary of the server state:

struct redisServer {
    ...
    dict *pubsub_channels; /* Map channels to list of subscribed clients */
    ...
};

The key of each dictionary entry is the name of a subscribed channel, and the value of the entry is a linked list of all clients that subscribe to this channel.
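The channel-to-clients mapping can be pictured with a small standalone sketch. This is not Redis code: it assumes fixed-size arrays in place of Redis's dict and linked list, and ToyChannel, ToyServer and toy_find_channel are hypothetical names used only for illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MAX_CHANNELS 8
#define MAX_SUBSCRIBERS 8

/* One entry of the toy "dictionary" (hypothetical type, not a Redis struct). */
typedef struct {
    const char *name;               /* key: channel name */
    int clients[MAX_SUBSCRIBERS];   /* value: ids of the subscribed clients */
    int nclients;
} ToyChannel;

/* Toy stand-in for the pubsub_channels field of the server state. */
typedef struct {
    ToyChannel channels[MAX_CHANNELS];
    int nchannels;
} ToyServer;

/* Return the entry for `name`, or NULL if no client is subscribed to it. */
ToyChannel *toy_find_channel(ToyServer *s, const char *name) {
    assert(s != NULL && name != NULL);
    for (int i = 0; i < s->nchannels; i++)
        if (strcmp(s->channels[i].name, name) == 0)
            return &s->channels[i];
    return NULL;
}
```

A linear scan keeps the sketch short; Redis itself uses a hash table for the lookup.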

Subscribe to the channel

SUBSCRIBE channel [channel …]

When a client executes the SUBSCRIBE command to subscribe to one or more channels, the server associates the client with each subscribed channel in the pubsub_channels dictionary. The command is implemented in pubsub.c:

void subscribeCommand(client *c) {
    int j;

    for (j = 1; j < c->argc; j++)
        pubsubSubscribeChannel(c,c->argv[j]);
    c->flags |= CLIENT_PUBSUB;
}

Depending on whether the channel already exists in the pubsub_channels dictionary, the association falls into one of two cases:

  • If the channel does not exist, it has no subscribers yet. The program first creates the channel in the pubsub_channels dictionary, using the channel name as the key and an empty list as the value, and then adds the client to that list.
  • If the channel exists, it already has other subscribers, so the pubsub_channels dictionary must already hold a subscriber list for it. All the program has to do is add the client to the end of that list.

/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
 * 0 if the client was already subscribed to that channel. */
int pubsubSubscribeChannel(client *c, robj *channel) {
    ...
    /* Add the channel to the client -> channels hash table */
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        /* Add the client to the channel -> list of clients hash table */
        de = dictFind(server.pubsub_channels,channel);
        if (de == NULL) {
            clients = listCreate();
            dictAdd(server.pubsub_channels,channel,clients);
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }
        listAddNodeTail(clients,c);
    }
    ...
}
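
The two subscription cases can also be tried out in isolation. The following is a minimal sketch, not the Redis implementation: it assumes fixed-size arrays instead of dict and list, integer client ids instead of client structs, and it detects duplicates by scanning the subscriber list, whereas Redis checks the client's own pubsub_channels dictionary. ToyChannel, ToyServer and toy_subscribe are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MAX_CHANNELS 8
#define MAX_SUBSCRIBERS 8

typedef struct {
    const char *name;               /* key: channel name */
    int clients[MAX_SUBSCRIBERS];   /* value: subscriber ids, oldest first */
    int nclients;
} ToyChannel;

typedef struct {
    ToyChannel channels[MAX_CHANNELS];
    int nchannels;
} ToyServer;

/* Subscribe client `id` to `name`. Returns 1 on success, 0 if the client
 * was already subscribed or one of the toy tables is full. */
int toy_subscribe(ToyServer *s, int id, const char *name) {
    assert(s != NULL && name != NULL);
    ToyChannel *ch = NULL;
    for (int i = 0; i < s->nchannels; i++)
        if (strcmp(s->channels[i].name, name) == 0)
            ch = &s->channels[i];

    if (ch == NULL) {
        /* Case 1: no subscribers yet, create the entry with an empty list. */
        if (s->nchannels == MAX_CHANNELS) return 0;
        ch = &s->channels[s->nchannels++];
        ch->name = name;
        ch->nclients = 0;
    }
    /* Reject a duplicate subscription (toy check, see the note above). */
    for (int i = 0; i < ch->nclients; i++)
        if (ch->clients[i] == id) return 0;
    if (ch->nclients == MAX_SUBSCRIBERS) return 0;

    /* Case 2 (and the end of case 1): append the client to the list tail. */
    ch->clients[ch->nclients++] = id;
    return 1;
}
```

Calling toy_subscribe twice with different ids on the same channel name creates the entry once and then only appends, which mirrors the de == NULL branch and the else branch above.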

As the code shows, before the association is made, the channel is first added to the pubsub_channels dictionary of the client state, which is used later when the client goes offline. The dictionary key is the channel name, and the value is NULL.

// Add channels to the pubsub_channels dictionary of the client state with a key of channel and a value of NULL
if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {}

Suppose a client executes the following command:

client-9527> subscribe qiuxiang 

When a new message is published to a channel, Redis iterates through that channel's client list and sends the message to each client in turn.
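
The delivery loop can be sketched as follows. This is an illustration under stated assumptions rather than the Redis publish path: "sending" is modelled by updating a counter and a pointer on a hypothetical ToyClient struct, and ToyChannel and toy_publish are likewise invented names.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MAX_SUBSCRIBERS 8

/* Hypothetical client: "sending" a message just records it here. */
typedef struct {
    int id;
    int received;           /* number of messages delivered so far */
    const char *last_msg;   /* most recent message, or NULL */
} ToyClient;

typedef struct {
    const char *name;                     /* key: channel name */
    ToyClient *clients[MAX_SUBSCRIBERS];  /* value: subscribers in list order */
    int nclients;
} ToyChannel;

/* Deliver `msg` to every subscriber of `name`, in list order.
 * Returns the number of clients that received the message. */
int toy_publish(ToyChannel *channels, int nchannels,
                const char *name, const char *msg) {
    assert(channels != NULL && name != NULL);
    for (int i = 0; i < nchannels; i++) {
        if (strcmp(channels[i].name, name) != 0) continue;
        for (int j = 0; j < channels[i].nclients; j++) {
            ToyClient *c = channels[i].clients[j];
            c->last_msg = msg;
            c->received++;
        }
        return channels[i].nclients;
    }
    return 0; /* no entry for the channel: there is nobody to deliver to */
}
```

In this sketch a message published to a channel with no entry reaches nobody and is not stored anywhere.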

Unsubscribe from the channel

UNSUBSCRIBE channel [channel …]

Unsubscribing removes the client from the channel's subscriber list in the pubsub_channels dictionary:

client-9527> unsubscribe qiuxiang 

int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
    ...
        // Find the dictionary entry for the channel to unsubscribe from
        de = dictFind(server.pubsub_channels,channel);
        ...
        // Get the list of all clients subscribed to the channel
        clients = dictGetVal(de);
        // Find the client to unsubscribe
        ln = listSearchKey(clients,c);
        ...
        // Remove the client from the list
        listDelNode(clients,ln);
        // If the list is now empty, no clients are subscribed to the channel any more, so remove the channel from the pubsub_channels dictionary
        if (listLength(clients) == 0) {
            /* Free the list and associated hash entry at all if this was
             * the latest client, so that it will be possible to abuse
             * Redis PUBSUB creating millions of channels. */
            dictDelete(server.pubsub_channels,channel);
        }
    ...
}
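
The remove-then-clean-up step can be sketched on its own. This is a toy model, not Redis code: it assumes fixed-size arrays and integer client ids, and ToyChannel, ToyServer and toy_unsubscribe are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MAX_CHANNELS 8
#define MAX_SUBSCRIBERS 8

typedef struct {
    const char *name;               /* key: channel name */
    int clients[MAX_SUBSCRIBERS];   /* value: subscriber ids, oldest first */
    int nclients;
} ToyChannel;

typedef struct {
    ToyChannel channels[MAX_CHANNELS];
    int nchannels;
} ToyServer;

/* Remove client `id` from channel `name`. Returns 1 if the client was
 * removed, 0 if it was not subscribed to that channel. */
int toy_unsubscribe(ToyServer *s, int id, const char *name) {
    assert(s != NULL && name != NULL);
    for (int i = 0; i < s->nchannels; i++) {
        ToyChannel *ch = &s->channels[i];
        if (strcmp(ch->name, name) != 0) continue;
        for (int j = 0; j < ch->nclients; j++) {
            if (ch->clients[j] != id) continue;
            /* Remove the client, keeping the other subscribers in order. */
            memmove(&ch->clients[j], &ch->clients[j + 1],
                    (size_t)(ch->nclients - j - 1) * sizeof ch->clients[0]);
            ch->nclients--;
            /* Last subscriber gone: drop the channel entry itself by
             * moving the final entry of the table into its slot. */
            if (ch->nclients == 0)
                s->channels[i] = s->channels[--s->nchannels];
            return 1;
        }
        return 0; /* channel exists, but this client is not subscribed */
    }
    return 0;     /* channel has no entry at all */
}
```

Dropping the entry as soon as its list is empty means the table only ever holds channels that have at least one subscriber.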

The client goes offline

When a client goes offline, Redis executes the freeClient function, which unsubscribes the client from every channel it had subscribed to:

void freeClient(client *c) {
    ...
    // Unsubscribe from all channels
    pubsubUnsubscribeAllChannels(c,0);
    ...
}

int pubsubUnsubscribeAllChannels(client *c, int notify) {
    // Get a dictionary iterator over the channels the client has subscribed to
    dictIterator *di = dictGetSafeIterator(c->pubsub_channels);
    ...
    // Iterate over the dictionary
    while((de = dictNext(di)) != NULL) {
        robj *channel = dictGetKey(de);

        // Unsubscribe from each channel in turn
        count += pubsubUnsubscribeChannel(c,channel,notify);
    }
}
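
The reason the client keeps its own record of channels becomes clearer in a small model of the disconnect path. The sketch below is an assumption-laden toy, not Redis code: it uses fixed-size arrays, removes subscribers by swapping in the last element (so list order is not preserved), and ToyClient, ToyChannel, ToyServer, toy_remove_subscriber and toy_unsubscribe_all are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MAX_CHANNELS 8
#define MAX_SUBSCRIBERS 8

typedef struct ToyClient {
    int id;
    const char *channels[MAX_CHANNELS];   /* client -> channels it joined */
    int nchannels;
} ToyClient;

typedef struct {
    const char *name;                     /* key: channel name */
    ToyClient *clients[MAX_SUBSCRIBERS];  /* value: subscribed clients */
    int nclients;
} ToyChannel;

typedef struct {
    ToyChannel channels[MAX_CHANNELS];    /* channel -> clients */
    int nchannels;
} ToyServer;

/* Remove `c` from the server-side list of `name`; drop the channel entry
 * if its list becomes empty. */
static void toy_remove_subscriber(ToyServer *s, ToyClient *c, const char *name) {
    for (int i = 0; i < s->nchannels; i++) {
        ToyChannel *ch = &s->channels[i];
        if (strcmp(ch->name, name) != 0) continue;
        for (int j = 0; j < ch->nclients; j++)
            if (ch->clients[j] == c) {
                /* Swap-remove: overwrite with the last subscriber. */
                ch->clients[j] = ch->clients[--ch->nclients];
                break;
            }
        if (ch->nclients == 0)
            s->channels[i] = s->channels[--s->nchannels];
        return;
    }
}

/* Called when client `c` disconnects. Walks the client's own channel
 * record, so only the channels it actually joined are visited.
 * Returns the number of channels the client was removed from. */
int toy_unsubscribe_all(ToyServer *s, ToyClient *c) {
    assert(s != NULL && c != NULL);
    int count = 0;
    while (c->nchannels > 0) {
        const char *name = c->channels[--c->nchannels];
        toy_remove_subscriber(s, c, name);
        count++;
    }
    return count;
}
```

Without the per-client record, the disconnect path would have to scan every channel on the server to find the ones containing the client.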

If Client 1 goes offline, it is removed from the subscriber list of every channel it had subscribed to.

Afterword

Redis Pub/Sub delivers channel messages only in real time. Messages published while a client is offline are not saved for it, which is different from a message queue (MQ) service.