RocketMQ consumer load policy

Before a Consumer can pull messages, it must first be assigned (load-balanced onto) the MessageQueues of the Topic it subscribes to. This rebalancing is performed by a timer, which runs every 20 seconds by default.

Put simply, the MessageQueues under a Topic are divided among the consumers, and how they are divided is decided by the algorithm that each load policy defines.

AllocateMessageQueueAveragely

The averaging strategy is the one RocketMQ uses by default. If a Consumer cluster subscribes to a Topic, the MessageQueues under that Topic are distributed evenly among the consumers in the cluster. To help you understand it, I've drawn a diagram.

Suppose the topic is testMsg and it has four MessageQueues. The consumers form a cluster: they all subscribe to the topic with the same consumer groupId. First, both the Consumers and the MessageQueues are sorted. The sort order decides who is "the boss" and gets to pick MessageQueues first. Average distribution then falls into two cases.

The number of MessageQueues is greater than the number of Consumers

If the number of queues is an integer multiple of the number of consumers, as with 2 consumers and 4 consumers in the figure, first work out how many queues each Consumer gets. Take 2 consumers as an example: C1 and C2 each get 2 MessageQueues. C1 comes before C2, so C1 takes Q0 and Q1, and C2 takes the next two, Q2 and Q3.

If the number of queues is not an integer multiple of the number of consumers, as with 3 consumers and 5 consumers in the figure (5 consumers is a special case, covered below). Take 3 consumers and 4 message queues. Each Consumer gets one queue, and one queue is left over. By the law of the jungle, the leftover goes to the big brother C1, so C1 gets two queues while C2 and C3 get one each. Once the counts are settled, the queues are handed out in order: C1 gets Q0 and Q1, C2 starts from Q2, and C3 takes the remaining Q3.

The number of MessageQueues is less than the number of Consumers

On average there is less than one queue per Consumer, which is the five-consumer case in the figure above. As usual, each consumer takes one queue in order. C5 is at the back of the line, so every queue has already been taken by the time its turn comes, and C5 gets no MessageQueue at all. It only gets one if one of the earlier consumers goes down, and then only when queues are reassigned at the next rebalance (the timer runs every 20 seconds by default).

Specific algorithm:

List<MessageQueue> result = new ArrayList<MessageQueue>();
// Position of the current consumer in the sorted consumer list
int index = cidAll.indexOf(currentCID);
// Remainder: queues left over after an even split
int mod = mqAll.size() % cidAll.size();
// If there are no more queues than consumers, each consumer gets (at most) 1 queue;
// otherwise consumers whose index is below the remainder get one extra queue,
// and the rest get the plain average
int averageSize =
  mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                                       + 1 : mqAll.size() / cidAll.size());
// Index of the first MessageQueue this consumer gets
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
// For a consumer that gets nothing (more consumers than queues), range is negative
// and the loop does not execute
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
  result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
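To check the cases above, here is a self-contained sketch that reuses the same arithmetic. It uses plain strings instead of MessageQueue objects and client IDs. The class name AverageAllocationDemo and the "Q0"/"C1" string stand-ins are hypothetical and only for illustration. The program prints how four queues are split among 2, 3 and 5 consumers.

```java
import java.util.ArrayList;
import java.util.List;

public class AverageAllocationDemo {

    // Same arithmetic as the averaging snippet above, on strings instead of
    // MessageQueue objects and consumer client IDs (illustrative stand-ins).
    static List<String> allocate(List<String> mqAll, List<String> cidAll, String currentCID) {
        List<String> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCID);
        if (index < 0) {
            return result; // consumer is not part of the group
        }
        int mod = mqAll.size() % cidAll.size();
        int averageSize = mqAll.size() <= cidAll.size()
                ? 1
                : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1
                                          : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> queues = List.of("Q0", "Q1", "Q2", "Q3");
        for (int n : new int[] {2, 3, 5}) {
            List<String> consumers = new ArrayList<>();
            for (int c = 1; c <= n; c++) {
                consumers.add("C" + c);
            }
            System.out.println(n + " consumers:");
            for (String cid : consumers) {
                System.out.println("  " + cid + " -> " + allocate(queues, consumers, cid));
            }
        }
    }
}
```

With 2 consumers, C1 gets [Q0, Q1] and C2 gets [Q2, Q3]. With 3 consumers, C1 gets [Q0, Q1], C2 gets [Q2] and C3 gets [Q3]. With 5 consumers, C1 to C4 get one queue each and C5 gets an empty list, which matches the walkthrough.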

AllocateMessageQueueAveragelyByCircle

Circular averaging differs from plain averaging only in which queues a consumer gets. With plain averaging, each consumer takes its whole share as one contiguous block of MessageQueues. With circular averaging, the consumers take turns picking one MessageQueue at a time, so a consumer's queues are not contiguous. Again, I drew a picture to help you understand it.

Each Consumer receives the same number of MessageQueues as under plain averaging; only which queues it gets is different. Let's take the three-consumer case as an example.

Consumers and MessageQueues are sorted here as well. With 4 queues and 3 consumers, C1 ends up with 2 MessageQueues, while C2 and C3 get only 1 each.

C1 takes one, C2 takes one, C3 takes one, and then C1 takes one more. That is what the three consumers in the diagram are doing.

If there are more consumers than message queues, the outcome is the same as with plain averaging: the consumers at the back of the line get nothing.

// Index of the current consumer in the sorted list
int index = cidAll.indexOf(currentCID);
// index is also the position of the first message queue this consumer gets
for (int i = index; i < mqAll.size(); i++) {
  // Modulo: take every cidAll.size()-th queue, starting at index
  if (i % cidAll.size() == index) {
    result.add(mqAll.get(i));
  }
}
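Here is a self-contained sketch of the circular rule, again using strings as stand-ins for queues and client IDs. The class name CircleAllocationDemo is hypothetical. It runs the same modulo loop for the three-consumer case.

```java
import java.util.ArrayList;
import java.util.List;

public class CircleAllocationDemo {

    // Same loop as the snippet above: the consumer at position `index` takes every
    // queue i with i % consumerCount == index.
    static List<String> allocate(List<String> mqAll, List<String> cidAll, String currentCID) {
        List<String> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCID);
        if (index < 0) {
            return result; // consumer is not part of the group
        }
        for (int i = index; i < mqAll.size(); i++) {
            if (i % cidAll.size() == index) {
                result.add(mqAll.get(i));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> queues = List.of("Q0", "Q1", "Q2", "Q3");
        List<String> consumers = List.of("C1", "C2", "C3");
        for (String cid : consumers) {
            System.out.println(cid + " -> " + allocate(queues, consumers, cid));
        }
    }
}
```

The output is C1 -> [Q0, Q3], C2 -> [Q1] and C3 -> [Q2]. The counts match plain averaging, but C1's two queues are no longer adjacent.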

AllocateMessageQueueByConfig

User-defined configuration: when you create a Consumer, you can choose which load policy it uses. If we choose AllocateMessageQueueByConfig, we can specify exactly which MessageQueues the consumer should listen to. The strategy keeps a List called messageQueueList, and we put the MessageQueues we want into it.

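import java.util.Arrays;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueByConfig;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroup");
consumer.setNamesrvAddr("127.0.0.1:9876");
// Subscribe to the topic (all tags)
consumer.subscribe("testMsg", "*");
// Register a message listener
consumer.registerMessageListener(new MessageListenerConcurrently() {
  @Override
  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    // do job
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  }
});
// User-defined queue allocation strategy
AllocateMessageQueueByConfig allocateMessageQueueByConfig = new AllocateMessageQueueByConfig();
// Specify the MessageQueues to consume: queue 0 of testMsg on broker-a
allocateMessageQueueByConfig.setMessageQueueList(Arrays.asList(new MessageQueue("testMsg", "broker-a", 0)));
// Set the load policy for the consumer
consumer.setAllocateMessageQueueStrategy(allocateMessageQueueByConfig);
// Start the consumer
consumer.start();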
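The strategy itself does very little. As far as I can tell from the RocketMQ client source, AllocateMessageQueueByConfig's allocate method just returns the configured messageQueueList and ignores both the full queue list and the consumer list. The sketch below models that behavior with strings. ConfigAllocationDemo and ByConfigSketch are hypothetical stand-ins, not the RocketMQ classes.

```java
import java.util.List;

public class ConfigAllocationDemo {

    // Hypothetical stand-in for AllocateMessageQueueByConfig: it returns the configured
    // list and ignores the full queue list and the consumer list.
    static final class ByConfigSketch {
        private List<String> messageQueueList = List.of();

        void setMessageQueueList(List<String> messageQueueList) {
            this.messageQueueList = messageQueueList;
        }

        List<String> allocate(List<String> mqAll, List<String> cidAll, String currentCID) {
            return messageQueueList;
        }
    }

    public static void main(String[] args) {
        ByConfigSketch strategy = new ByConfigSketch();
        strategy.setMessageQueueList(List.of("testMsg@broker-a#0"));
        List<String> mqAll = List.of("testMsg@broker-a#0", "testMsg@broker-a#1");
        List<String> cidAll = List.of("C1", "C2");
        // Both consumers get exactly the configured queue, whatever mqAll and cidAll contain
        System.out.println("C1 -> " + strategy.allocate(mqAll, cidAll, "C1"));
        System.out.println("C2 -> " + strategy.allocate(mqAll, cidAll, "C2"));
    }
}
```

Because the consumer list plays no part, you have to give each consumer in the group its own list. Two consumers configured with the same queue would both be assigned it, and a queue nobody lists is never consumed.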

AllocateMessageQueueByMachineRoom

Machine-room load policy: the current Consumer is only assigned MessageQueues located in the specified machine rooms. For this to work, brokerName must follow the required format: machineRoomName@brokerName.

Let’s take a look at the actual use

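import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueByMachineRoom;
import org.apache.rocketmq.common.message.MessageExt;

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroup");
consumer.setNamesrvAddr("127.0.0.1:9876");
// Subscribe to the topic (all tags)
consumer.subscribe("testMsg", "*");
// Register a message listener
consumer.registerMessageListener(new MessageListenerConcurrently() {
  @Override
  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    // do job
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  }
});
AllocateMessageQueueByMachineRoom allocateMachineRoom = new AllocateMessageQueueByMachineRoom();
// Specify machine room names machine_room1 and machine_room2
allocateMachineRoom.setConsumeridcs(new HashSet<>(Arrays.asList("machine_room1", "machine_room2")));
// Set the load policy for the consumer
consumer.setAllocateMessageQueueStrategy(allocateMachineRoom);
// Start the consumer
consumer.start();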

Let’s take a look at the source code

List<MessageQueue> result = new ArrayList<MessageQueue>();
// The index of the current consumer
int currentIndex = cidAll.indexOf(currentCID);
if (currentIndex < 0) {
  return result;
}
// Queues that satisfy the machine-room condition
List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
for (MessageQueue mq : mqAll) {
  // brokerName naming rule: machine_room1@broker-a
  String[] temp = mq.getBrokerName().split("@");
  // Check whether the queue is in one of the specified machine rooms
  if (temp.length == 2 && consumeridcs.contains(temp[0])) {
    premqAll.add(mq);
  }
}
// Number of queues each consumer gets
int mod = premqAll.size() / cidAll.size();
// Remainder
int rem = premqAll.size() % cidAll.size();
// Index of the first queue allocated to the current consumer
int startIndex = mod * currentIndex;
// End index (exclusive) of the allocated block
int endIndex = startIndex + mod;
// Take the queues from startIndex to endIndex; this must index the filtered list premqAll,
// because indexing mqAll would return queues outside the selected machine rooms
for (int i = startIndex; i < endIndex; i++) {
  result.add(premqAll.get(i));
}
// When the number of queues is not a multiple of the number of consumers,
// each of the first `rem` consumers takes one leftover queue from the tail
if (rem > currentIndex) {
  result.add(premqAll.get(currentIndex + mod * cidAll.size()));
}
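Here is a self-contained sketch of the same filter-then-split logic. It assumes the roomName@brokerName convention described above. Each queue is modelled as the string brokerName plus a ":queueId" suffix, and the class name MachineRoomAllocationDemo is hypothetical. The example uses five room1 queues and one room2 queue, with two consumers that only accept room1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class MachineRoomAllocationDemo {

    // Queues are strings like "room1@broker-a:0" (brokerName + ":" + queueId, an illustrative encoding).
    static List<String> allocate(List<String> mqAll, List<String> cidAll, String currentCID,
                                 Set<String> consumeridcs) {
        List<String> result = new ArrayList<>();
        int currentIndex = cidAll.indexOf(currentCID);
        if (currentIndex < 0) {
            return result;
        }
        // 1. Keep only queues whose room prefix is one of the configured rooms
        List<String> premqAll = new ArrayList<>();
        for (String mq : mqAll) {
            String[] temp = mq.split("@");
            if (temp.length == 2 && consumeridcs.contains(temp[0])) {
                premqAll.add(mq);
            }
        }
        // 2. Each consumer takes a contiguous block of `mod` filtered queues...
        int mod = premqAll.size() / cidAll.size();
        int rem = premqAll.size() % cidAll.size();
        int startIndex = mod * currentIndex;
        for (int i = startIndex; i < startIndex + mod; i++) {
            result.add(premqAll.get(i));
        }
        // 3. ...and each of the first `rem` consumers also takes one leftover queue from the tail
        if (rem > currentIndex) {
            result.add(premqAll.get(currentIndex + mod * cidAll.size()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> mqAll = List.of("room1@broker-a:0", "room1@broker-a:1", "room1@broker-a:2",
                "room1@broker-a:3", "room1@broker-a:4", "room2@broker-b:0");
        List<String> consumers = List.of("C1", "C2");
        for (String cid : consumers) {
            System.out.println(cid + " -> " + allocate(mqAll, consumers, cid, Set.of("room1")));
        }
    }
}
```

This prints C1 -> [room1@broker-a:0, room1@broker-a:1, room1@broker-a:4] and C2 -> [room1@broker-a:2, room1@broker-a:3]. The room2 queue is dropped by the filter. Averaging the same five queues would give C1 the first three and C2 the last two, so the counts agree but the exact queues differ.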

To summarize the meaning of the source code:

1. First, select the current Topic's queues that are located in the specified machine rooms

2. Then distribute the selected queues among the consumers in much the same way as the averaging strategy (the algorithm is very similar)

Ha ha, that summary is really terse. Don't panic, here is a picture to help it go down.

In essence, this policy filters the MessageQueues and then allocates the remaining ones much like the averaging policy. Each consumer ends up with the same number of queues it would get from averaging. The exact queues can differ when the queue count is not a multiple of the consumer count. Averaging gives each consumer one contiguous block, while this policy first hands out equal contiguous blocks and then gives the leftover queues at the tail to the first consumers. I suspect the two strategies were written by two different people; otherwise it is hard to see why two different algorithms would be written for such similar results. Or maybe I'm just not good enough to follow the big guy's thinking.

AllocateMachineRoomNearby

My personal feeling is that this strategy is an improved version of AllocateMessageQueueByMachineRoom, because it handles more situations and is more flexible. In particular, it also covers machine rooms that have MessageQueues but no Consumer. Let's look at this strategy in detail. To use it, you need to define a class that tells which machine room each broker and each consumer is in. RocketMQ has a unit test for this strategy, and I modified it slightly to include such a class.

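import org.apache.rocketmq.client.consumer.rebalance.AllocateMachineRoomNearby;
import org.apache.rocketmq.common.message.MessageQueue;

public class MyMachineResolver implements AllocateMachineRoomNearby.MachineRoomResolver {

    /**
     * Determine which machine room the broker is in, e.g. "hz_aliyun_room1-broker-a" -> "hz_aliyun_room1".
     * @param messageQueue the queue whose broker is resolved
     * @return the machine room name
     */
    @Override
    public String brokerDeployIn(MessageQueue messageQueue) {
        return messageQueue.getBrokerName().split("-")[0];
    }

    /**
     * Determine which machine room the consumer is in.
     * @param clientID the consumer's client ID
     * @return the machine room name
     */
    @Override
    public String consumerDeployIn(String clientID) {
        return clientID.split("-")[0];
    }
}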

Name the brokers and consumer IDs to match, e.g. brokerName hz_aliyun_room1-broker-a and consumer clientId hz_aliyun_room1-client1. Now let's see how to use this same-machine-room allocation strategy in code.

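import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMachineRoomNearby;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.message.MessageExt;

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroup");
consumer.setNamesrvAddr("127.0.0.1:9876");
// Subscribe to the topic (all tags)
consumer.subscribe("testMsg", "*");
// Register a message listener
consumer.registerMessageListener(new MessageListenerConcurrently() {
  @Override
  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    // do job
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  }
});
// Same-machine-room strategy; the averaging strategy does the actual split within each group
consumer.setAllocateMessageQueueStrategy(new AllocateMachineRoomNearby(new AllocateMessageQueueAveragely(),
    new MyMachineResolver()));
// Start the consumer
consumer.start();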

Notice that when we create the same-machine-room strategy, we also pass in an averaging strategy. The nearby strategy is itself a strategy, so why does it need another one? Because it only groups MessageQueues and consumers by machine room. After grouping, the actual allocation is done by the strategy we pass in. Let's look at the source code; the explanation follows.

// Group message queues by machine room
Map<String/*machine room */, List<MessageQueue>> mr2Mq = new TreeMap<String, List<MessageQueue>>();
for (MessageQueue mq : mqAll) {
  // Call our own resolver to get the name of the broker's machine room
  String brokerMachineRoom = machineRoomResolver.brokerDeployIn(mq);
  // If the machine room is not empty, put the queue into its group
  if (StringUtils.isNoneEmpty(brokerMachineRoom)) {
    if (mr2Mq.get(brokerMachineRoom) == null) {
      mr2Mq.put(brokerMachineRoom, new ArrayList<MessageQueue>());
    }
    mr2Mq.get(brokerMachineRoom).add(mq);
  } else {
    throw new IllegalArgumentException("Machine room is null for mq " + mq);
  }
}
// Group consumers by machine room
Map<String/*machine room */, List<String/*clientId*/>> mr2c = new TreeMap<String, List<String>>();
for (String cid : cidAll) {
  // Call our own resolver to get the name of the consumer's machine room
  String consumerMachineRoom = machineRoomResolver.consumerDeployIn(cid);
  if (StringUtils.isNoneEmpty(consumerMachineRoom)) {
    if (mr2c.get(consumerMachineRoom) == null) {
      mr2c.put(consumerMachineRoom, new ArrayList<String>());
    }
    mr2c.get(consumerMachineRoom).add(cid);
  } else {
    throw new IllegalArgumentException("Machine room is null for consumer id " + cid);
  }
}
// All MessageQueues assigned to the current consumer
List<MessageQueue> allocateResults = new ArrayList<MessageQueue>();

// 1. Assign MessageQueues from the current consumer's own machine room
String currentMachineRoom = machineRoomResolver.consumerDeployIn(currentCID);
// Take (and remove) the MessageQueues of the current machine room
List<MessageQueue> mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom);
// Get the consumers of the current machine room
List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom);
if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) {
  // Split this room's MessageQueues among this room's consumers with the configured strategy
  allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineRoom, consumerInThisMachineRoom));
}

// 2. For machine rooms that have MessageQueues but no consumers,
//    split those MessageQueues among all consumers with the configured strategy
for (String machineRoom : mr2Mq.keySet()) {
  if (!mr2c.containsKey(machineRoom)) {
    // Add this consumer's share of the orphaned MessageQueues
    allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mr2Mq.get(machineRoom), cidAll));
  }
}

To summarize what the source code does:

1. Group the MessageQueues and the consumers by machine room

2. Take all consumers and MessageQueues in the machine room where the current Consumer resides

3. Apply the configured load policy to them to get the MessageQueues allocated to the current Consumer

4. If a machine room has MessageQueues but no consumers, those MessageQueues are allocated among all consumers in the cluster using the configured load policy

5. In the end, the MessageQueues allocated to a Consumer include those from its own machine room plus its share of the orphaned ones

Let me draw a picture here to illustrate.

First, the Consumers and MessageQueues within each machine room are balanced among themselves. Here that is done with the averaging policy, since that is what we passed in when creating the nearby strategy. Then the orphaned MessageQueues are divided among all consumers with the same configured policy.
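The two-step process above can be sketched in self-contained form. A few parts are assumptions for illustration only. The Strategy interface is a hypothetical simplification of RocketMQ's AllocateMessageQueueStrategy. To keep the sketch short, the delegate is the circular rule rather than averaging. Queues and consumers are strings whose room is the prefix before the first "-", as in MyMachineResolver. Room3 has queues but no consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

public class NearbyAllocationDemo {

    // Hypothetical simplification of AllocateMessageQueueStrategy (consumerGroup omitted).
    interface Strategy {
        List<String> allocate(String currentCID, List<String> mqAll, List<String> cidAll);
    }

    // Delegate used in this sketch: circular split (consumer i takes queues i, i + n, i + 2n, ...).
    static final Strategy CIRCLE = (currentCID, mqAll, cidAll) -> {
        List<String> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCID);
        if (index < 0) {
            return result;
        }
        for (int i = index; i < mqAll.size(); i += cidAll.size()) {
            result.add(mqAll.get(i));
        }
        return result;
    };

    // Same naming convention as MyMachineResolver: the room is the prefix before the first '-'.
    static final Function<String, String> ROOM_OF = name -> name.split("-")[0];

    static List<String> allocate(String currentCID, List<String> mqAll, List<String> cidAll,
                                 Function<String, String> roomOf, Strategy delegate) {
        // Group queues and consumers by room (empty-room checks from the source are omitted here)
        Map<String, List<String>> mr2Mq = new TreeMap<>();
        for (String mq : mqAll) {
            mr2Mq.computeIfAbsent(roomOf.apply(mq), k -> new ArrayList<>()).add(mq);
        }
        Map<String, List<String>> mr2c = new TreeMap<>();
        for (String cid : cidAll) {
            mr2c.computeIfAbsent(roomOf.apply(cid), k -> new ArrayList<>()).add(cid);
        }
        List<String> results = new ArrayList<>();
        // 1. Queues in my room are split among the consumers in my room
        String myRoom = roomOf.apply(currentCID);
        List<String> mqInMyRoom = mr2Mq.remove(myRoom);
        if (mqInMyRoom != null && !mqInMyRoom.isEmpty()) {
            results.addAll(delegate.allocate(currentCID, mqInMyRoom, mr2c.getOrDefault(myRoom, List.of())));
        }
        // 2. Queues in rooms without any consumer are split among all consumers
        for (Map.Entry<String, List<String>> e : mr2Mq.entrySet()) {
            if (!mr2c.containsKey(e.getKey())) {
                results.addAll(delegate.allocate(currentCID, e.getValue(), cidAll));
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> mqAll = List.of("room1-broker-a:0", "room1-broker-a:1", "room2-broker-b:0",
                "room2-broker-b:1", "room3-broker-c:0", "room3-broker-c:1");
        List<String> cidAll = List.of("room1-client1", "room1-client2", "room2-client3");
        for (String cid : cidAll) {
            System.out.println(cid + " -> " + allocate(cid, mqAll, cidAll, ROOM_OF, CIRCLE));
        }
    }
}
```

Running it, room1-client1 gets [room1-broker-a:0, room3-broker-c:0], room1-client2 gets [room1-broker-a:1, room3-broker-c:1], and room2-client3 gets both room2 queues. Each consumer keeps its own room's queues, and the consumer-less room3 is shared across the whole cluster.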

AllocateMessageQueueConsistentHash

Consistent hashing: I'll say a little about how consistent hashing works. It is hard to explain in words alone, so I'll show a picture first and explain it.

Consistent hashing maps everything onto a hash ring made up of the numbers 0 to 2^32-1. However long the input string is, hashing it yields a number, which lands on a point of the ring. The points on the ring are virtual. Here we hash each Consumer Id, and the resulting points are physical nodes, which are stored in a TreeMap. (The strategy actually places each consumer on the ring virtualNodeCnt times, so that queues are spread more evenly.) Then each MessageQueue is hashed the same way. Walking clockwise from its point, the nearest Consumer node is the Consumer that the MessageQueue ultimately belongs to.

For the details of the consistent hashing algorithm, see this article: segmentfault.com/a/119000002…

Let’s look at the source code:

// Turn all consumers into nodes and place them on the hash ring
Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();
for (String cid : cidAll) {
  cidNodes.add(new ClientNode(cid));
}

final ConsistentHashRouter<ClientNode> router;
// Build the hash ring
if (customHashFunction != null) {
  router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);
} else {
  // MD5 is used for hashing by default
  router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);
}

List<MessageQueue> results = new ArrayList<MessageQueue>();
for (MessageQueue mq : mqAll) {
  // Hash the MessageQueue and find the nearest consumer node clockwise
  ClientNode clientNode = router.routeNode(mq.toString());
  // Keep the queue if that node is the current consumer
  if (clientNode != null && currentCID.equals(clientNode.getKey())) {
    results.add(mq);
  }
}
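To make the ring concrete, here is a self-contained sketch built on a TreeMap. Several details are my own choices and not necessarily what RocketMQ's ConsistentHashRouter does: the hash is the first 4 bytes of the MD5 digest, which gives a value in 0 to 2^32-1 as described above; virtual nodes are named cid + "-" + v; and queues are plain strings. The class name ConsistentHashDemo is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ConsistentHashDemo {

    // Hash a key to a point on the ring 0 .. 2^32-1 using the first 4 bytes of its MD5 digest.
    static long hash(String key) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(key.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 4; i++) {
                h = (h << 8) | (digest[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    // Place every consumer on the ring `virtualNodes` times; the TreeMap keeps the points sorted.
    static TreeMap<Long, String> buildRing(List<String> cidAll, int virtualNodes) {
        TreeMap<Long, String> ring = new TreeMap<>();
        for (String cid : cidAll) {
            for (int v = 0; v < virtualNodes; v++) {
                ring.put(hash(cid + "-" + v), cid);
            }
        }
        return ring;
    }

    // Walk clockwise from the key's point to the first consumer node, wrapping around at the end.
    static String route(TreeMap<Long, String> ring, String key) {
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(key));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    static List<String> allocate(List<String> mqAll, List<String> cidAll, String currentCID, int virtualNodes) {
        List<String> result = new ArrayList<>();
        if (cidAll.isEmpty()) {
            return result;
        }
        TreeMap<Long, String> ring = buildRing(cidAll, virtualNodes);
        for (String mq : mqAll) {
            if (currentCID.equals(route(ring, mq))) {
                result.add(mq);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> queues = new ArrayList<>();
        for (int q = 0; q < 8; q++) {
            queues.add("testMsg@broker-a#" + q);
        }
        List<String> consumers = List.of("C1", "C2", "C3");
        for (String cid : consumers) {
            System.out.println(cid + " -> " + allocate(queues, consumers, cid, 10));
        }
    }
}
```

Each queue goes to exactly one consumer, but unlike averaging the counts need not be equal. The benefit is stability. When a consumer leaves, only the queues that hashed to its own points move. With averaging, one consumer leaving shifts the sorted index, and therefore the block, of every consumer after it.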