Channel data structure
Redis keeps all channel subscriptions in the pubsub_channels dictionary of the server state:
struct redisServer {
    ...
    dict *pubsub_channels; /* Map channels to list of subscribed clients */
    ...
};
The key of each dictionary entry is the name of a subscribed channel, and the value is a linked list of all clients subscribed to that channel.
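To make this layout concrete, here is a minimal sketch of the same shape: a map from channel name to a linked list of subscribers. It is not Redis code. Redis uses its own dict and list types, while this toy uses plain singly linked lists and integer client ids, and every name in it (Channel, Subscriber, channel_find, channel_create, channel_add_subscriber, channel_count) is hypothetical.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* One node in a channel's subscriber list (stand-in for a Redis client). */
typedef struct Subscriber {
    int client_id;
    struct Subscriber *next;
} Subscriber;

/* One "dictionary entry": key = channel name, value = subscriber list. */
typedef struct Channel {
    char *name;
    Subscriber *head, *tail;
    struct Channel *next;   /* next entry in the toy map */
} Channel;

/* Toy replacement for a dictionary lookup: linear search by name. */
Channel *channel_find(Channel *channels, const char *name) {
    for (Channel *ch = channels; ch != NULL; ch = ch->next)
        if (strcmp(ch->name, name) == 0) return ch;
    return NULL;
}

/* Create an entry with an empty subscriber list and link it into the map. */
Channel *channel_create(Channel **channels, const char *name) {
    Channel *ch = malloc(sizeof(*ch));
    assert(ch != NULL);               /* toy code: abort on allocation failure */
    ch->name = malloc(strlen(name) + 1);
    assert(ch->name != NULL);
    strcpy(ch->name, name);
    ch->head = ch->tail = NULL;
    ch->next = *channels;
    *channels = ch;
    return ch;
}

/* Append a client to the tail of the channel's subscriber list. */
void channel_add_subscriber(Channel *ch, int client_id) {
    Subscriber *node = malloc(sizeof(*node));
    assert(node != NULL);
    node->client_id = client_id;
    node->next = NULL;
    if (ch->tail == NULL) ch->head = node;
    else ch->tail->next = node;
    ch->tail = node;
}

/* Count the subscribers of one channel by walking its list. */
size_t channel_count(const Channel *ch) {
    size_t n = 0;
    for (const Subscriber *s = ch->head; s != NULL; s = s->next) n++;
    return n;
}
```

Starting from an empty map (Channel *map = NULL), calling channel_create(&map, "qiuxiang") and then channel_add_subscriber twice leaves one entry whose list holds the two clients in subscription order. The sketch never frees memory, which is acceptable only because it is an illustration.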
Subscribe to the channel
SUBSCRIBE channel [channel …]
When a client executes the SUBSCRIBE command for one or more channels, the server associates the client with each subscribed channel in the pubsub_channels dictionary. From pubsub.c:
void subscribeCommand(client *c) {
    int j;
    for (j = 1; j < c->argc; j++)
        pubsubSubscribeChannel(c,c->argv[j]);
    c->flags |= CLIENT_PUBSUB;
}
Depending on whether the channel already exists in the pubsub_channels dictionary, the association falls into one of two cases:
- If the channel does not exist, it has no subscribers yet. The program first creates an entry in the pubsub_channels dictionary with the channel name as the key and an empty list as the value, then adds the client to that list.
- If the channel exists, it already has other subscribers, so the pubsub_channels dictionary must already hold a subscriber list for it. The program only has to append the client to the end of that list.
/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
 * 0 if the client was already subscribed to that channel. */
int pubsubSubscribeChannel(client *c, robj *channel) {
    ...
    /* Add the channel to the client -> channels hash table */
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        /* Add the client to the channel -> list of clients hash table */
        de = dictFind(server.pubsub_channels,channel);
        if (de == NULL) {
            clients = listCreate();
            dictAdd(server.pubsub_channels,channel,clients);
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }
        listAddNodeTail(clients,c);
    }
    ...
}
As the code shows, before the association is made, the channel is first added to the pubsub_channels dictionary of the client state, which is used later when the client goes offline. The dictionary key is the channel name and the value is NULL.
// Add channels to the pubsub_channels dictionary of the client state with a key of channel and a value of NULL
if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {}
Suppose you do the following:
client-9527> subscribe qiuxiang
When there is a new message for a channel, Redis iterates through the list of clients for that channel, sending messages to each client in turn.
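The fan-out described above can be sketched as follows. This is a simplified model rather than the Redis implementation: fixed-size arrays stand in for the dictionary and the linked list, "sending" a message only updates a counter on the client, and all names (ToyClient, ToyChannel, ToyServer, toy_find, toy_subscribe, toy_publish) are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MAX_SUBSCRIBERS 8
#define MAX_CHANNELS 4

typedef struct {
    int id;
    int received;          /* number of messages delivered so far */
    const char *last_msg;  /* most recent message (pointer only, not copied) */
} ToyClient;

typedef struct {
    const char *name;
    ToyClient *subscribers[MAX_SUBSCRIBERS];  /* in subscription order */
    size_t count;
} ToyChannel;

typedef struct {
    ToyChannel channels[MAX_CHANNELS];
    size_t count;
} ToyServer;

/* Look up a channel by name; returns NULL if nobody is subscribed to it. */
static ToyChannel *toy_find(ToyServer *s, const char *name) {
    assert(s != NULL && name != NULL);
    for (size_t i = 0; i < s->count; i++)
        if (strcmp(s->channels[i].name, name) == 0) return &s->channels[i];
    return NULL;
}

/* Returns 1 on success, 0 if already subscribed or a table is full. */
int toy_subscribe(ToyServer *s, ToyClient *c, const char *name) {
    ToyChannel *ch = toy_find(s, name);
    if (ch == NULL) {                        /* first subscriber: new entry */
        if (s->count == MAX_CHANNELS) return 0;
        ch = &s->channels[s->count++];
        ch->name = name;
        ch->count = 0;
    }
    for (size_t i = 0; i < ch->count; i++)
        if (ch->subscribers[i] == c) return 0;
    if (ch->count == MAX_SUBSCRIBERS) return 0;
    ch->subscribers[ch->count++] = c;        /* append to the end */
    return 1;
}

/* Deliver msg to every subscriber in turn; returns the number of receivers. */
int toy_publish(ToyServer *s, const char *name, const char *msg) {
    ToyChannel *ch = toy_find(s, name);
    int receivers = 0;
    if (ch == NULL) return 0;                /* no subscribers: message dropped */
    for (size_t i = 0; i < ch->count; i++) {
        ch->subscribers[i]->received++;
        ch->subscribers[i]->last_msg = msg;
        receivers++;
    }
    return receivers;
}
```

With two clients subscribed to qiuxiang, toy_publish(&s, "qiuxiang", "hello") visits both and returns 2, while publishing to a channel nobody subscribed to returns 0. The return value mirrors the PUBLISH command, which replies with the number of clients that received the message.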
Unsubscribe from the channel
UNSUBSCRIBE channel [channel …]
Unsubscribing removes the client from the channel's subscriber list in the pubsub_channels dictionary:
client-9527> unsubscribe qiuxiang
int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
    ...
    // Find the dictionary entry for the channel to unsubscribe from
    de = dictFind(server.pubsub_channels,channel);
    ...
    // Get the list of all clients subscribed to the channel
    clients = dictGetVal(de);
    // Find the client to unsubscribe
    ln = listSearchKey(clients,c);
    ...
    // Remove the client from the list
    listDelNode(clients,ln);
    // If the list is now empty, no clients are subscribed to the channel,
    // so remove the channel from the pubsub_channels dictionary
    if (listLength(clients) == 0) {
        /* Free the list and associated hash entry at all if this was
         * the latest client, so that it will be possible to abuse
         * Redis PUBSUB creating millions of channels. */
        dictDelete(server.pubsub_channels,channel);
    }
    ...
}
The client goes offline
When a client goes offline, Redis executes the freeClient function:
void freeClient(client *c) {
    ...
    // Unsubscribe from all channels
    pubsubUnsubscribeAllChannels(c,0);
    ...
}

int pubsubUnsubscribeAllChannels(client *c, int notify) {
    // Get a dictionary iterator over the channels the client has subscribed to
    dictIterator *di = dictGetSafeIterator(c->pubsub_channels);
    ...
    // Iterate over the dictionary
    while((de = dictNext(di)) != NULL) {
        robj *channel = dictGetKey(de);
        // Unsubscribe from each channel in turn
        count += pubsubUnsubscribeChannel(c,channel,notify);
    }
    ...
}
If Client 1 goes offline, it is removed from the subscriber list of every channel it had subscribed to.
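The value of the per-client record (c->pubsub_channels) is that cleanup only has to visit the channels that client actually subscribed to, not every channel on the server. Below is a rough sketch of that idea under simplifying assumptions: fixed-size arrays replace the dictionaries and lists, removal uses swap-with-last so subscriber order is not preserved (unlike a linked list), and all names (Conn, Chan, Srv, srv_subscribe, conn_unsubscribe, conn_unsubscribe_all, srv_channel_count) are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MAX_CHANNELS 4
#define MAX_SUBS 4

typedef struct Conn Conn;

/* A server-side channel slot; name == NULL marks the slot as free. */
typedef struct Chan {
    const char *name;
    Conn *subs[MAX_SUBS];
    size_t nsubs;
} Chan;

/* A client that remembers its own channels, like c->pubsub_channels. */
struct Conn {
    int id;
    Chan *channels[MAX_CHANNELS];
    size_t nchannels;
};

typedef struct Srv {
    Chan slots[MAX_CHANNELS];
} Srv;

/* Returns 1 on success, 0 if already subscribed or no room is left. */
int srv_subscribe(Srv *s, Conn *c, const char *name) {
    Chan *ch = NULL, *free_slot = NULL;
    for (size_t i = 0; i < MAX_CHANNELS; i++) {
        Chan *slot = &s->slots[i];
        if (slot->name == NULL) {
            if (free_slot == NULL) free_slot = slot;
        } else if (strcmp(slot->name, name) == 0) {
            ch = slot;
        }
    }
    if (ch == NULL) {                 /* channel does not exist yet */
        if (free_slot == NULL) return 0;
        ch = free_slot;
        ch->name = name;
        ch->nsubs = 0;
    }
    for (size_t i = 0; i < ch->nsubs; i++)
        if (ch->subs[i] == c) return 0;
    if (ch->nsubs == MAX_SUBS) return 0;
    assert(c->nchannels < MAX_CHANNELS);
    ch->subs[ch->nsubs++] = c;        /* server side: channel -> clients */
    c->channels[c->nchannels++] = ch; /* client side: client -> channels */
    return 1;
}

/* Returns 1 if c was subscribed to ch and has been removed, else 0. */
int conn_unsubscribe(Conn *c, Chan *ch) {
    size_t i;
    for (i = 0; i < ch->nsubs && ch->subs[i] != c; i++) {}
    if (i == ch->nsubs) return 0;
    ch->subs[i] = ch->subs[--ch->nsubs];
    for (i = 0; i < c->nchannels && c->channels[i] != ch; i++) {}
    assert(i < c->nchannels);
    c->channels[i] = c->channels[--c->nchannels];
    if (ch->nsubs == 0) ch->name = NULL;  /* last subscriber left: drop channel */
    return 1;
}

/* Called when the client disconnects; returns how many channels it left. */
int conn_unsubscribe_all(Conn *c) {
    int count = 0;
    while (c->nchannels > 0)
        count += conn_unsubscribe(c, c->channels[c->nchannels - 1]);
    return count;
}

/* Number of channels that currently have at least one subscriber. */
size_t srv_channel_count(const Srv *s) {
    size_t n = 0;
    for (size_t i = 0; i < MAX_CHANNELS; i++)
        if (s->slots[i].name != NULL) n++;
    return n;
}
```

If client 1 is subscribed to channels a and b and client 2 only to a, then conn_unsubscribe_all on client 1 returns 2, channel b disappears because its list became empty, and channel a keeps client 2 as its only subscriber.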
Afterword
Redis Pub/Sub delivers channel messages only in real time. Messages published while a client is offline are not saved for it, which is different from a message queue (MQ) service.