
So far in this series, we have seen that Redis supports manual failover through master-slave replication, and monitoring plus automatic failover through Sentinel mode. Thanks to the high performance of Redis, Sentinel mode handles average-scale applications with ease.

As business systems grow in functionality, number of modules, scale, and complexity, we demand more and more of Redis, especially the ability to scale dynamically between traffic peaks and troughs. For example, an e-commerce platform has low, stable traffic on ordinary days, but during the Double 11 promotion traffic is several times higher, and the resources each system needs differ between the two cases. If you always provision hardware and middleware for peak load, you are bound to waste a lot of resources.

As an excellent cache product, Redis has become essential middleware for all kinds of systems. Sentinel mode is excellent, but because it cannot scale horizontally on the fly, it cannot meet increasingly complex application scenarios. Before the official cluster mode was released, the industry came up with various good solutions, such as Codis and Twemproxy.

To make up for this shortcoming, Redis officially introduced a new operating mode, Redis Cluster, in version 3.0.

Redis Cluster adopts a decentralized structure. It shards data automatically across multiple nodes, supports adding and removing nodes dynamically, and fails over automatically when some nodes become unavailable, keeping the system highly available. According to the official description, Redis Cluster has the following design goals:

  • High performance and scalability, supporting up to 1000 nodes. Data is sharded across multiple nodes, master-slave synchronization uses asynchronous replication, and redirection works without a proxy.
  • An acceptable degree of write safety: the system tries to retain all writes from clients connected to the network partition that contains the majority of master nodes. There is usually a small window in which acknowledged writes can be lost, and this window is larger if the client is connected to the minority partition.
  • Availability: Redis Cluster keeps working during a network partition as long as the majority of master nodes are reachable and every unreachable master has at least one reachable slave. Moreover, if a master A has no slave while some master B has more than one, a slave of B can be moved to A through replica migration.

A brief summary. Looking at the three goals above, I think the biggest feature of Redis Cluster is its scalability: multiple master nodes store all the data through the sharding mechanism, that is, each master-slave replication unit manages a subset of the keys. Master-slave replication and Sentinel mode have their own strengths: when more read capacity is needed, read requests can be spread by adding slave nodes. Write requests, however, can only go through the single master node, which carries the following risks:

  • All write requests are centralized in a single Redis instance, and write delays may occur on a single master node as requests increase.
  • Each node stores the full amount of system data. If too much data is stored, the execution of RDB backup or AOF rewrite will increase the fork time, and the master/slave replication transmission and data recovery time will increase, or even fail.
  • If this primary node fails, it may result in temporary data loss or unavailability of all services during failover.

Therefore, dynamic scaling is the most striking feature of Redis Cluster. Now, down to business: this article gives an overall introduction to Redis Cluster with examples, and later articles will analyze its working principles in depth.

The cluster structure

Following the style of the previous articles, we first build and demonstrate an example to get an intuitive feel for how a cluster is set up, then sort out the logical relationships based on the source code, and finally walk through the cluster creation process step by step.

Hands-on practice

According to the official documentation, cluster setup is relatively easy in Redis 5 and later. This article uses six Redis instances (version 6.2.0): three master nodes and three slave nodes, with one replica for each master.

  • Prepare the configuration files: under the directory cluster-demo, create six folders named after the ports Redis will listen on: 7000, 7001, 7002, 7003, 7004, 7005. In each folder, place a minimal Redis Cluster configuration file named cluster.conf with the following content (remember to change the port for each instance):
port 7000
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000
appendonly yes
  • Start the Redis instances: switch to each of the six directories and run redis-server cluster.conf to start the Redis instance in cluster mode. The original screenshot used port 7000 as the example.

  • Create the cluster: since I use Redis 6.2.0, I can use redis-cli directly. In a terminal, run the --cluster create command to build a cluster of three masters and three slaves from the instances we just started.
redis-cli --cluster create 127.0.0.1:7000 127.0.0.1:7001 \
127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005 \
--cluster-replicas 1

The terminal output was shown in a figure (not reproduced here). The lines starting with >>> are the core operations redis-cli performs when creating the cluster; some steps are not printed in the log. In the end, the cluster relationships shown in the next figure are established.

That figure describes the node relationships from two perspectives. The left side shows the physical structure without regard to node roles, where the bidirectional arrows between nodes represent the cluster bus. The right side takes node roles and master-slave groups into account, showing both the master-slave replication relationships and the cluster bus (the bus is drawn only between master nodes, because drawing every link would be too messy).

With the help of redis-cli, setting up a Redis Cluster is relatively simple: a single command does everything. From the process above we can see that redis-cli acts as a manager during cluster creation. It checks node status, establishes master-slave relationships, assigns data shards, and coordinates the handshakes through which the nodes form a cluster. None of this would work without the capabilities built into Redis Cluster itself.

To understand the cluster creation process in depth and lay the groundwork for later articles, I will first introduce some concepts and structures of Redis Cluster, and then explain the creation process in detail.

Cluster data structure

In the example cluster above, six Redis instances form a cluster of three masters and three slaves, and a range of hash slots is assigned to each master-slave group. How does Redis Cluster describe these relationships? With that question in mind, let's go back to the data structures. Based on the relationships between the data structures in the Redis source, I drew the organization of the important structures related to Redis Cluster (in the original figure, from the perspective of node A).

clusterState

As we know, Redis Cluster is one of the operating modes of Redis, and everything hangs off redisServer, the core data structure in Redis. Only the cluster-related fields are shown below.

struct redisServer {
    /* Cluster */
    // Whether to run in cluster mode
    int cluster_enabled;      /* Is cluster enabled? */

    // Timeout for communication between cluster nodes
    mstime_t cluster_node_timeout; /* Cluster node timeout. */

    // Auto-generated configuration file (nodes.conf) that stores the cluster state; not meant to be edited by users
    char *cluster_configfile; /* Cluster auto-generated config file name. */

    // Cluster state, as seen from the current Redis instance
    struct clusterState *cluster;  /* State of the cluster */
};

Therefore, in cluster mode, each redisServer uses clusterState to describe the information and status of all nodes in the cluster as it sees them. clusterState contains not only the state of the current node itself, but also the state of the other nodes in the cluster.

The key phrase is "as it sees them". The cluster is a decentralized distributed system in which nodes spread information over the network, and the network is not 100% reliable: partitions and disconnections can happen. The cluster state maintained by each node may therefore be inaccurate or out of date.

The following is the complete structure of clusterState. A brief overview is enough for now; later chapters will come back to it.

// This structure stores the state of the cluster from the point of view of the current node
typedef struct clusterState {
    // Current node information
    clusterNode *myself;  /* This node */
    // The current epoch of the cluster
    uint64_t currentEpoch;
    // Cluster status
    int state;            /* CLUSTER_OK, CLUSTER_FAIL, ... */
    // Number of master nodes that serve at least one hash slot
    int size;             /* Num of master nodes with at least one slot */
    // Node dictionary: name->clusterNode
    dict *nodes;          /* Hash table of name -> clusterNode structures */
    // Blacklist
    dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */
    // The hash slot and destination node being migrated
    clusterNode *migrating_slots_to[CLUSTER_SLOTS];
    // The hash slot and source node that are importing
    clusterNode *importing_slots_from[CLUSTER_SLOTS];
    // The mapping between hash slots and nodes
    clusterNode *slots[CLUSTER_SLOTS];
    // The number of keys stored in each hash slot
    uint64_t slots_keys_count[CLUSTER_SLOTS];
    rax *slots_to_keys;
    /* The following fields are used to take the slave state on elections. */
    // Time of the previous or next failover election
    mstime_t failover_auth_time; /* Time of previous or next election. */
    // Number of votes received so far in the failover election
    int failover_auth_count;    /* Number of votes received so far. */
    // Whether this node has already asked for votes
    int failover_auth_sent;     /* True if we already asked for votes. */
    // Rank of this slave for the current election request
    int failover_auth_rank;     /* This slave rank for current auth request. */
    // Epoch of the current failover election
    uint64_t failover_auth_epoch; /* Epoch of the current election. */
    int cant_failover_reason;   /* Why a slave is currently not able to failover. See the CANT_FAILOVER_* macros. */
    /* Manual failover state in common. */
    mstime_t mf_end;            /* Manual failover time limit (ms unixtime). It is zero if there is no MF in progress. */
    /* Manual failover state of master. */
    clusterNode *mf_slave;      /* Slave performing the manual failover. */
    /* Manual failover state of slave. */
    long long mf_master_offset; /* Master offset the slave needs to start MF or zero if still not received. */
    int mf_can_start;           /* If non-zero signal that the manual failover can start requesting masters vote. */
    /* The following fields are used by masters to take state on elections. */
    // Epoch of the last vote granted
    uint64_t lastVoteEpoch;     /* Epoch of the last vote granted. */
    int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */
    /* Messages received and sent by type. */
    long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT];
    long long stats_bus_messages_received[CLUSTERMSG_TYPE_COUNT];
    // The number of nodes that reach PFAIL
    long long stats_pfail_nodes;    /* Number of nodes in PFAIL status, excluding nodes without address. */
} clusterState;

Here are a few fields that help in understanding the basics of a cluster:

  • currentEpoch: the current epoch of the cluster, comparable to a logical version of the cluster; it is bumped by events such as resharding and failover.
  • myself: of type clusterNode; stores the state of the current node itself, explained later.
  • nodes: a dictionary that stores information about all nodes in the cluster as key-value pairs. The key is the node name (also called the node ID) and the value is of type clusterNode.
  • slots: the mapping between hash slots and nodes, an array of clusterNode pointers indexed by hash slot number, each pointing to the node responsible for that slot.

The last three fields describe the state of the node itself, record its sibling nodes in the cluster, and store the allocation of hash slots within the cluster. When a node first starts, only the node itself exists; sibling nodes and hash slot allocation appear only after other nodes join it or it joins an existing cluster. All three fields are built on the clusterNode structure.
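To make the slot-to-node mapping more concrete, here is a minimal sketch in C. The viewNode and viewState types and the two helper functions are simplified stand-ins invented for illustration, not the real Redis definitions; only the field names myself and slots and the slot count 16384 come from the source above.

```c
#include <stddef.h>

#define CLUSTER_SLOTS 16384

/* Hypothetical, trimmed-down stand-in for clusterNode. */
typedef struct viewNode {
    char name[41];                    /* 40 hex characters plus a NUL terminator */
} viewNode;

/* Hypothetical, trimmed-down stand-in for clusterState. */
typedef struct viewState {
    viewNode *myself;                 /* this node */
    viewNode *slots[CLUSTER_SLOTS];   /* slot number -> owning node, NULL if unassigned */
} viewState;

/* Record in the local view that node n owns the inclusive range [first, last]. */
static void assign_range(viewState *st, viewNode *n, int first, int last) {
    for (int s = first; s <= last; s++) st->slots[s] = n;
}

/* Return the owner of a slot in this view, or NULL if out of range or unassigned. */
static viewNode *slot_owner(const viewState *st, int slot) {
    if (slot < 0 || slot >= CLUSTER_SLOTS) return NULL;
    return st->slots[slot];
}
```

A freshly started node would have every entry of slots set to NULL; entries are filled in only as the node learns who owns which slot.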

Node Properties (clusterNode)

Redis Cluster describes the information and state of a cluster node with the clusterNode data structure. Depending on the perspective, it can describe either the node itself or another node.

  • When Redis starts in cluster mode, it initializes a clusterNode object to maintain its own state.
  • When a node discovers another node through the handshake or heartbeat process, it also creates a clusterNode to record information about that node.

Both the node itself and the other nodes are stored in the clusterState maintained by redisServer, the core data structure of Redis, and are updated continuously as the cluster state changes.

Some of the information in clusterNode is stable or static, such as the node ID, IP address, and port. Other parts change with the cluster state, such as the range of hash slots the node is responsible for and the node state. Let's look at this data structure as source code plus comments:

// This is the description of the cluster node, which is the basis of the cluster operation
typedef struct clusterNode {
    // Node creation time
    mstime_t ctime; /* Node object creation time. */
    // The node name, also called the node ID; it is stored in nodes.conf after startup and does not change unless the file is deleted
    char name[CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */
    // Node state flags; cluster processes are driven by these states
    int flags;      /* CLUSTER_NODE_... */
    // The configuration epoch of the node
    uint64_t configEpoch; /* Last configEpoch observed for this node */
    // Bitmap of the hash slots the node is responsible for
    unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */
    // Number of hash slots the node is responsible for
    int numslots;   /* Number of slots handled by this node */
    // If this node is a master, the number of its slave nodes
    int numslaves;  /* Number of slave nodes, if this is a master */
    // If this node is a master, an array of pointers to its slave nodes
    struct clusterNode **slaves; /* pointers to slave nodes */
    // If this node is a slave, a pointer to its master node
    struct clusterNode *slaveof; /* pointer to the master node. Note that it may be NULL even if the node is a slave if we don't have the master node in our tables. */
    // Time the last PING request was sent
    mstime_t ping_sent;      /* Unix time we sent latest ping */
    // Time the last PONG reply was received
    mstime_t pong_received;  /* Unix time we received the pong */
    // The last time data was received
    mstime_t data_received;  /* Unix time we received any data */
    // The time when the node reaches the FAIL state
    mstime_t fail_time;      /* Unix time when FAIL flag was set */
    // The time of the last vote during failover
    mstime_t voted_time;     /* Last time we voted for a slave of this master */
    // Update time of replication offset
    mstime_t repl_offset_time;  /* Unix time we received offset for this node */
    mstime_t orphaned_time;     /* Starting time of orphaned master condition */
    // The replication offset of the node
    long long repl_offset;      /* Last known repl offset for this node. */
    // Node IP address
    char ip[NET_IP_STR_LEN];  /* Latest known IP address of this node */
    // Node port number
    int port;                   /* Latest known clients port of this node */
    // Cluster bus port number
    int cport;                  /* Latest known cluster port of this node. */
    // Network links to nodes
    clusterLink *link;          /* TCP/IP link with this node */
    // List of nodes reporting that this node is down
    list *fail_reports;         /* List of nodes signaling this as failing */
} clusterNode;

Let’s focus on a few key fields.

  • Node name/ID: name. Each node has a unique ID, which is the only basis for identifying the node.
  • Node status: flags. If you study the Redis source code, you will find that many processes are driven by state machines. Each node in a Redis Cluster uses flags to describe its own state or the state of other nodes, and these states drive the cluster's processes. Although the field is an int, only the lowest 10 bits are used, and each bit corresponds to one state. Let's look at what each bit means; they will come up again later:
    • CLUSTER_NODE_NULL_NAME: an all-zero value rather than one of the ten flag bits. It represents the absence of a node name, which in turn means that no unique node ID is available.
    • CLUSTER_NODE_MASTER, 1: the 1st bit from the right is 1, indicating that the node is a master.
    • CLUSTER_NODE_SLAVE, 2: the 2nd bit from the right is 1, indicating that the node is a slave.
    • CLUSTER_NODE_PFAIL, 4: the 3rd bit from the right is 1, indicating that the node may be down and this needs to be confirmed by other nodes.
    • CLUSTER_NODE_FAIL, 8: the 4th bit from the right is 1, indicating that the node is down.
    • CLUSTER_NODE_MYSELF, 16: the 5th bit from the right is 1, indicating that this object describes the node itself.
    • CLUSTER_NODE_HANDSHAKE, 32: the 6th bit from the right is 1, indicating that the node is in the first ping exchange of the handshake process.
    • CLUSTER_NODE_NOADDR, 64: the 7th bit from the right is 1, indicating that the network address of the node is not known.
    • CLUSTER_NODE_MEET, 128: the 8th bit from the right is 1, indicating that a MEET command is to be sent to this node.
    • CLUSTER_NODE_MIGRATE_TO, 256: the 9th bit from the right is 1, indicating that this node is eligible for replica migration.
    • CLUSTER_NODE_NOFAILOVER, 512: the 10th bit from the right is 1, indicating that the node will not perform failover.
  • Configuration epoch: configEpoch. Similar to the concept of "epoch" in Sentinel mode, configEpoch here is the epoch of the node, which may differ from currentEpoch, the current epoch of the cluster.
  • Hash slots the node is responsible for: slots. The hash slots of the node (or of its master, if it is a slave) are stored as a bitmap.
  • Master-slave relationship: if the node is a master, slaves stores its slave nodes and numslaves their count; if it is a slave, slaveof points to its master.
  • Cluster link: link maintains the network connection between the current node and another node. It is what ties the otherwise isolated nodes in clusterState into a network, and it is the basis of the cluster bus.
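As a small illustration of how one int can carry several states at once, the sketch below decodes a flags value into readable labels. The numeric values are the ones listed above; the describe_flags helper and the label strings are hypothetical and exist only for this example.

```c
#include <stdio.h>

/* Flag values as listed in the text above. */
#define CLUSTER_NODE_MASTER      1
#define CLUSTER_NODE_SLAVE       2
#define CLUSTER_NODE_PFAIL       4
#define CLUSTER_NODE_FAIL        8
#define CLUSTER_NODE_MYSELF      16
#define CLUSTER_NODE_HANDSHAKE   32
#define CLUSTER_NODE_NOADDR      64
#define CLUSTER_NODE_MEET        128
#define CLUSTER_NODE_MIGRATE_TO  256
#define CLUSTER_NODE_NOFAILOVER  512

/* Illustrative labels, ordered by bit value (not taken from Redis). */
static const struct { int bit; const char *label; } flag_labels[] = {
    {CLUSTER_NODE_MASTER, "master"},       {CLUSTER_NODE_SLAVE, "slave"},
    {CLUSTER_NODE_PFAIL, "pfail"},         {CLUSTER_NODE_FAIL, "fail"},
    {CLUSTER_NODE_MYSELF, "myself"},       {CLUSTER_NODE_HANDSHAKE, "handshake"},
    {CLUSTER_NODE_NOADDR, "noaddr"},       {CLUSTER_NODE_MEET, "meet"},
    {CLUSTER_NODE_MIGRATE_TO, "migrate_to"}, {CLUSTER_NODE_NOFAILOVER, "nofailover"},
};

/* Hypothetical helper: write a comma-separated list of the set flags into buf. */
static void describe_flags(int flags, char *buf, size_t buflen) {
    size_t used = 0;
    if (buflen == 0) return;
    buf[0] = '\0';
    for (size_t i = 0; i < sizeof(flag_labels) / sizeof(flag_labels[0]); i++) {
        if (!(flags & flag_labels[i].bit)) continue;   /* bit not set, skip */
        int n = snprintf(buf + used, buflen - used, "%s%s",
                         used ? "," : "", flag_labels[i].label);
        if (n < 0 || (size_t)n >= buflen - used) break; /* output truncated, stop */
        used += (size_t)n;
    }
}
```

For example, a node describing itself as a master carries CLUSTER_NODE_MYSELF | CLUSTER_NODE_MASTER, which is 17, and this helper renders it as "master,myself".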

With an understanding of the clusterState and clusterNode data structures, we can establish, at the code level, the logical relationships among cluster nodes, including master-slave replication. I believe you now have a deeper understanding of the relationships in the structure diagram at the beginning of this section.
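The slots field of clusterNode packs 16384 hash slots into CLUSTER_SLOTS/8 = 2048 bytes, one bit per slot. The sketch below shows one common way to work with such a bitmap; the function names are hypothetical, and the choice of the lowest bit of each byte for the lowest slot number is an assumption of this sketch.

```c
#define CLUSTER_SLOTS 16384

/* Mark a slot as owned: byte index is slot / 8, bit index is slot % 8. */
static void slot_bitmap_set(unsigned char *bitmap, int slot) {
    bitmap[slot >> 3] |= (unsigned char)(1u << (slot & 7));
}

/* Mark a slot as not owned. */
static void slot_bitmap_clear(unsigned char *bitmap, int slot) {
    bitmap[slot >> 3] &= (unsigned char)~(1u << (slot & 7));
}

/* Return 1 if the slot bit is set, 0 otherwise. */
static int slot_bitmap_test(const unsigned char *bitmap, int slot) {
    return (bitmap[slot >> 3] & (1u << (slot & 7))) != 0;
}

/* Count the owned slots; this is the kind of value numslots would cache. */
static int slot_bitmap_count(const unsigned char *bitmap) {
    int count = 0;
    for (int slot = 0; slot < CLUSTER_SLOTS; slot++)
        count += slot_bitmap_test(bitmap, slot);
    return count;
}
```

A bitmap keeps the per-node cost fixed at 2 KB regardless of how many slots the node owns, which also makes it cheap to embed in every message header (see myslots in clusterMsg).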

Cluster Bus

The cluster bus is a dedicated channel inside Redis Cluster used for cluster governance. It is made up of TCP connections between nodes. Each node actively connects to every other node, and in turn is connected to by every other node.

If the cluster has N nodes, there will be N*(N-1) links. In graph theory, this forms a complete directed graph with N vertices. The original figure illustrated the relationship between the nodes and the cluster bus for three nodes.

From what we have learned about cluster building, node attributes, and cluster state, we know that Redis Cluster is a decentralized distributed system. Nodes must therefore continuously exchange information to converge on a consistent state, and the cluster bus is the channel for that exchange. The "channel" here is the link field of clusterNode, whose data structure is clusterLink.
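The link count is simple arithmetic, but it grows quickly with cluster size. A tiny sketch (the function name is hypothetical):

```c
/* Number of directed cluster bus links among n nodes:
 * every node opens one outbound link to each of the other n - 1 nodes. */
static long bus_link_count(long n) {
    if (n < 2) return 0;   /* a single node has nobody to connect to */
    return n * (n - 1);
}
```

For the six-node example cluster this gives 30 links, and for three nodes it gives 6.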

/* clusterLink encapsulates everything needed to talk with a remote node. */
typedef struct clusterLink {
    mstime_t ctime;             /* Link creation time */
    // Network links with remote nodes
    connection *conn;           /* Connection to remote node */
    sds sndbuf;                 /* Packet send buffer */
    char *rcvbuf;               /* Packet reception buffer */
    size_t rcvbuf_len;          /* Used size of rcvbuf */
    size_t rcvbuf_alloc;        /* Allocated size of rcvbuf */
    struct clusterNode *node;   /* Node related to this link if any, or NULL */
} clusterLink;

clusterLink encapsulates the remote node and the network connection to it, together with the buffers for received and outgoing packets. On this basis, two nodes can keep communicating in real time.

Note that the cluster bus does not use the client-facing port (such as 6379) but a dedicated one. We do not set it manually; it is derived from the client port by adding an offset of 10000. For example, if the client port is 6379, the cluster bus listens on port 16379.

Therefore, if you want to deploy the Redis instance in cluster mode, you must ensure that both ports on the host are not occupied; otherwise, the instance will fail to start.
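The port rule can be sketched as follows. The function name and the range checks are assumptions made for this example; the only fact taken from the text is the +10000 offset.

```c
#define CLUSTER_PORT_INCR 10000   /* offset between client port and bus port */

/* Derive the cluster bus port from the client-facing port.
 * Returns -1 if the client port is invalid or the derived port
 * would exceed the largest valid TCP port (65535). */
static int cluster_bus_port(int client_port) {
    if (client_port <= 0 || client_port > 65535) return -1;
    int bus_port = client_port + CLUSTER_PORT_INCR;
    return bus_port <= 65535 ? bus_port : -1;
}
```

One practical consequence under this rule is that a client port above 55535 leaves no room for the bus port. In the example cluster, the instances on ports 7000 to 7005 would use bus ports 17000 to 17005.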

Communication protocol

So far, we have looked at cluster nodes, cluster state, and the cluster bus. They provide the foundation on which the cluster runs. The next step is to get the nodes "moving": to have them meet each other, introduce their friends, and so on. All of this requires communication, and the cluster bus already provides the channel. Now let's look at the "language" the nodes speak.

Message structure

A cluster message consists of a message header and a message body. All message types share a common header, which contains a message type field; a different body is attached depending on that type. The source code and comments below show the structure of a cluster message:

typedef struct {
    // Fixed header, magic number "RCmb"
    char sig[4];
    // Total length of message: header + body
    uint32_t totlen;
    // Message version, currently 1
    uint16_t ver;
    // The port that provides services to external clients, such as 6379
    uint16_t port;
    // Message type, such as PING, PONG, MEET, etc. The node needs to append or parse the message body based on this value
    uint16_t type;
    // Only used by some message types (for example, the number of gossip entries in a PING)
    uint16_t count;
    // The current cluster epoch as seen by the sending node
    uint64_t currentEpoch;  
    // The configuration epoch of the sending node or its master node
    uint64_t configEpoch;   
    // Replication offset: for a master node, the replication offset of command propagation; for a slave node, the offset of its master's replication stream that it has already processed
    uint64_t offset;    
    // The name /ID of the sending node
    char sender[CLUSTER_NAMELEN];
    // The hash slot that the sending message node is responsible for
    unsigned char myslots[CLUSTER_SLOTS/8];
    // If a slave node, this field places the node name /ID of its master node
    char slaveof[CLUSTER_NAMELEN];
    // The IP address of the sending node
    char myip[NET_IP_STR_LEN];
    char notused1[34]; 
    // Cluster bus listening port
    uint16_t cport;
    // The status of the sending node
    uint16_t flags;
    // From the perspective of the sending node, the state of the current cluster is OK or FAIL
    unsigned char state;
    unsigned char mflags[3];
    // The body of the message, according to the above message type type, determines the contents of this field
    union clusterMsgData data;
} clusterMsg;

The message header mainly carries the state of the sending node, so that the receiver can parse the message and update the node information in its local cluster state. The message body is determined by the type field in the header. In the message structure, the body uses the union type clusterMsgData.

clusterMsgData is a union whose fields are assigned or parsed according to type. A union is a C construct in which all members share the same memory; you can loosely think of this field as a Java generic whose concrete type is decided at runtime.
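The "header tag selects the union member" pattern can be sketched in a few lines. Everything below is a simplified stand-in: the tinyMsg layout, the type codes, and the helper name are hypothetical, and only the "RCmb" signature and the idea of switching on type come from the text.

```c
#include <stdint.h>
#include <string.h>

/* Hypothetical type codes, for this sketch only. */
enum { TINY_MSG_PING = 0, TINY_MSG_FAIL = 1 };

typedef struct { char nodename[41]; uint16_t port; } tinyGossip; /* PING-style body */
typedef struct { char nodename[41]; } tinyFail;                  /* FAIL-style body */

typedef struct {
    char sig[4];        /* signature, "RCmb" */
    uint16_t type;      /* selects which union member is valid */
    union {
        tinyGossip ping;
        tinyFail fail;
    } data;
} tinyMsg;

/* Return the node name carried in the body, or NULL when the signature
 * is wrong or the type is not one this sketch understands. */
static const char *body_nodename(const tinyMsg *m) {
    if (memcmp(m->sig, "RCmb", 4) != 0) return NULL;
    switch (m->type) {
    case TINY_MSG_PING: return m->data.ping.nodename;
    case TINY_MSG_FAIL: return m->data.fail.nodename;
    default:            return NULL;
    }
}
```

The receiver never reads a union member without first checking the tag, because the same bytes mean different things for different message types.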

union clusterMsgData {
    /* PING, MEET and PONG */
    struct {
        /* Array of N clusterMsgDataGossip structures */
        clusterMsgDataGossip gossip[1];
    } ping;

    /* For broadcast node failure FAIL */
    struct {
        clusterMsgDataFail about;
    } fail;

    /* PUBLISH */
    struct {
        clusterMsgDataPublish msg;
    } publish;

    /* Used to broadcast the latest status of the node hash slot UPDATE */
    struct {
        clusterMsgDataUpdate nodecfg;
    } update;

    /* MODULE */
    struct {
        clusterMsgModule msg;
    } module;
};

Message type

Redis Cluster provides several message types and combines them to implement functions such as heartbeat, handshake, and configuration update. Let's start by looking at a few important message types and their corresponding data structures. Note that "data structure" here refers only to the data part of the overall message structure.

  • PING: used for heartbeat requests between nodes.
  • PONG: Reply to PING heartbeat requests between nodes;
  • MEET: node handshake request, which is a special PING type;

These three message types share the same body, clusterMsgDataGossip, whose structure is as follows:

typedef struct {
    // Node ID
    char nodename[CLUSTER_NAMELEN];
    // Indicates the time when the sending node sent the last ping request to the node
    uint32_t ping_sent;
    // The time when the message sending node last received pong reply from the node
    uint32_t pong_received;
    // Node IP address
    char ip[NET_IP_STR_LEN];  /* IP address last time it was seen */
    // External service port
    uint16_t port;              /* base port last time it was seen */
    // Cluster bus port
    uint16_t cport;             /* cluster port last time it was seen */
    // The state of the node in the view of the sending node
    uint16_t flags;             /* node->flags copy */
    // Reserved field
    uint32_t notused1;
} clusterMsgDataGossip;

The MEET message is used only when a node joins the cluster through a handshake (detailed later in the cluster establishment process), while the PING/PONG pair is used for heartbeats between nodes (detailed in the article on cluster fault tolerance).

  • FAIL: tells other nodes that the sender has found the node named nodename to be faulty. When a node is determined to have failed, the detecting node broadcasts a message of this type to all other reachable nodes in the cluster.
typedef struct {
    // The name of the faulty node
    char nodename[CLUSTER_NAMELEN];
} clusterMsgDataFail;
  • UPDATE: indicates that the hash slot allocation of another cluster node has changed. After receiving it, a node needs to update the mapping between hash slots and nodes in its local cluster state.
typedef struct {
    uint64_t configEpoch; /* Config epoch of the specified instance. */
    char nodename[CLUSTER_NAMELEN]; /* Name of the slots owner. */
    unsigned char slots[CLUSTER_SLOTS/8]; /* Slots bitmap. */
} clusterMsgDataUpdate;
  • FAILOVER_AUTH_REQUEST: sent by a slave node to request votes for a failover.
  • FAILOVER_AUTH_ACK: sent by a master node to grant its vote to the slave that requested it.

Together, these two messages drive the slave election, which is the basis of failover in cluster mode.

Cluster Establishment Process

We have set up a simple example cluster, and the console output gave us a general picture of how a cluster is created. The basic cluster concepts, from theory to code structure, were also explained above. Next, we summarize the whole process and explain the node handshake in detail.

The overall process

Combining the source code (the clusterManagerCommandCreate function) with the console output, the main steps of cluster creation can be summarized as follows:

  • Based on the input parameters, redis-cli creates a cluster management node for each instance in turn, connects to each node, and fetches the node's information and any existing cluster information.
  • Check each input node in turn, for example whether it already belongs to a cluster and whether it is empty.
  • Allocate master and slave nodes and hash slots, and determine whether the nodes meet the requirement for creating a cluster: at least three master nodes.
  • Print the hash slot sharding and master/slave assignment, and, once the user agrees, configure the nodes:
    • For master nodes: use the CLUSTER ADDSLOTS command to add the hash slots the master is responsible for.
    • For slave nodes: use the CLUSTER REPLICATE command to establish the master-slave replication relationship.
    • For all nodes: use the cluster set-config-epoch command to set the configuration epoch (the config epoch is incremented by 1).
  • redis-cli uses the cluster meet command to trigger the node handshake. The nodes exchange MEET, PING, and PONG messages over the cluster bus and gradually establish the cluster relationships.
  • Check the cluster nodes and hash slot allocation through the node on port 7000.
  • Another important point, not shown in the figure above: once the cluster is set up, nodes keep "gossiping" over the cluster bus using the binary Gossip protocol, which takes care of node discovery, health checks, failover, configuration updates, and replica migration.

Next, we further analyze the core steps of data sharding, master-slave assignment, epoch configuration and node handshake.

Master/slave allocation and data sharding

  • Calculate the number of primary nodes.

Based on the input nodes and the requested number of replicas, redis-cli calculates the number of master nodes and then groups the nodes into units of one master and r slaves. If n nodes are supplied and each master requires r replicas, the nodes can in theory be divided into m groups, where m = ⌊n / (r + 1)⌋, that is, n / (r + 1) rounded down. Cluster mode requires at least three master nodes, so if m is less than 3, redis-cli reports an error and creation fails.
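The calculation can be sketched as a small function. The name masters_count and the -1 error convention are hypothetical; the division and the minimum of three masters follow the description above.

```c
/* Number of master nodes for a cluster built from `nodes` instances with
 * `replicas` slaves per master. Integer division rounds down for
 * non-negative operands. Returns -1 for invalid input or when fewer
 * than three masters would result. */
static int masters_count(int nodes, int replicas) {
    if (nodes <= 0 || replicas < 0) return -1;
    int m = nodes / (replicas + 1);
    return m >= 3 ? m : -1;
}
```

With the six instances and --cluster-replicas 1 used in this article, the result is 3 masters. Five instances with one replica each would yield only 2 groups, so creation would be refused.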

  • Assign primary and secondary nodes.

redis-cli first spreads master nodes across different hosts as far as possible, based on the IP addresses of the input nodes, and selects m masters. It then assigns r slaves to each master. Once every master has r slaves, any remaining nodes are also assigned as slaves.

A newly started node is a master by default. After the assignment, redis-cli only needs to record the chosen master for each slave and wait for the configuration to be delivered.

  • Data fragmentation

By default, the 16384 hash slots are divided evenly among the master nodes.
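The grouping and the even split described above can be sketched in a few lines of Python. This is only an illustration, not redis-cli's code: the function names are made up for this example, and the rounding rule in split_slots is an assumption chosen so that three masters get the same ranges as the example cluster in this article.

```python
import math

TOTAL_SLOTS = 16384


def master_count(n, r):
    """Number of masters for n nodes with r slaves per master (rounded down)."""
    m = n // (r + 1)
    if m < 3:
        raise ValueError("a cluster needs at least 3 master nodes")
    return m


def split_slots(m):
    """Divide the 16384 hash slots into m contiguous, nearly equal ranges."""
    per_master = TOTAL_SLOTS / m
    ranges, first, cursor = [], 0, 0.0
    for i in range(m):
        # Round half up; the exact rounding rule is an assumption of this sketch.
        last = math.floor(cursor + per_master - 1 + 0.5)
        if i == m - 1 or last > TOTAL_SLOTS - 1:
            last = TOTAL_SLOTS - 1  # the last master takes whatever remains
        ranges.append((first, last))
        first = last + 1
        cursor += per_master
    return ranges
```

With six nodes and one slave per master, master_count(6, 1) gives 3, and split_slots(3) gives the ranges 0-5460, 5461-10922, and 10923-16383.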

Issuing the configuration

After master/slave allocation and data sharding are complete, redis-cli holds the master/slave configuration and the hash slot assignment of every node locally. Once the administrator confirms, it traverses the node list and delivers the configuration to each node.

If the node is a master, it receives its hash slot assignment: redis-cli uses the CLUSTER ADDSLOTS command to set the node's hash slots. After receiving the command, the master performs the following operations for each slot:

  • If the slot's entry in clusterState->importing_slots_from is not empty, set it to NULL.
  • Update the slots field of myself, a bitmap that records the hash slots the current node is responsible for.

If the node is a slave, its master is set: redis-cli sends the CLUSTER REPLICATE command to the slave. After the slave receives the command, it starts master/slave replication.

Updating the epoch

After the configuration above, every node has changed from the state it had when it first started. To reflect this change, redis-cli updates the configuration epoch of each node with the cluster set-config-epoch command.

When a node receives the command, it changes the configEpoch of myself and makes sure that the cluster's currentEpoch is not lower than this value.

Node handshake

At this point each individual node has been configured, and redis-cli issues handshake commands so that the nodes form a cluster one by one. For security reasons, a handshake between nodes can only be initiated by the administrator, and it is completed over the cluster bus.

After a node starts, it listens on the cluster bus port, accepts all incoming connections, and receives the messages sent over them. However, if the sender is not a known node of the cluster, all of its messages are discarded. An existing node accepts a new node into the cluster in only two ways:

  • MEET request: a node presents itself with a MEET message, which indicates that the join was initiated by the administrator; the node then joins the cluster through the handshake.
  • Automatic discovery: if a node is recognized as valid by one node in the cluster, that fact is propagated to the other nodes through the heartbeat, and they also come to treat it as a valid node. For example, a known cluster has three nodes A, B, and C, and A recognizes D through a MEET request. After a period of heartbeats, B and C also accept D as a node of the cluster.

Combining the two, each node from the second one onward only needs to shake hands with the first node; automatic discovery then brings all nodes into the cluster.

Ok, let’s look at how the handshake process is implemented. For simplicity, let’s describe the process using just two nodes. Assume the node information is as follows:

  • Node A: 127.0.0.1 7000
  • Node B: 127.0.0.1 7001

Use redis-cli to send the command cluster meet 127.0.0.1 7000 to node B, which makes node B shake hands with node A.

After receiving the command, node B begins the handshake with node A. The following figure illustrates how the node states change during the handshake.

The figure shows that the two nodes complete the handshake through two exchanges, MEET-PONG and PING-PONG. In words, the process is:

  • Node B creates an entry for the handshake peer, node A. The entry has no name when it is initialized, and its flags are MEET and HANDSHAKE.
  • Node B opens a cluster bus connection to node A and sends a MEET request, and node A replies with PONG. At this point:
    • For B: the MEET flag of A is cleared, and B learns A's name;
    • For A: B is added to the node list in the HANDSHAKE state.
  • Node A opens a cluster bus connection to node B and sends a PING request, and node B replies with PONG. At this point:
    • For B: the HANDSHAKE flag of A is cleared;
    • For A: the HANDSHAKE flag of B is cleared, and B's name is set.
  • Nodes A and B have now completed the handshake and enter the normal heartbeat maintenance process.
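To make the flag changes easier to follow, here is a toy replay of the two exchanges in Python. It only mirrors the steps listed above: no network is involved, and the dictionaries, the function name, and the placeholder names "node-A" and "node-B" are invented for this illustration.

```python
def handshake():
    """Replay the MEET-PONG and PING-PONG exchanges and record each node's view."""
    # view_b["A"] is what node B knows about A; view_a["B"] is what A knows about B.
    view_b = {"A": {"name": None, "flags": {"MEET", "HANDSHAKE"}}}
    view_a = {}
    trace = []

    # Exchange 1: B sends MEET, A replies PONG.
    view_a["B"] = {"name": None, "flags": {"HANDSHAKE"}}  # A adds B in HANDSHAKE state
    view_b["A"]["flags"].discard("MEET")                  # B clears MEET for A
    view_b["A"]["name"] = "node-A"                        # B learns A's name
    trace.append(("MEET/PONG", sorted(view_b["A"]["flags"]), sorted(view_a["B"]["flags"])))

    # Exchange 2: A sends PING, B replies PONG.
    view_b["A"]["flags"].discard("HANDSHAKE")             # B clears HANDSHAKE for A
    view_a["B"]["flags"].discard("HANDSHAKE")             # A clears HANDSHAKE for B
    view_a["B"]["name"] = "node-B"                        # A sets B's name
    trace.append(("PING/PONG", sorted(view_b["A"]["flags"]), sorted(view_a["B"]["flags"])))
    return view_a, view_b, trace
```

After the second exchange, neither view carries a handshake-related flag, which corresponds to the point where the normal heartbeat takes over.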

Summary of Cluster Structure

This part lays the foundation: it introduces some basic cluster concepts and the physical and logical structure of Redis Cluster, and uses an example and the cluster creation process to give you a more intuitive understanding.

From the previous section, we know the structure and design ideas of Redis Cluster and how to create a cluster from scratch, so we have a preliminary understanding of it. This section analyzes the details of Redis Cluster data sharding to help you understand and use it better.

Data sharding mechanism

Data fragmentation

Different from the stand-alone Redis and Sentinel mode in which one node manages all keys, the Redis Cluster adopts a hash slot mechanism similar to the consistent hash algorithm, and multiple primary nodes share all key management work.

Redis Cluster uses CRC16 algorithm to distribute key space in 16384 hash slots. Hash slots are numbered from 0 to 16383 according to sequence number. Each group of master and slave nodes is only responsible for part of hash slot management operations. Moreover, the mapping between hash slots and nodes is maintained through the cluster state, which is updated as the cluster runs. As in our example above, the relationship between hash slots and nodes is as follows:

  • Master[0] is responsible for Slots: 0-5460
  • Master[1] is responsible for Slots: 5461-10922
  • Master[2] is responsible for Slots: 10923-16383

Whenever we perform an operation on a key through Redis Cluster, the node receiving the request will first perform a calculation on the key to get the hash slot corresponding to the key, and then find the node responsible for the hash slot from the mapping relationship between the hash slot and the node. If it is the node itself, it is directly processed. If it is another node, the client is told by redirection to connect to the correct node for processing.

HASH_SLOT = CRC16(key) mod 16384
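The formula can be tried out directly in Python. The sketch below assumes that CRC16 means the CRC-16/XMODEM variant (polynomial 0x1021, initial value 0), which is what binascii.crc_hqx computes when started from 0. The function name is made up for this example, and hash tags are deliberately ignored here.

```python
import binascii

SLOT_COUNT = 16384


def hash_slot(key: bytes) -> int:
    """HASH_SLOT = CRC16(key) mod 16384, with CRC16 taken as CRC-16/XMODEM."""
    # binascii.crc_hqx implements the CRC-CCITT polynomial 0x1021;
    # an initial value of 0 gives the XMODEM variant.
    return binascii.crc_hqx(key, 0) % SLOT_COUNT
```

For the standard check string b"123456789" the CRC is 0x31C3, so hash_slot returns 12739.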

Because of data sharding, different keys may map to different hash slots and therefore be stored on different nodes. As a result, some multi-key commands of ordinary Redis cannot be supported: a command that touches keys on several nodes would perform poorly. Redis Cluster therefore implements all single-key commands of ordinary Redis, while complex multi-key operations, such as set union and intersection, are available only when all the keys belong to the same hash slot.

In practice, however, we do run into cases where a single command involves multiple keys. For this, Redis Cluster provides hash tags, which meet the need to a certain extent.

Hash tags

Redis Cluster provides hash tags to force multiple keys into the same hash slot. A hash tag selects the part of the key used to calculate the hash slot by matching the string between { and } in the key. For example, if the client enters {abcd}test, only abcd is used to calculate the hash slot, so {abcd}test and {abcd}prod are stored in the same hash slot. A key entered by the client may contain more than one { or }. In that case, Redis Cluster applies the following rules:

  • The key contains a { character, and there is a } character to the right of the {;
  • There are one or more characters between the { and the };

If both conditions are met, Redis Cluster hashes only the content between { and } as the effective key; otherwise it hashes the original input. Note: the matching of { and } follows the leftmost-match principle, that is, the first { and the first } after it. Here are some examples:

  • {user1000}.following and {user1000}.followers: user1000 is hashed;
  • foo{}{bar}: the whole key foo{}{bar} is hashed, because nothing lies between the first { and the first };
  • foo{{bar}}zap: {bar is hashed;
  • foo{bar}{zap}: bar is hashed;
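The two rules and the leftmost-match principle fit in a few lines of Python. This is a sketch of the rules as stated above, not the Redis source, and the function name is an assumption of this example.

```python
def hashed_part(key: str) -> str:
    """Return the part of key that is used to compute the hash slot."""
    start = key.find("{")
    if start == -1:
        return key                  # no '{' at all: hash the whole key
    end = key.find("}", start + 1)  # first '}' to the right of the first '{'
    if end == -1 or end == start + 1:
        return key                  # no '}' after '{', or nothing in between
    return key[start + 1:end]
```

Applied to the examples in the list, hashed_part returns user1000 for {user1000}.following, the whole key for foo{}{bar}, {bar for foo{{bar}}zap, and bar for foo{bar}{zap}.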

Resharding

When the load on the nodes in the cluster is too high, we consider expanding capacity so that new nodes take over some hash slots from the existing ones. When the load is unbalanced, we consider moving some hash slots from the busier nodes to the less busy ones.

Redis Cluster supports adding and removing nodes, and moving hash slots out of and into nodes, without downtime. This kind of dynamic scaling and reconfiguration is very useful in production. Take an e-commerce site as an example: daily traffic is relatively stable, so it is enough to allocate resources on demand and keep a safe margin; during a big promotion, when traffic surges, we can add resources and scale the service horizontally without stopping anything or affecting the business. Both situations are called resharding (Resharding) or live reconfiguration (Live Reconfiguration). Let's analyze how Redis implements it.

From the data structure of the cluster state, we know that the hash slot allocation is an array: the array index is the hash slot number, and the array value is the node responsible for that slot. In theory, reassigning a hash slot just means changing the node object stored at that index and then letting state propagation bring all nodes in the cluster to eventual consistency. The figure below shows the node responsible for hash slot 1001 being changed from 7000 to 7001.

In practice, implementing this process requires more to be considered.

As we know, the hash slot is calculated by the key through CRC16. The hash slot is just a virtual existence for storing the key in the real node, and all operations have to return to the key. When the node responsible for the hash slot is changed from the old node to the new node, the migration of the existing key of the old node needs to be considered, that is, all the keys in the hash slot of the old node need to be transferred to the new node.

However, no matter how many keys are in the hash slot and how much data is stored in the key, migrating the key from one node to another takes time and requires atomicity. In addition, the client requests do not stop during the resharding process, and Redis needs to respond correctly to the client requests so that they are not affected.

Next, we use the sample cluster to do a re-sharding practice, and combined with the source code in-depth analysis of Redis implementation process. The following example migrates two hash slots from node 7002 to node 7000 as follows:

  • Use the command redis-cli --cluster reshard 127.0.0.1:7000 to start a resharding request for the cluster;
  • redis-cli prints the current hash slot allocation of the cluster and asks how many hash slots to migrate: How many slots do you want to move (from 1 to 16384)? Enter the number 2 and press Enter to confirm;
  • redis-cli asks which node receives the migrated hash slots: What is the receiving node ID? Enter the ID of node 7000 and press Enter;
  • redis-cli asks for the source of the hash slots: enter all to take slots evenly from all other nodes, or enter node IDs line by line and finish with done to take slots from those nodes. Here I entered the ID of node 7002;
  • redis-cli prints the resharding plan, including the source node, the target node, and the hash slot numbers to migrate. Enter yes to confirm or no to stop;
  • After yes is entered, redis-cli performs the hash slot migration;

The following figure shows the execution process:

The source for the process above is the clusterManagerCommandReshard function in redis-cli.c. The function is long and our focus is on how a hash slot is migrated between nodes, so only the part of the code related to hash slot migration is shown here:

static int clusterManagerCommandReshard(int argc, char **argv) {
    /* omit code */
    int opts = CLUSTER_MANAGER_OPT_VERBOSE;
    listRewind(table, &li);
    // Migrate the hash slots one by one
    while ((ln = listNext(&li)) != NULL) {
        clusterManagerReshardTableItem *item = ln->value;
        char *err = NULL;
        // Migrate the hash slot from source to target
        result = clusterManagerMoveSlot(item->source, target, item->slot,
                                        opts, &err);
        /* omit code */
    }
}

/* Move slots between source and target nodes using MIGRATE. */
static int clusterManagerMoveSlot(clusterManagerNode *source,
                                  clusterManagerNode *target,
                                  int slot, int opts, char **err)
{
    if (!(opts & CLUSTER_MANAGER_OPT_QUIET)) {
        printf("Moving slot %d from %s:%d to %s:%d: ", slot, source->ip,
               source->port, target->ip, target->port);
        fflush(stdout);
    }
    if (err != NULL) *err = NULL;
    int pipeline = config.cluster_manager_command.pipeline,
        timeout = config.cluster_manager_command.timeout,
        print_dots = (opts & CLUSTER_MANAGER_OPT_VERBOSE),
        option_cold = (opts & CLUSTER_MANAGER_OPT_COLD),
        success = 1;
    if (!option_cold) {
        // Set the hash slot of the target node to importing
        success = clusterManagerSetSlot(target, source, slot, "importing", err);
        if (!success) return 0;
        // Set the hash slot of the source node to migrating
        success = clusterManagerSetSlot(source, target, slot, "migrating", err);
        if (!success) return 0;
    }
    // Migrate the keys in the hash slot
    success = clusterManagerMigrateKeysInSlot(source, target, slot, timeout,
                                              pipeline, print_dots, err);
    if (!(opts & CLUSTER_MANAGER_OPT_QUIET)) printf("\n");
    if (!success) return 0;
    /* Set the new node as the owner of the slot in all the known nodes. */
    // Inform all nodes in turn that the node responsible for the hash slot has changed
    if (!option_cold) {
        listIter li;
        listNode *ln;
        listRewind(cluster_manager.nodes, &li);
        while ((ln = listNext(&li)) != NULL) {
            clusterManagerNode *n = ln->value;
            if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
            // Send the CLUSTER SETSLOT command to the node
            redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s %s",
                                                    slot, "node", target->name);
            /* omit code */
        }
    }
    /* Update the node logical config */
    if (opts & CLUSTER_MANAGER_OPT_UPDATE) {
        source->slots[slot] = 0;
        target->slots[slot] = 1;
    }
    return 1;
}

The clusterManagerCommandReshard function first builds the list of hash slots to migrate from the hash slot allocation of the cluster and the migration plan, and then uses the clusterManagerMoveSlot function to migrate the hash slots one by one. clusterManagerMoveSlot is the core of hash slot migration and consists of several steps. You can follow them with the schematic diagram and the text description together (the source node is at the top of each picture, and the target node is at the bottom):

The figure above shows how the cluster state changes when hash slot 1000 is migrated from node 7000 to node 7001. The steps are as follows:

  • Change the migration state of the source node and the target node, corresponding to the first picture:
    • Tell the target node to set the specified hash slot to the importing state;
    • Tell the source node to set the specified hash slot to the migrating state;
  • Migrate the keys in the slot from the source node to the target node, corresponding to the second and third pictures (this step may be time-consuming, because the command processing thread of the node is occupied while keys are migrated):
    • Use the command CLUSTER GETKEYSINSLOT <slot> <count> to query the keys in the slot from the source node (redis-cli passes its pipeline size as the count);
    • Use the MIGRATE command to move the keys from the source node to the target node, key by key. The migration of each key is atomic and blocks both nodes while it runs.
  • Tell all master nodes that the slot now belongs to the target node, and clear the importing and migrating states on the target and source nodes, corresponding to the fourth picture.
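The order of commands in the steps above can be written out as a small planning function. The sketch below does not talk to Redis; it only returns the command strings in the order described. The function name, the dictionary fields 'id', 'host', and 'port', and the batch and timeout defaults are assumptions made for this illustration, and the keys argument stands in for whatever CLUSTER GETKEYSINSLOT would return on the source node.

```python
def move_slot_plan(slot, source, target, masters, keys, batch=10, timeout_ms=60000):
    """Return the ordered (node_id, command) pairs for moving one hash slot."""
    plan = [
        # Step 1: mark the slot as importing on the target and migrating on the source.
        (target["id"], f"CLUSTER SETSLOT {slot} IMPORTING {source['id']}"),
        (source["id"], f"CLUSTER SETSLOT {slot} MIGRATING {target['id']}"),
    ]
    # Step 2: fetch the keys in batches and move each batch with MIGRATE.
    for i in range(0, len(keys), batch):
        chunk = " ".join(keys[i:i + batch])
        plan.append((source["id"], f"CLUSTER GETKEYSINSLOT {slot} {batch}"))
        plan.append((source["id"],
                     f'MIGRATE {target["host"]} {target["port"]} "" 0 {timeout_ms} KEYS {chunk}'))
    # Step 3: tell every master that the target now owns the slot.
    for node in masters:
        plan.append((node["id"], f"CLUSTER SETSLOT {slot} NODE {target['id']}"))
    return plan
```

Calling it with slot 1000, node 7000 as the source, node 7001 as the target, and the keys {test}1 and {test}2 produces the two SETSLOT state changes first, then one GETKEYSINSLOT and MIGRATE pair, and finally one SETSLOT ... NODE command per master.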

Ok, so that’s the resharding process.

Redirection

Data sharding spreads the keys over different nodes, and resharding or failover changes the mapping between hash slots and nodes. So when a client sends a request to a node that cannot handle it, how do the cluster node and the client deal with it? Let's take a look at the two redirection mechanisms.

MOVED redirection

Due to the data sharding mechanism, each node in the Redis cluster is only responsible for part of the hash slots, that is, part of the storage and management of keys. The client can initiate a command request to any node in the cluster at will. At this time, the node calculates the hash slot corresponding to the current request key and queries the node responsible for the hash slot based on the mapping between the hash slot and the node. According to the query result, Redis performs the following operations:

  • If the current node is responsible for the key, the node executes the command immediately.
  • If another node is responsible for the key, the node returns a MOVED error to the client.

For example, connect to port 7000 with redis-cli in the normal way and execute GET TestKey. The result is as follows:

redis-cli -p 7000
127.0.0.1:7000> GET TestKey
(error) MOVED 15013 127.0.0.1:7002

The result tells us that TestKey corresponds to hash slot 15013, which is handled by node 7002. The client can parse the IP and port of the responsible node from the MOVED error, connect to it, and execute the command again. Trying this gives the following result:

redis-cli -p 7002
127.0.0.1:7002> GET TestKey
(nil)

Why is that?

Every node of Redis Cluster keeps the mapping between hash slots and nodes. When the key requested by the client is outside the current node's responsibility, the node does not act as a proxy for the target node. Instead, it returns an error that tells the client which node it believes is responsible for the key. Of course, if a hash slot migration is in progress, the information may not be accurate, and the client may receive a MOVED or an ASK error.

The client therefore needs to be able to follow redirections and reconnect to the correct node to send the command again. If commands were always redirected between client and nodes, performance would be worse than in ordinary Redis mode.

What to do? Redis officially proposes two caching options:

  • Before sending a request, the client calculates the hash slot of the input key. If the currently connected node handles the request, the client saves the mapping between the hash slot and the node (IP and port). If a redirection occurs, the client connects to the new node and repeats the request until it succeeds, and then saves the mapping between the hash slot and that node. From then on, the client checks the cache first and sends the request to the right node, which improves efficiency.
  • Use the CLUSTER NODES command to query the state of the cluster nodes, obtain the mapping between hash slots and nodes, and cache it locally in the client. For each request, the client calculates the hash slot of the key, looks up the node, and sends the request there, which is more efficient.

During the stable running of the cluster, which is most of the time, the above methods can greatly improve the efficiency of command execution. However, since re-sharding may occur during cluster operation, the information maintained by the client will become inaccurate, so when the node corresponding to the client’s hash slot changes, the client should correct it in time.
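A minimal sketch of such a client-side cache, corrected whenever a MOVED error arrives, might look like the following. The class and method names are invented for this example, and the error text is assumed to have the MOVED <slot> <ip>:<port> shape shown earlier, without the "(error)" prefix that redis-cli prints.

```python
class SlotCache:
    """Client-side map from hash slot to 'ip:port', corrected on MOVED errors."""

    def __init__(self):
        self.slot_to_node = {}

    def node_for(self, slot, default_node):
        """Return the cached node for slot, or default_node if nothing is cached."""
        return self.slot_to_node.get(slot, default_node)

    def handle_error(self, message):
        """Parse 'MOVED <slot> <ip>:<port>'; return the new address or None."""
        parts = message.split()
        if len(parts) != 3 or parts[0] != "MOVED":
            return None  # not a MOVED error: leave the cache untouched
        slot, address = int(parts[1]), parts[2]
        self.slot_to_node[slot] = address  # remember the new owner of the slot
        return address
```

After handle_error("MOVED 15013 127.0.0.1:7002"), later lookups for slot 15013 go straight to 127.0.0.1:7002 instead of being redirected again.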

When started in cluster mode with the -c option, redis-cli follows MOVED redirections automatically. Connect to node 7000 as a cluster client and run the same command. The result is as follows:

redis-cli -c -p 7000
127.0.0.1:7000> GET TestKey
-> Redirected to slot [15013] located at 127.0.0.1:7002
(nil)
127.0.0.1:7002>

A request is made to node 7000, but the client automatically connects to 7002 and re-executes the request after receiving the result from 7000.

What happens if, during resharding, the client sends a node a command for a key in the slot being migrated (hash slot 1000)? With that question in mind, let's take a look at ASK redirection.

ASK redirection

During resharding, some of the keys stored in the hash slot are still on the source node while others have already been migrated to the target node. If the client sends a command to the source node at this point (especially a multi-key command), MOVED redirection does not work properly. The following figure shows the state of the cluster; let's analyze it:

To explain ASK redirection fully, the commands in this part contain multiple keys with the same hash slot, such as {test}1 and {test}2. They are referred to below as keys, and we assume that the hash slot of test is 1000.

As mentioned above, when a client requests for keys from a node, it calculates the hash slot for keys, and then uses the mapping between the hash slot and the node to find the node that is responsible for the hash slot. Finally, it will execute immediately or return a MOVED error.

However, if the cluster is in the process of re-sharding, the keys requested by the client may not have been migrated or may have been migrated. Let’s see what happens.

  • Not migrated: the client sends the request to node 7000, directly or through a MOVED redirection. Node 7000 checks that it is responsible for the slot, and the keys are still stored on it, so it handles the request normally.
  • Migrated: the client sends the request to node 7000, directly or through a MOVED redirection. Node 7000 checks that it is responsible for the slot, but the keys have been completely or partially migrated to node 7001, so it cannot find them and cannot process the request properly.

Therefore, the MOVED redirect is not appropriate in this case. To this end, Redis Cluster introduces ASK redirection. Let’s take a look at how ASK redirection works.

The client sends a keys request to node 7000 based on the mapping between the local cache hash slot and nodes. Based on the keys migration progress, node 7000 performs the keys request as follows:

  • Node 7000 finds that the hash slot of the keys is its own responsibility, and then:
    • If hash slot 1000 is not being migrated, node 7000 handles the request and returns the result.
    • If hash slot 1000 is being migrated but none of the requested keys has been migrated yet, node 7000 can still serve the request, so it handles it and returns the result.
    • If the requested keys have been fully or partially migrated to 7001, node 7000 returns an ASK redirection error that tells the client which node to ask, in the following format:
(error) ASK <slot> <ip>:<port>
# The corresponding result in this example:
(error) ASK 1000 127.0.0.1:7001
  • When the client receives the ASK redirection error, it marks hash slot 1000 with a one-time flag that forces it to point to the new node (7001), and then does the following:
    • Sends the ASKING command to node 7001 and clears the one-time flag;
    • Then sends the command it actually wants to run to node 7001;
  • After node 7001 receives the ASKING command from the client, it handles the request that follows:
    • If hash slot 1000 is being imported and all the requested keys have already been imported, node 7001 executes the request and returns the result.
    • If hash slot 1000 is being imported but the requested keys have not all been imported yet, node 7001 returns a TRYAGAIN error.

In this way, if the keys requested by the client are being migrated, the node replies with an ASK redirection error and the client sends the request to the new node. There is still some chance that the request fails because the keys have not all been migrated yet; in that case the node replies with TRYAGAIN and the client can try again later.

Once the move is complete, the client will receive a MOVED redirection error from the node, indicating that the management of the hash slot has been transferred to the new node. In this case, the client can change the mapping between the local hash slot and the node and send requests to the new node using the MOVED redirection logic.
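The client-side decision described in this part can be sketched as a single function. It assumes the error text starts with MOVED, ASK, or TRYAGAIN (without the "(error)" prefix printed by redis-cli); the function name and the plain dictionary used as slot cache are inventions of this example.

```python
def next_requests(reply, command, slot_cache):
    """Decide what to send next after an error reply.

    Returns a list of (address, command) pairs; slot_cache is a plain dict
    that maps a hash slot to 'ip:port'.
    """
    kind, *rest = reply.split()
    if kind == "MOVED":
        slot, address = int(rest[0]), rest[1]
        slot_cache[slot] = address       # permanent change: update the mapping
        return [(address, command)]
    if kind == "ASK":
        address = rest[1]                # one-time redirect: cache stays untouched
        return [(address, "ASKING"), (address, command)]
    if kind == "TRYAGAIN":
        return []                        # nothing to send now; retry later
    raise ValueError(f"not a redirection: {reply}")
```

For ASK 1000 127.0.0.1:7001 the function returns ASKING followed by the original command, both addressed to 127.0.0.1:7001, and leaves the cached owner of slot 1000 unchanged. For a MOVED reply it updates the cache and returns only the original command.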

MOVED and ASK redirection

After the previous sections, you should have a good idea of the difference between the two. A brief summary:

  • Both use an error reply to tell the client to send the request to another node;
  • MOVED redirection: tells the client which node is currently responsible for the hash slot, based on the mapping between hash slots and nodes. A client that receives this error can update its local cache of the hash slot to node mapping directly. This is a relatively stable state.
  • ASK redirection: tells the client that the hash slot of the requested keys is being migrated to a new node, that the current node cannot complete the request, and that the request should be sent to the new node. A client that receives this error redirects to the new node once, for this request only. The error does not affect later requests from the client for the same hash slot, unless the client receives an ASK redirection error again.

Can the cluster provide services during capacity expansion or reduction?

This is a common interview question, and it’s not difficult to answer if you understand the nature of the question. Let’s break it down:

  • Cluster expansion: if a master node is added, it is not responsible for any hash slots at first. To spread the load, we need to reshard and move some hash slots to the new node, so this is essentially a resharding process. If a slave node is added, it only needs to start master/slave replication with the specified master.
  • Cluster reduction: reducing capacity means removing nodes from the cluster. A master node normally serves reads and writes for some hash slots, so to remove it safely, its hash slots must first be transferred to other nodes, which is also a resharding process. A slave node can simply be removed.

Expanding or reducing the number of master nodes is essentially resharding, which involves migrating hash slots, that is, migrating the keys in those slots. Redis Cluster uses ASK redirection to tell clients what is happening in the cluster, so that a client can adjust by following the ASK redirection or retrying.

Therefore, on the whole, the cluster can normally provide services during capacity expansion or reduction.

Data sharding summary

Data sharding is the foundation of Redis Cluster's ability to scale in and out dynamically. Although this part is fairly long, the principle is relatively simple. If you focus on understanding the process and essence of scaling, the other cases follow from the same principle.

Cluster fault tolerance mechanism

The automatic failover of Sentinel mode provides a high availability guarantee, and the automatic failover of Redis Cluster does the same for cluster availability. The two have a lot in common in their design ideas. This section analyzes the topic.

Heartbeat

As a distributed system without a center, Redis Cluster relies on the cooperation of all nodes for fault tolerance. When a node detects that another node has failed, it spreads this information until a consensus is reached, which then triggers node election and failover. All of this builds on the cluster state that the nodes maintain with each other through the heartbeat mechanism.

The diagram below shows the cluster state from the perspective of node A (it only shows the parts related to cluster fault tolerance). myself points to node A itself and describes node A's own state. nodes[B] points to node B and describes node B's state as seen from node A. The cluster state also holds the current epoch of the cluster and the mapping between hash slots and nodes.

In a cluster, every pair of nodes maintains a heartbeat through PING and PONG messages. As the previous section showed, the two message types have exactly the same structure (same header and body); only the type field in the header differs. We call this pair of messages heartbeat messages.

  • The PING/PONG message header contains the configuration epoch (configEpoch), replication offset (offset), node name (sender), responsible hash slots (myslots), and node state (flags) of the message source node, which correspond to the node information that the target node maintains about it. It also includes the currentEpoch and the cluster state as seen by the source node.
  • The PING/PONG message body contains several clusterMsgDataGossip entries. Each clusterMsgDataGossip corresponds to one cluster node and describes the heartbeat state between the source node and that node, as well as the source node's judgment of its running state.

Heartbeat messages are exchanged between pairs of nodes and spread through the cluster like an epidemic, which ensures that node state propagates to the whole cluster within a short time. We will look at the heartbeat mechanism in depth from three angles: when a heartbeat is triggered, how the message body is composed, and how it is applied.

Trigger timing

In cluster mode, the heartbeat is triggered by the periodic function clusterCron(), which executes every 100 milliseconds. To control the volume of messages in the cluster while keeping heartbeats between nodes timely, Redis Cluster uses different strategies.

Normally, clusterCron() sends a PING message to one node every second (once every 10 executions of the function). The node is chosen at random, as follows:

  • Pick a random node from the node list five times. A picked node becomes a candidate if it meets the conditions: its cluster bus link exists, it is not waiting for a PONG reply, it is not in the handshake state, and it is not the local node;
  • If there are candidates, choose the one whose last PONG reply is the oldest;
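The random selection can be sketched as follows. It is a simplified illustration of the two steps above, not the Redis source: each node is a dictionary with hypothetical keys ('link', 'ping_sent', 'handshake', 'myself', 'pong_received'), and the node list is assumed to be non-empty.

```python
import random


def pick_ping_target(nodes, rng=random):
    """Sample 5 random nodes and return the eligible one with the oldest PONG."""
    best = None
    for _ in range(5):
        node = rng.choice(nodes)
        # Skip nodes without a link, still waiting for a PONG,
        # in handshake state, or representing the local node.
        if (not node["link"] or node["ping_sent"] != 0
                or node["handshake"] or node["myself"]):
            continue
        if best is None or node["pong_received"] < best["pong_received"]:
            best = node
    return best  # None when none of the sampled nodes is eligible
```

Sampling only a handful of nodes keeps the cost of each run constant regardless of cluster size, at the price of sometimes not picking the globally oldest node.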

Note how Redis Cluster keeps track of heartbeat messages sent and received, because this is also very important for the fault detection described later:

  • When the source node sends a PING command to the target node, it sets the target node's ping_sent to the current time.
  • When the source node receives the PONG reply from the target node, it sets the target node's ping_sent to 0 and updates pong_received to the current time.

That is, ping_sent equal to 0 means that a PONG reply has been received and the node is waiting for the next PING to be sent; ping_sent not equal to 0 means that the node is still waiting for a PONG reply.

The timeout parameter cluster-node-timeout is set in the cluster configuration file; the corresponding variable, written NODE_TIMEOUT below, is the basis for deciding whether the heartbeat to a target node has timed out. To keep PING/PONG exchanges from timing out and to leave room for a retry, Redis Cluster performs heartbeat compensation at the NODE_TIMEOUT/2 threshold.

Each time clusterCron() runs (every 100 ms), it checks every node in turn:

  • If more than NODE_TIMEOUT/2 has passed since the last PONG from the target node was received and no PING has been sent since, the source node immediately sends a PING to the target node.
  • If a PING has been sent to the target node but no PONG reply arrives within NODE_TIMEOUT/2, the source node disconnects the network link and reconnects, to rule out a broken connection as the cause of the missing heartbeat.
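The bookkeeping of ping_sent and pong_received and the two NODE_TIMEOUT/2 checks can be put together in a small sketch. The function and type names (heartbeat_check, hb_action_t, on_ping_sent, on_pong_received) are hypothetical and only model the rules described above; the real logic lives inside clusterCron() and handles more cases.

```c
#include <assert.h>

typedef long long ms_t;

typedef enum { HB_NONE, HB_SEND_PING, HB_RECONNECT } hb_action_t;

/* Decide what the periodic check should do for one peer.
 * ping_sent == 0 means no PING is pending. */
static hb_action_t heartbeat_check(ms_t now, ms_t ping_sent,
                                   ms_t pong_received, ms_t node_timeout) {
    assert(node_timeout > 0);
    ms_t half = node_timeout / 2;
    if (ping_sent != 0 && now - ping_sent > half)
        return HB_RECONNECT;   /* PING pending too long: drop the link */
    if (ping_sent == 0 && now - pong_received > half)
        return HB_SEND_PING;   /* peer not heard from recently: PING it now */
    return HB_NONE;
}

/* Bookkeeping when a PING is sent and when a PONG arrives. */
static void on_ping_sent(ms_t *ping_sent, ms_t now) {
    *ping_sent = now;
}

static void on_pong_received(ms_t *ping_sent, ms_t *pong_received, ms_t now) {
    *ping_sent = 0;
    *pong_received = now;
}
```

With NODE_TIMEOUT set to 15000 ms (an example value), a peer whose last PONG is 8 seconds old gets a PING right away, and a PING that has been pending for 8 seconds causes the link to be dropped so that it can be re-established.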

In general, the number of heartbeat messages sent and received per second in the cluster is stable, so even a cluster with many nodes does not suffer bursts of network I/O large enough to become a burden. Viewed as a complete directed graph of N nodes, the cluster has N*(N-1) links. Each link has to maintain a heartbeat, and heartbeat messages come in pairs (a PING and its PONG).

If the cluster has 100 nodes and NODE_TIMEOUT is 60 seconds, that means that each node sends 99 pings in 30 seconds, or an average of 3.3 pings per second. With 100 nodes sending a total of 330 messages per second, this order of magnitude of network traffic pressure is acceptable.
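The estimate above is simple arithmetic, and a small helper makes it easy to try other cluster sizes and timeouts. The function names are hypothetical; the model is the worst case used in the text, where each node probes every other node once per NODE_TIMEOUT/2.

```c
#include <assert.h>

/* Worst-case PINGs one node sends per second: it must probe each of the
 * other (nodes - 1) nodes at least once every NODE_TIMEOUT/2. */
static double pings_per_node_per_sec(int nodes, double node_timeout_sec) {
    assert(nodes >= 1 && node_timeout_sec > 0.0);
    return (nodes - 1) / (node_timeout_sec / 2.0);
}

/* Total PINGs per second sent by the whole cluster. */
static double pings_per_cluster_per_sec(int nodes, double node_timeout_sec) {
    return nodes * pings_per_node_per_sec(nodes, node_timeout_sec);
}
```

For 100 nodes and a 60-second timeout this gives 3.3 PINGs per second per node and 330 for the cluster, matching the figures above. Lowering the timeout to 15 seconds with the same 100 nodes quadruples both numbers, to 13.2 and 1320.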

However, for a given number of nodes, the NODE_TIMEOUT parameter needs to be chosen carefully. If it is too small, heartbeat messages put heavy pressure on the network; if it is too large, node failures may not be detected in time.

Message composition

PING and PONG messages share the same data structure. The header is filled from myself in the cluster state, which is easy to understand. The message body carries the state of several other nodes, but a cluster may have many nodes, so which ones should be included?

By design, each message body contains a number of normal nodes plus the nodes in the PFAIL state. They are selected as follows (the corresponding source code is in the clusterSendPing function):

  • Determine the theoretical maximum number of normal nodes to include in the heartbeat message (it is theoretical because the later steps also take node state into account and skip nodes that are in the handshake state or down):
    • Condition 1: the maximum is the number of cluster nodes minus 2, where “minus 2” excludes the source node and the target node.
    • Condition 2: the maximum is 1/10 of the number of cluster nodes, but not less than 3.
    • Take the smaller of the two results as the theoretical number, wanted.
  • Determine the number of PFAIL nodes to include, pfail_wanted: this is the number of nodes marked PFAIL in the cluster state (server.cluster->stats_pfail_nodes).
  • Add normal nodes: randomly pick up to wanted nodes from the cluster node list, create a gossip section for each, and append it to the message body. A node must meet all of the following conditions:
    • It is not the source node itself;
    • It is not in the PFAIL state, because PFAIL nodes are added separately afterwards;
    • It is not in the handshake state or the no-address state, and it either has a cluster bus link or is responsible for at least one hash slot;
  • Add PFAIL nodes: traverse the node list, create a gossip section for every node that is in the PFAIL state and not in the handshake or no-address state, and append it to the message body.
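The computation of wanted from the first step is short enough to write out. The sketch below follows the two conditions exactly as listed; gossip_wanted is a hypothetical helper name, and the precondition of at least two nodes (a sender and a receiver) is an assumption of this example.

```c
#include <assert.h>

/* Theoretical number of normal nodes to gossip about in one heartbeat. */
static int gossip_wanted(int node_count) {
    assert(node_count >= 2);            /* at least sender and receiver */
    int freshnodes = node_count - 2;    /* condition 1: exclude both ends */
    int wanted = node_count / 10;       /* condition 2: one tenth ... */
    if (wanted < 3) wanted = 3;         /* ... but not less than 3 */
    if (wanted > freshnodes) wanted = freshnodes;  /* take the smaller */
    return wanted;
}
```

A 100-node cluster therefore gossips about 10 normal nodes per message, a 20-node cluster about 3, and a 4-node cluster about 2, because only two nodes remain once the sender and receiver are excluded.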

Message application

After receiving a PING or PONG message, a node updates its locally maintained node state according to the message header and body. Walking through the source code line by line would involve many fields and rather complex logic, so I describe the processing by application scenario instead (the relevant source function is clusterProcessPacket).

Cluster current epoch and configuration epoch

The heartbeat header contains the source node's configuration epoch (configEpoch) and its view of the cluster's current epoch (currentEpoch). On receipt, the target node compares them with the values it has cached for the source node's configuration epoch and for the cluster's current epoch:

  • If the configuration epoch that the target node has cached for the source node is smaller than the configuration epoch declared in the heartbeat message, the cached configuration epoch is updated to the declared value.
  • If the cluster current epoch cached by the target node is smaller than the current epoch declared in the heartbeat message, the cached current epoch is updated to the declared value.
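Both rules amount to “keep the larger value”. A minimal sketch, assuming a hypothetical epoch_cache_t that holds only the two cached values involved (the real fields live in clusterNode and clusterState):

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical holder for the two cached values discussed above. */
typedef struct {
    uint64_t sender_config_epoch;  /* config epoch cached for the sender */
    uint64_t current_epoch;        /* local view of the cluster epoch */
} epoch_cache_t;

/* Apply the epochs declared in a heartbeat header.
 * Returns how many cached values were raised (0, 1 or 2). */
static int apply_declared_epochs(epoch_cache_t *cache,
                                 uint64_t declared_config_epoch,
                                 uint64_t declared_current_epoch) {
    assert(cache != NULL);
    int changed = 0;
    if (cache->sender_config_epoch < declared_config_epoch) {
        cache->sender_config_epoch = declared_config_epoch;
        changed++;
    }
    if (cache->current_epoch < declared_current_epoch) {
        cache->current_epoch = declared_current_epoch;
        changed++;
    }
    return changed;
}
```

Because epochs only ever move forward, a delayed or duplicated heartbeat carrying older values leaves the cache untouched.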

Hash slot change detection

The message header contains the list of hash slots that the source node is currently responsible for. The target node checks this list against its locally cached slot-to-node mapping to see whether any slot is mapped inconsistently. When an inconsistent mapping is found, it is handled as follows:

  • If the source node is a master and its declared configuration epoch is greater than the one cached by the target node, the target node updates its local slot-to-node mapping according to the declared hash slots.
  • If the locally cached configuration epoch is greater than the one declared by the source node, the target node sends an UPDATE message telling the source node to update its hash slot configuration.
  • If the locally cached configuration epoch equals the one declared by the source node, and both the source and the target are masters, the configuration epoch conflict is resolved: the target node updates its local cluster current epoch and sets its own configuration epoch to match it. The hash slot conflict is then reconciled in subsequent heartbeats.
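The three cases can be read as a single decision on two epochs and two roles. The sketch below encodes exactly the rules listed above; slot_action_t and on_slot_mismatch are hypothetical names, and the real code in clusterProcessPacket works slot by slot with more state than shown here.

```c
#include <assert.h>
#include <stdint.h>

typedef enum {
    SLOT_KEEP,              /* nothing to do for this mismatch */
    SLOT_ADOPT_SENDER,      /* rebind the slots to the sender */
    SLOT_SEND_UPDATE,       /* sender is stale: send it an UPDATE message */
    SLOT_RESOLVE_COLLISION  /* equal epochs on two masters */
} slot_action_t;

/* Decide how the receiver reacts to an inconsistent slot mapping.
 * The two flags are expected to be 0 or 1. */
static slot_action_t on_slot_mismatch(int sender_is_master,
                                      int receiver_is_master,
                                      uint64_t declared_epoch,
                                      uint64_t cached_epoch) {
    assert((sender_is_master == 0 || sender_is_master == 1) &&
           (receiver_is_master == 0 || receiver_is_master == 1));
    if (declared_epoch > cached_epoch)
        return sender_is_master ? SLOT_ADOPT_SENDER : SLOT_KEEP;
    if (declared_epoch < cached_epoch)
        return SLOT_SEND_UPDATE;
    return (sender_is_master && receiver_is_master) ? SLOT_RESOLVE_COLLISION
                                                    : SLOT_KEEP;
}
```

The configuration epoch acts as a version number for slot ownership: the higher epoch wins, the lower one is told to catch up, and a tie between two masters has to be broken before the slots can be reconciled.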

New Node Discovery

From the node handshake process we know that a new node only needs to shake hands with any one node in the cluster to join it. At that point, however, the other nodes do not know that the new node has joined, and the new node does not know about the other nodes. Both cases are a matter of discovering new nodes.

During heartbeats, the source node puts information about nodes unknown to the peer into the message body to notify the target node. The target node then proceeds as follows:

  • The target node finds a node in the message body that does not exist in its local cache, creates a clusterNode object for it, and adds it to the cluster node list.
  • In clusterCron, the target node creates a network link to the new node; after two heartbeat exchanges between them, the discovery of the new node is complete.

Node Fault Discovery

Node fault detection is a core function of heartbeat, which is described separately in the next section.

Failure detection and failover

PFAIL and FAIL concepts

Redis Cluster uses two concepts PFAIL and FAIL to describe the running status of nodes, which is similar to SDOWN and ODOWN in Sentinel mode.

  • PFAIL: possibly failed

When a node cannot reach another node for longer than NODE_TIMEOUT, it flags the unreachable node as PFAIL. Any node, master or slave, can flag other nodes as PFAIL.

In Redis Cluster, a node is unreachable when the source node has sent it a PING and has not received its PONG reply after NODE_TIMEOUT has elapsed.

This concept is derived from the heartbeat. For PFAIL to reflect reality as closely as possible, NODE_TIMEOUT must be larger than the network round-trip time between the two nodes. To improve reliability, when no PONG reply has been received from the target node after half of NODE_TIMEOUT, the source node immediately tries to reconnect to it.

PFAIL is therefore the result of one source node's heartbeat check on the target node, and is to some extent subjective.

  • FAIL: failed

The PFAIL state is subjective and does not mean that the target node is actually down. Only the FAIL state means that the node is considered down.

However, we already know that during heartbeats each node tells other nodes about the PFAIL states it has detected. So if a node is objectively down, other nodes will detect its PFAIL state as well.

When a sufficient number of nodes in the cluster consider the target node to be in the PFAIL state within a certain time window, the target node's state is escalated to FAIL.

Node fault detection

This section describes node fault detection in detail. As an example, take a cluster whose master nodes are A, B, and C, and suppose node B goes down. The fields involved are ping_sent, pong_received, and fail_reports. How does a node reach the PFAIL state?

The cluster state maintained by each node contains a node list; the information kept for each node is illustrated by node B in the figure above. In node A's view, the ping_sent field of node B reflects the heartbeat status between A and B: 0 means the heartbeat between node A and node B is normal, and a non-zero value means node A has sent a PING to node B and is waiting for node B's PONG reply.

Every cluster node runs the clusterCron() function every 100 milliseconds to check its heartbeat and data exchange with each other node. If node A has not received a PONG reply or any other data from node B within NODE_TIMEOUT, node A regards node B as faulty and sets its state to PFAIL. The relevant code is as follows:

void clusterCron(void) {
    /* omitted ... */

    /* How long we have been waiting since the PING was sent */
    mstime_t ping_delay = now - node->ping_sent;
    /* How long it has been since we last received data from the node */
    mstime_t data_delay = now - node->data_received;

    /* Take the smaller of the two delays */
    mstime_t node_delay = (ping_delay < data_delay) ? ping_delay : data_delay;

    /* Check for timeout */
    if (node_delay > server.cluster_node_timeout) {
        /* The node timed out. If it is not already in the PFAIL or FAIL
         * state, flag it as PFAIL. */
        if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {
            serverLog(LL_DEBUG,"*** NODE %.40s possibly failing", node->name);
            node->flags |= CLUSTER_NODE_PFAIL;
            update_state = 1;
        }
    }

    /* omitted ... */
}

Propagation of the PFAIL state

As described under “Message composition” in the heartbeat mechanism, nodes in the PFAIL state are propagated with heartbeats to all reachable nodes in the cluster.

From PFAIL to FAIL

Switching from PFAIL to FAIL requires the agreement of more than half of the master nodes in the cluster. Cluster nodes collect PFAIL flags for a node from heartbeat messages. If node B fails, both A and C detect the failure and flag node B as PFAIL, and the heartbeat messages exchanged between A and C carry node B's PFAIL state. Let us look at how Redis Cluster handles this from node A's point of view.

The state of other nodes is carried in the body of the heartbeat message, which the receiver processes in the clusterProcessGossipSection function. Here the sender, node C, is a master and declares node B to be in the PFAIL state. According to the source code, the following steps are executed:

  • Add a failure report to node B, that is, add node C to node B's fail_reports list.

fail_reports is a list of clusterNodeFailReport entries that records every node that considers node B to be faulty. The structure is shown below; the time field is the time the report was added, that is, the most recent time the reporting node declared the fault. When the same node reports again, only the time field is refreshed.

typedef struct clusterNodeFailReport {
    /* Node reporting node failure */
    struct clusterNode *node;  /* Node reporting the failure condition. */
    /* Time when the fault is reported */
    mstime_t time;             /* Time of the last report from this node. */
} clusterNodeFailReport;
  • Check whether the conditions for FAIL are met: Redis Cluster requires that when more than half of the master nodes consider a node to be in the PFAIL state, the node is set to the FAIL state. Continuing the example, the process is:
    • Compute the quorum needed for FAIL: the cluster here has three master nodes, so at least two must agree.
    • In node A's cluster state, node C has been added to node B's fail_reports list, and node A itself has flagged node B as faulty. Two master nodes therefore confirm that node B is faulty, which is enough to set node B to the FAIL state.
    • Node A clears node B's PFAIL flag, sets its FAIL flag, and then sends a FAIL message about node B to all reachable nodes.

The detailed code procedure is shown in the following function:

void markNodeAsFailingIfNeeded(clusterNode *node) {
    int failures;
    /* Quorum: more than half of the master nodes */
    int needed_quorum = (server.cluster->size / 2) + 1;
    /* Proceed only if the current node itself considers the node timed out */
    if (!nodeTimedOut(node)) return; /* We can reach it. */
    if (nodeFailed(node)) return; /* Already FAILing. */

    failures = clusterNodeFailureReportsCount(node);
    /* If the current node is a master, its own opinion counts as well */
    if (nodeIsMaster(myself)) failures++;
    if (failures < needed_quorum) return; /* No weak agreement from masters. */

    serverLog(LL_NOTICE, "Marking node %.40s as failing (quorum reached).", node->name);

    /* Set the node to the FAIL state */
    node->flags &= ~CLUSTER_NODE_PFAIL;
    node->flags |= CLUSTER_NODE_FAIL;
    node->fail_time = mstime();

    /* Broadcast the FAIL state of the node to all reachable nodes; every
     * node that receives the message is forced to accept it */
    clusterSendFail(node->name);
    clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
}

Note that records in fail_reports have a validity period of 2 × NODE_TIMEOUT; records older than that are removed. In other words, enough reports must be collected within this time window for the PFAIL to FAIL transition to happen. If the heartbeat between a reporting master node and the suspected node recovers, that master's record is removed from fail_reports immediately.
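The effect of the validity window on the quorum can be illustrated with a small sketch. fail_report_t, count_valid_reports and reaches_fail_quorum are hypothetical names for this example; Redis itself removes expired records from the list rather than skipping them, but the resulting count is the same.

```c
#include <assert.h>
#include <stddef.h>

typedef long long ms_t;

/* Hypothetical, simplified failure report. */
typedef struct {
    int reporter_id;  /* master that reported the failure */
    ms_t time;        /* time of its most recent report */
} fail_report_t;

/* Count reports that are still inside the 2 x NODE_TIMEOUT window. */
static int count_valid_reports(const fail_report_t *reports, size_t n,
                               ms_t now, ms_t node_timeout) {
    ms_t max_age = node_timeout * 2;
    int count = 0;
    for (size_t i = 0; i < n; i++)
        if (now - reports[i].time <= max_age) count++;
    return count;
}

/* Return 1 if the local node may escalate the suspect to FAIL. */
static int reaches_fail_quorum(int valid_reports, int i_am_master,
                               int i_see_pfail, int master_count) {
    assert(master_count > 0);
    if (!i_see_pfail) return 0;            /* we can still reach the node */
    int needed = master_count / 2 + 1;     /* more than half of the masters */
    int failures = valid_reports + (i_am_master ? 1 : 0);
    return failures >= needed;
}
```

In the three-master example, node A holds one report from node C. While that report is fresh, A's own opinion plus C's report reaches the quorum of 2; once the report is older than 2 × NODE_TIMEOUT, A alone is not enough and node B stays in PFAIL.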

After node A sets node B to the FAIL state, it sends a FAIL message about node B to all reachable nodes; the message type is CLUSTERMSG_TYPE_FAIL. As soon as another node receives the FAIL message, it sets node B to the FAIL state, regardless of whether node B is in the PFAIL state from its own point of view.

When a master node fails, the FAIL message about it is propagated to all reachable nodes in the cluster, and these nodes flag it as FAIL. To keep the cluster available, the slave nodes of that master start the failover procedure, and the best slave is chosen and promoted to master. Failover in Redis Cluster consists of two key steps: slave election and slave promotion.

Slave node election

When a master node fails, all of its slave nodes start an election, and only the slave that wins the votes of the other master nodes gets the chance to be promoted to master. Both the preparation and the execution of the slave election take place in clusterCron.

Conditions and timing for starting an election

A slave node must meet the following conditions to start an election (the pre-election checks are performed in the function clusterHandleSlaveFailover(void)):

  • The slave's master node is in the FAIL state;
  • The slave's master node is responsible for at least one hash slot;
  • To make sure the slave's data is reasonably fresh, the time for which the slave has been disconnected from its master must be below a specified limit. I extracted the computation of this limit from the code as follows:
/* Age of the slave's data */
mstime_t data_age;

/* Time elapsed since the slave last interacted with its master */
if (server.repl_state == REPL_STATE_CONNECTED) {
    data_age = (mstime_t)(server.unixtime - server.master->lastinteraction) * 1000;
} else {
    data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;
}

/* Subtract the node timeout: the master had to be unreachable for at least
 * that long before it could be flagged as failing */
if (data_age > server.cluster_node_timeout)
    data_age -= server.cluster_node_timeout;

/* The slave's data is considered too old when this condition holds */
data_age >
    (((mstime_t)server.repl_ping_slave_period * 1000)
     + (server.cluster_node_timeout * server.cluster_slave_validity_factor))
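To get a feel for the limit, the condition in the excerpt can be wrapped in a small standalone helper and evaluated with concrete numbers. The helper names are hypothetical, and the values used in the note below (a 10-second replication ping period, a 15000 ms node timeout and a validity factor of 10) are example settings assumed for illustration, not values taken from this article.

```c
#include <assert.h>

typedef long long ms_t;

/* Largest data age (ms) at which a slave may still start an election. */
static ms_t max_data_age_ms(int ping_period_sec, ms_t node_timeout_ms,
                            int validity_factor) {
    assert(ping_period_sec >= 0 && node_timeout_ms >= 0 &&
           validity_factor >= 0);
    return (ms_t)ping_period_sec * 1000 + node_timeout_ms * validity_factor;
}

/* Mirrors the excerpt: subtract NODE_TIMEOUT from the raw disconnection
 * time, then compare the result against the limit. */
static int slave_data_too_old(ms_t disconnected_ms, int ping_period_sec,
                              ms_t node_timeout_ms, int validity_factor) {
    ms_t data_age = disconnected_ms;
    if (data_age > node_timeout_ms) data_age -= node_timeout_ms;
    return data_age > max_data_age_ms(ping_period_sec, node_timeout_ms,
                                      validity_factor);
}
```

With the example settings the limit is 10 × 1000 + 15000 × 10 = 160000 ms. A slave that has been disconnected for 100 seconds is still eligible, while one that has been disconnected for 200 seconds (185 seconds after subtracting the node timeout) is not.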

If a master in the FAIL state has several slaves, Redis Cluster wants the slave with the most complete data to be promoted to the new master. But if all slaves started their elections at the same time and competed on equal terms, there would be no guarantee that the slave with the most complete data wins. To give that slave priority, Redis Cluster delays the start of the election. According to the source code, each slave computes a delay and derives the start time of its election as follows:

/* Delay before the election starts */
DELAY = 500 + random(0,500) + SLAVE_RANK * 1000
/* Start time of this slave's election */
server.cluster->failover_auth_time = mstime() + DELAY

To explain this formula:

  • Fixed delay of 500 ms: gives the master's FAIL state time to propagate through the cluster, so that other master nodes do not refuse to vote because they have not yet seen the failure when the election starts;
  • Random delay random(0,500): keeps the slaves from starting their elections at exactly the same time;
  • SLAVE_RANK: depends on how complete the slave's data is. After the master enters the FAIL state, its slaves exchange state through PONG messages to establish a ranking. The slave with the largest repl_offset gets rank 0, the next one gets rank 1, and so on.

So the slave with the most complete data starts its election first and, if all goes well, is promoted to the new master.
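The ranking and the delay formula can be tried out with a short sketch. slave_rank and election_delay_ms are hypothetical helpers for this example; the random part is passed in as a parameter so that the result is reproducible, and the rank is computed as the number of sibling slaves with a larger replication offset, which yields the ordering described above.

```c
#include <assert.h>
#include <stddef.h>

typedef long long ms_t;

/* Rank of a slave: how many sibling slaves have a larger replication
 * offset. The slave with the most complete data gets rank 0. */
static int slave_rank(long long my_offset, const long long *sibling_offsets,
                      size_t nsiblings) {
    int rank = 0;
    for (size_t i = 0; i < nsiblings; i++)
        if (sibling_offsets[i] > my_offset) rank++;
    return rank;
}

/* DELAY = 500 + random(0,500) + SLAVE_RANK * 1000, with the random part
 * supplied by the caller as jitter. */
static ms_t election_delay_ms(int rank, ms_t jitter) {
    assert(rank >= 0 && jitter >= 0 && jitter <= 500);
    return 500 + jitter + (ms_t)rank * 1000;
}
```

Since the rank term is 1000 ms per position and the random part never exceeds 500 ms, a slave with rank 0 always starts before a slave with rank 1: its worst case is 1000 ms, while the best case for rank 1 is 1500 ms.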

Once failover_auth_time is set, the election starts the next time clusterCron() runs and finds that the system time has reached that preset time (and failover_auth_sent is 0).

The slave election process

To start the election, the slave increments currentEpoch by 1 and sets failover_auth_sent to 1 to indicate that the election has started. It then sends a FAILOVER_AUTH_REQUEST message to request votes from all master nodes in the cluster, and waits up to 2 × NODE_TIMEOUT (at least 2 seconds) for their replies.

The other master nodes in the cluster decide the outcome of the slave election, and they check each request strictly before voting. To prevent more than one slave from winning and to keep the election legitimate, a master verifies the following conditions after receiving a FAILOVER_AUTH_REQUEST message:

  • The master of the slave requesting the vote must be in the FAIL state (the fixed 500 ms in DELAY above is there to let the FAIL message propagate through the cluster);
  • A master votes only once for a given currentEpoch and records it in lastVoteEpoch; requests for older epochs are rejected. Likewise, if the currentEpoch in the slave's request is smaller than the master's own currentEpoch, the request is rejected;
  • Once a master has voted for a slave with a FAILOVER_AUTH_ACK message, it will not vote for any other slave of the same master for 2 × NODE_TIMEOUT.
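The three checks can be combined into one voting function. This is a sketch of the rules as listed, not the Redis implementation: voter_t, failed_master_t and grant_vote are hypothetical names, and using 0 for “never voted” in voted_time is an assumption of the example.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef long long ms_t;

/* Hypothetical state of the master that is asked to vote. */
typedef struct {
    uint64_t current_epoch;    /* its view of the cluster current epoch */
    uint64_t last_vote_epoch;  /* epoch in which it last voted */
} voter_t;

/* Hypothetical view the voter keeps of the requester's master. */
typedef struct {
    int is_failed;    /* flagged FAIL from the voter's point of view */
    ms_t voted_time;  /* last time a vote was given to one of its slaves */
} failed_master_t;

/* Return 1 and record the vote if the request passes all checks. */
static int grant_vote(voter_t *v, failed_master_t *m,
                      uint64_t request_epoch, ms_t now, ms_t node_timeout) {
    assert(v != NULL && m != NULL);
    if (!m->is_failed) return 0;                         /* master not FAIL */
    if (request_epoch < v->current_epoch) return 0;      /* stale request */
    if (v->last_vote_epoch == v->current_epoch) return 0; /* already voted */
    if (now - m->voted_time < node_timeout * 2) return 0; /* voted recently */
    v->last_vote_epoch = v->current_epoch;
    m->voted_time = now;
    return 1;
}
```

Voting at most once per epoch means that, within a single epoch, at most one slave can collect a majority. The 2 × NODE_TIMEOUT pause for slaves of the same master gives the winner time to announce itself before a competing election can succeed.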

After voting, the master records the following information and persists it safely to the configuration file:

  • The cluster current epoch of its last vote, in lastVoteEpoch;
  • The time of the vote, in the voted_time field of the corresponding node in the cluster node list.

To avoid counting votes from an earlier election as votes in the current one, the slave checks the currentEpoch declared in each FAILOVER_AUTH_ACK message and discards the vote if it is smaller than the slave's own cluster current epoch. If the vote is valid, the slave increments server.cluster->failover_auth_count.

A slave wins the election once a majority of master nodes have voted for it. If it does not obtain a majority within 2 × NODE_TIMEOUT (at least 2 seconds), the election is aborted, and a new election can start after 4 × NODE_TIMEOUT (at least 4 seconds).
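The two time limits follow directly from NODE_TIMEOUT. A minimal sketch with hypothetical helper names, using milliseconds throughout:

```c
#include <assert.h>

typedef long long ms_t;

/* Time a slave waits for votes: 2 x NODE_TIMEOUT, at least 2 seconds. */
static ms_t election_timeout_ms(ms_t node_timeout_ms) {
    assert(node_timeout_ms > 0);
    ms_t t = node_timeout_ms * 2;
    return t < 2000 ? 2000 : t;
}

/* Time before a new election may start: twice the election timeout,
 * that is 4 x NODE_TIMEOUT and at least 4 seconds. */
static ms_t election_retry_ms(ms_t node_timeout_ms) {
    return election_timeout_ms(node_timeout_ms) * 2;
}

/* Return 1 if an election started at started_at has run out of time. */
static int election_expired(ms_t now, ms_t started_at, ms_t node_timeout_ms) {
    return now - started_at > election_timeout_ms(node_timeout_ms);
}
```

With a node timeout of 15000 ms (an example value) a slave waits up to 30 seconds for votes and may retry after 60 seconds. With a very small timeout such as 500 ms, the lower bounds of 2 and 4 seconds apply instead.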

Slave node promotion

Each time a slave receives a valid vote it increments the vote counter failover_auth_count. The election and promotion handler clusterHandleSlaveFailover checks the counter periodically, and once the slave has been approved by a majority of master nodes (the quorum), its promotion is triggered.

The quorum here is the same as the one used for failure detection, namely (server.cluster->size / 2) + 1, that is, more than half of the master nodes.

When its promotion starts, the slave makes a series of changes to its own state and finally turns itself into a master. The steps are:

  • Raise its configuration epoch configEpoch to the epoch of the election it has just won;
  • Switch its own role to master and reset its replication state (similar to promoting a slave in Sentinel mode);
  • Take over the hash slots that the old master was responsible for;
  • Save and persist these configuration changes.

After turning itself into a master, the node broadcasts its new state to all nodes in the cluster with PONG messages, so that the other nodes can update their state promptly and accept the promotion.

Once a slave has won the election, the promotion itself is relatively simple. What matters more is that all other nodes in the cluster recognize it. Depending on the role of the other node, three cases need to be considered:

  • Other nodes: nodes that had no master-slave relationship with the promoted slave before the promotion and did not share its master;
  • Sibling slaves: slaves that had the same master as the promoted slave before the promotion;
  • The old master: how it rejoins the cluster after recovering from the failure.

General processing logic

After its promotion, the new master sends PONG messages to all reachable nodes in the cluster. When another node receives the PONG, besides the general processing (such as raising the configuration epoch), it detects the node's role change (from slave to master) and updates its local clusterState accordingly. The updates are:

  • Update the configuration epoch and the cluster current epoch, because both were raised during the slave's promotion;
  • Remove the node from its old master's list of slaves;
  • Update the node's role: flag it as a master and clear its slave flag;
  • Handle the hash slot conflict: the new master takes over all hash slots of the old master, so the slots mapped to the old master are remapped to the new master.

Sibling slaves switch their replication target

The processing above is common to “other nodes” and “sibling slaves”; the old master is still disconnected and cannot be notified. Based on the PONG message, “other nodes” have already accepted the new master's role change. A “sibling slave”, however, still regards the old master as its master, whereas after a failover it should replicate from the new master. How is this achieved?

While handling the hash slot conflict, a “sibling slave” finds that the conflicting hash slots used to belong to its own master. On detecting this change, it adopts the new master as its master and starts replicating from it.

The old master rejoins the cluster

When the old master recovers, it resumes heartbeats with the other nodes using the configuration it had before the outage (cluster current epoch, configuration epoch, hash slots, and so on).

When a node in the cluster receives its PING message and finds that its configuration is stale (its configuration epoch is out of date) and that its hash slot assignment conflicts, the node sends it an UPDATE message telling it to update its configuration.

The UPDATE message contains information about the node that is now responsible for the conflicting hash slots. On receiving it, the old master realizes that its configuration epoch is out of date. It therefore takes the node named in the UPDATE message as its master, switches its own role to slave, and updates its local hash slot mapping.

In subsequent heartbeats, the other nodes update their view of the old master to a slave of the new master.

At this point, failover is complete.

Other topics related to fault tolerance

Slave node migration

To improve the availability of the cluster, Redis Cluster implements a slave migration mechanism. When a cluster is built, each master has a number of slaves. If several independent node failures occur during operation and leave a master without any working slave (an orphaned master), the cluster can no longer work once that master goes down. Redis Cluster detects the orphaned master and the master with the largest number of slaves in time, and migrates a suitable slave from the latter to the orphaned master, so that it can survive another failure. This improves the availability of the whole system.

The following figure shows an example. In the initial state the cluster has seven nodes, of which A, B, and C are masters. A has two slaves, A1 and A2; B and C have one slave each, B1 and C1. Slave migration when a node fails works as follows:

  • While the cluster is running, node B fails and B1 is elected and promoted to the new master. B1 is now an orphaned master without any slave; if B1 fails as well, the cluster becomes unavailable.
  • However, A has two slaves at this point, so Redis Cluster starts slave migration and turns A1 into a slave of B1. B1 is no longer orphaned.
  • Even if B1 fails later, A1 can be promoted to the new master and the cluster keeps working.
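The choice of donor and orphan in this example can be modelled with a few lines of C. This is only a sketch of the idea: master_info_t and the three functions are hypothetical, the model tracks slave counts rather than individual slaves, and the rule that a donor must keep at least one slave after giving one away is an assumption made for the example.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical summary of one master. */
typedef struct {
    const char *name;
    int slaves;        /* number of working slaves */
} master_info_t;

/* Index of the first master with no slaves, or -1 if there is none. */
static int find_orphan(const master_info_t *m, size_t n) {
    for (size_t i = 0; i < n; i++)
        if (m[i].slaves == 0) return (int)i;
    return -1;
}

/* Index of the master with the most slaves, provided it has at least two
 * (assumption: donating must not orphan the donor), or -1. */
static int find_donor(const master_info_t *m, size_t n) {
    int best = -1;
    for (size_t i = 0; i < n; i++) {
        if (m[i].slaves < 2) continue;
        if (best < 0 || m[i].slaves > m[best].slaves) best = (int)i;
    }
    return best;
}

/* Move one slave from the donor to the orphan. Returns 1 if a slave moved. */
static int migrate_one_slave(master_info_t *m, size_t n) {
    assert(m != NULL);
    int orphan = find_orphan(m, n);
    int donor = find_donor(m, n);
    if (orphan < 0 || donor < 0) return 0;
    m[donor].slaves--;
    m[orphan].slaves++;
    return 1;
}
```

Applied to the state after B fails (A with two slaves, B1 with none, C with one), the sketch picks A as the donor and B1 as the orphan. After one migration every master has exactly one slave and no further migration takes place.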

Cluster split-brain

Like any distributed system, Redis Cluster has to deal with the complex problems caused by network partitions. A network partition can split the cluster nodes into two groups, causing a “split brain”. How do slave election and promotion work in each partition? As shown in the figure above, node A and its slave A1 are cut off from the other nodes by a network partition. Let us see how the nodes in each partition behave.

  • Majority partition

Nodes in this partition detect the PFAIL state of node A and then, through propagation, confirm that node A has reached the FAIL state. Slave A2 starts an election, wins it, and is promoted to the new master. After this failover, the partition that contains the majority of nodes continues to work.

  • Minority partition

Nodes A and A1, in the minority partition, detect the PFAIL state of the other masters B and C. But because this cannot be confirmed by a majority of master nodes, B and C never reach the FAIL state, so no failover can take place there.
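Which side of a partition can still mark nodes as FAIL, and therefore fail over, depends only on where the masters ended up. A small sketch, assuming exactly two partitions numbered 0 and 1; majority_side is a hypothetical helper:

```c
#include <assert.h>
#include <stddef.h>

/* side[i] is the partition (0 or 1) that master i ended up in.
 * Returns the partition that holds more than half of the masters,
 * or -1 if neither side does. */
static int majority_side(const int *side, size_t masters) {
    assert(masters > 0);
    size_t in_side1 = 0;
    for (size_t i = 0; i < masters; i++) {
        assert(side[i] == 0 || side[i] == 1);
        in_side1 += (size_t)side[i];
    }
    size_t quorum = masters / 2 + 1;
    if (in_side1 >= quorum) return 1;
    if (masters - in_side1 >= quorum) return 0;
    return -1;
}
```

In the example, master A is alone on one side while B and C are on the other, so only the side with B and C reaches the quorum of 2. With an even number of masters split exactly in half, neither side has a majority.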

Redis Cluster summary

This article has introduced how Redis Cluster works in three main parts: cluster structure, data sharding, and fault tolerance. It covers almost everything about Redis Cluster, and I hope it helps everyone who is learning it.

While studying the official documentation and the source code I ran into plenty of puzzling points. By going through the code paths again and again, I resolved them one by one and finally built up a complete picture.

That brings us to the end. This article took a month from start to finish; I wrote a little every day.

References

  • Redis Cluster Specification: redis.io/topics/clus…
  • Tutorial on Redis Cluster: redis.io/topics/clus…
  • Redis source code, version 6.0.10