Preface: This article does not dig into the RocketMQ source code to analyze how load balancing is implemented. It focuses on what triggers load balancing.

As you probably already know, RocketMQ's design principle is that a Queue can be held by only one Consumer at a time, while one Consumer may consume multiple Queues at the same time. To keep the subscription and consumption model efficient, RocketMQ always tries to distribute queues as evenly as possible. A Consumer going online or offline, or a Queue being scaled up or down, can break that balance, so RocketMQ provides a complete Rebalance mechanism.
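The ownership rule above (each queue has exactly one owner, while one consumer may own several queues) can be written down as a simple check on an allocation map. This is a minimal sketch under our own assumptions: queues and consumer ids are plain strings, and `OwnershipInvariantSketch` and `eachQueueHasOneOwner` are hypothetical names, not part of RocketMQ.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class OwnershipInvariantSketch {
    // Hypothetical helper: true if every queue in allQueues is held by exactly one consumer.
    // A consumer may hold any number of queues.
    public static boolean eachQueueHasOneOwner(Map<String, List<String>> allocation, Set<String> allQueues) {
        Map<String, String> owner = new HashMap<>();
        for (Map.Entry<String, List<String>> e : allocation.entrySet()) {
            for (String queue : e.getValue()) {
                // putIfAbsent returns the existing owner if the queue was already claimed
                if (owner.putIfAbsent(queue, e.getKey()) != null) {
                    return false;
                }
            }
        }
        // Every queue must be assigned, and nothing outside allQueues may be assigned
        return owner.keySet().equals(allQueues);
    }

    public static void main(String[] args) {
        Set<String> queues = Set.of("q0", "q1", "q2");
        Map<String, List<String>> balanced = Map.of("c1", List.of("q0", "q1"), "c2", List.of("q2"));
        Map<String, List<String>> clash = Map.of("c1", List.of("q0", "q1"), "c2", List.of("q1", "q2"));
        System.out.println(eachQueueHasOneOwner(balanced, queues)); // true
        System.out.println(eachQueueHasOneOwner(clash, queues));    // false: q1 has two owners
    }
}
```

A rebalance is correct in this sense when every consumer's local result, put together, passes this check.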

Trigger conditions

Rebalance has three triggers, two active and one passive:
1. When the Consumer starts, its start() method actively executes the load-balancing logic.
2. A scheduled task triggers it periodically.
3. The Broker sends a notification telling clients that load balancing is required.

While reading the code today, I noticed that, coincidentally, all three triggers are more or less related to DefaultMQPushConsumerImpl.start().

DefaultMQPushConsumerImpl.start()

When a DefaultMQPushConsumerImpl instance is created, its field private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this); is initialized. At this point the rebalanceImpl object is not yet usable, because its key member properties are still null; start() fills them in. Here is an excerpt of the main start() code:

public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            /* Check the configuration */
            this.checkConfig();
            /* Build the Topic subscription information (SubscriptionData) and add it to RebalanceImpl's subscriptions */
            this.copySubscription();
            /* Initialize the MQClientInstance */
            this.mQClientFactory = MQClientManager.getInstance()
                .getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
            /*
             * Fill in the rebalanceImpl properties. The rebalanceImpl object created at
             * construction time is only half-initialized until this point.
             * RebalanceImpl is where load balancing is implemented.
             */
            this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

            /*
             * Register this consumer with MQClientInstance, then start MQClientInstance.
             * All consumers and producers in a JVM share the same MQClientInstance,
             * which is started only once.
             */
            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
            mQClientFactory.start();
            break;
        default:
            /* Other service states omitted */
            break;
    }

    /* The Consumer started successfully; immediately send a heartbeat to all Brokers */
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

    /*
     * Note that the Consumer triggers a load balance immediately after starting.
     * This is not a direct call into the load-balancing implementation; it wakes up
     * the related service thread. I will focus on this later.
     */
    this.mQClientFactory.rebalanceImmediately();
}

RebalanceImpl

As the name suggests, all operations related to Consumer load balancing are delegated to the RebalanceImpl object. Each Consumer object holds one RebalanceImpl instance, and each RebalanceImpl instance serves only one Consumer; the two hold references to each other (a circular reference). Let's look at the key member properties of this object:

/* Excerpt: fields of RebalanceImpl and RebalancePushImpl shown together */
public class RebalancePushImpl extends RebalanceImpl {
    protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<>(64);

    /* topic -> the set of MessageQueues under that topic */
    protected final ConcurrentMap<String, Set<MessageQueue>> topicSubscribeInfoTable = new ConcurrentHashMap<>();

    /* topic -> SubscriptionData */
    protected final ConcurrentMap<String, SubscriptionData> subscriptionInner = new ConcurrentHashMap<>();

    /* The allocation strategy: it alone decides how queues are distributed among consumers */
    protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;

    /* The Consumer instance this object serves */
    private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
}

The ProcessQueue map stands out because I left it uncommented. If you've read the RocketMQ source, you know ProcessQueue is an extremely important part of the Consumer's message consumption process; you can think of it as the carrier of messages on the client side. I won't go into it here because it has little to do with when load balancing is triggered. (To be clear: it is unrelated to load-balancing timing, but directly related to load balancing itself.)

The entry point is doRebalance(), but the load-balancing logic itself lives in rebalanceByTopic().

rebalanceByTopic()

rebalanceByTopic() is the final destination of load balancing: every load-balancing call in the system ends up here. I analyze only the implementation for the clustering consumption mode. Key code excerpt:

private void rebalanceByTopic(String topic, boolean isOrder) {
    /* Get all queues under this Topic */
    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
    /* Get the ids of all consumers in this ConsumerGroup that subscribe to this Topic */
    List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
    if (mqSet != null && cidAll != null) {
        List<MessageQueue> mqAll = new ArrayList<>(mqSet);

        /* These two sorts are critical */
        Collections.sort(mqAll);
        Collections.sort(cidAll);

        /* The load-balancing algorithm */
        AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
        List<MessageQueue> allocateResult;
        /* Call the concrete algorithm to allocate queues */
        allocateResult = strategy.allocate(
            this.consumerGroup,
            this.mQClientFactory.getClientId(),
            mqAll,
            cidAll);
        /* Applying allocateResult (updating processQueueTable) is omitted here */
    }
}

Two lines of code look understated but matter a lot: Collections.sort(mqAll); and Collections.sort(cidAll);. They guarantee that every Consumer in the ConsumerGroup sees the same queue order and the same Consumer Id order. With this consistent view of the allocation, and every consumer running the same allocation algorithm, consumers can split the queues evenly without exchanging any information and without interfering with each other. The details of the distribution live in AllocateMessageQueueStrategy; RocketMQ supports several allocation algorithms out of the box, and they are simple, so I won't go into them. With the load-balancing implementation covered, the questions that remain are who calls it, how, and when. Answering them brings us to the RebalanceService object.
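To see why the two sorts make coordination unnecessary, here is a minimal sketch of an averaging allocation, similar in spirit to `AllocateMessageQueueAveragely` (one of the strategies RocketMQ ships) but written from scratch here, so it is not guaranteed to match it line for line. Queues and consumer ids are plain strings, and `AverageAllocationSketch.allocate` is a hypothetical name. Each consumer calls it independently with its own id; because every consumer sorts the same inputs, the blocks they compute are disjoint and together cover every queue.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class AverageAllocationSketch {
    // Hypothetical helper: returns the contiguous block of mqAll that belongs to currentCid.
    // Both lists must already be sorted, so every consumer computes against the same view.
    public static List<String> allocate(String currentCid, List<String> mqAll, List<String> cidAll) {
        List<String> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCid);
        if (index < 0 || mqAll.isEmpty()) {
            return result; // unknown consumer or no queues: nothing to hold
        }
        int n = mqAll.size();
        int c = cidAll.size();
        int base = n / c;
        int mod = n % c;
        // The first `mod` consumers take one extra queue each
        int size = base + (index < mod ? 1 : 0);
        int start = index * base + Math.min(index, mod);
        for (int i = 0; i < size; i++) {
            result.add(mqAll.get(start + i));
        }
        return result;
    }

    public static void main(String[] args) {
        // Consumers may discover queues and ids in different orders...
        List<String> mqAll = new ArrayList<>(List.of("q3", "q0", "q4", "q1", "q2"));
        List<String> cidAll = new ArrayList<>(List.of("c-b", "c-a"));
        // ...so both lists are sorted first, as rebalanceByTopic() does
        Collections.sort(mqAll);
        Collections.sort(cidAll);
        for (String cid : cidAll) {
            // In reality each consumer runs this only for its own id, in its own process
            System.out.println(cid + " -> " + allocate(cid, mqAll, cidAll));
        }
    }
}
```

With five queues and two consumers this prints `c-a -> [q0, q1, q2]` and `c-b -> [q3, q4]`. The first `n % c` consumers each take one extra queue, so no two consumers differ by more than one queue; if there are more consumers than queues, the surplus consumers get nothing.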

RebalanceService

RocketMQ has a class of objects that stand apart: each one rules its own domain, and they tend to be constrained only by the operating system. The operating system seems to favor them too, assigning them time slices and scheduling them directly (the reason is simple: each one holds its own thread). This is the ServiceThread.

public abstract class ServiceThread implements Runnable {
    protected boolean isDaemon = false;
    protected volatile boolean stopped = false;
    private final AtomicBoolean started = new AtomicBoolean(false);

    /* The key point: each instance directly holds its own dedicated thread */
    private Thread thread;

    public abstract String getServiceName();

    /* A thread is created only when start() is executed */
    public void start() {
        /* Only the first call may start the thread */
        if (!started.compareAndSet(false, true)) {
            return;
        }
        stopped = false;
        this.thread = new Thread(this, getServiceName());
        /* Non-daemon by default (isDaemon = false) */
        this.thread.setDaemon(isDaemon);
        this.thread.start();
    }
}
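The start-once guard in `ServiceThread.start()` is a common pattern: `AtomicBoolean.compareAndSet(false, true)` succeeds for exactly one caller, so at most one thread is ever created per object. A minimal standalone sketch of the same idea follows; `StartOnceThreadSketch` is a hypothetical class, not RocketMQ's ServiceThread, and it takes its task as a `Runnable` instead of being subclassed.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class StartOnceThreadSketch {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final String serviceName;
    private final Runnable task;
    private volatile Thread thread;

    public StartOnceThreadSketch(String serviceName, Runnable task) {
        this.serviceName = serviceName;
        this.task = task;
    }

    // Returns true only for the one call that actually creates and starts the thread
    public boolean start() {
        // compareAndSet flips false -> true for exactly one caller, even under contention
        if (!started.compareAndSet(false, true)) {
            return false;
        }
        Thread t = new Thread(task, serviceName); // the object owns one dedicated, named thread
        t.setDaemon(false); // non-daemon, matching ServiceThread's default isDaemon = false
        thread = t;
        t.start();
        return true;
    }

    public Thread getThread() {
        return thread;
    }

    public static void main(String[] args) throws InterruptedException {
        StartOnceThreadSketch service = new StartOnceThreadSketch("rebalance-sketch",
            () -> System.out.println("running in " + Thread.currentThread().getName()));
        System.out.println(service.start()); // true
        System.out.println(service.start()); // false: the second call is ignored
        service.getThread().join();
    }
}
```

Because the thread is non-daemon, the JVM will not exit while it is running, which suits long-lived background services.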

The ServiceThread family is large. Besides RebalanceService, which specializes in load balancing, it has many siblings, for example:
FlushRealTimeService: asynchronous flush service thread
CommitRealTimeService: asynchronous commit service thread
GroupCommitService: synchronous flush service thread
and more. Each of these contributes to RocketMQ's steady performance; the three above are the flush-related service threads. (The topic is broad; if anyone is interested, I'll try to analyze it.)

RebalanceService holds a dedicated thread for load balancing, but that thread does not work nonstop:

public class RebalanceService extends ServiceThread {
    /* Load-balancing interval: 20s by default, configurable via the system property rocketmq.client.rebalance.waitInterval */
    private final static long waitInterval = Long.parseLong(
        System.getProperty("rocketmq.client.rebalance.waitInterval", "20000"));
    private final MQClientInstance mqClientFactory;

    public RebalanceService(MQClientInstance mqClientFactory) {
        this.mqClientFactory = mqClientFactory;
    }

    @Override
    public void run() {
        /* Keep looping as long as the thread has not been stopped */
        while (!this.isStopped()) {
            /* Rest for waitInterval (20s by default) unless woken up early */
            this.waitForRunning(waitInterval);
            /* Perform load balancing */
            this.mqClientFactory.doRebalance();
        }
    }
}

Scheduled task

Within one Java process, only a single RebalanceService instance is started, because by default only one MQClientInstance (mqClientFactory) is started and all Consumer instances in the process share it. Each call to mqClientFactory.doRebalance() therefore runs load balancing for every Consumer held in that JVM:
/**
 * Called by the RebalanceService thread, every 20 seconds by default.
 * ⚠️: by default only one MQClientInstance is started per Java process; every Consumer registers itself here (see #consumerTable)
 * ⚠️: a Java process has only one RebalanceService thread, but each Consumer instance holds its own RebalanceImpl object
 */
public void doRebalance() {
    for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
        MQConsumerInner impl = entry.getValue();
        if (impl != null) {
            try {
                impl.doRebalance();
            } catch (Throwable e) {
                log.error("doRebalance exception", e);
            }
        }
    }
}
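The fan-out above, one shared client instance iterating over a registry of consumers, can be sketched in isolation. This is a hypothetical model, not RocketMQ code: `ConsumerInner` stands in for `MQConsumerInner`, the registry is keyed by consumer group (as the `registerConsumer(getConsumerGroup(), this)` call in start() suggests), and registration uses `putIfAbsent` so a group can register only once. The try/catch inside the loop keeps one failing consumer from blocking the rest.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ClientInstanceSketch {
    // Hypothetical stand-in for MQConsumerInner
    public interface ConsumerInner {
        void doRebalance();
    }

    // Shared registry: consumer group -> consumer
    private final ConcurrentMap<String, ConsumerInner> consumerTable = new ConcurrentHashMap<>();

    // A group can be registered only once per client instance
    public boolean registerConsumer(String group, ConsumerInner consumer) {
        return consumerTable.putIfAbsent(group, consumer) == null;
    }

    // Called from the single rebalance thread; returns how many consumers rebalanced without error
    public int doRebalance() {
        int succeeded = 0;
        for (Map.Entry<String, ConsumerInner> entry : consumerTable.entrySet()) {
            try {
                entry.getValue().doRebalance();
                succeeded++;
            } catch (Throwable e) {
                // Isolate failures: one broken consumer must not stop the others
                System.err.println("doRebalance exception in group " + entry.getKey() + ": " + e);
            }
        }
        return succeeded;
    }

    public static void main(String[] args) {
        ClientInstanceSketch instance = new ClientInstanceSketch();
        instance.registerConsumer("group-a", () -> System.out.println("group-a rebalanced"));
        instance.registerConsumer("group-b", () -> { throw new IllegalStateException("boom"); });
        System.out.println(instance.registerConsumer("group-a", () -> { })); // false: already registered
        System.out.println(instance.doRebalance() + " consumer(s) rebalanced"); // 1 consumer(s) rebalanced
    }
}
```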

Active trigger

As I write this, I wonder why this scheduled task isn't simply handed to the JDK's ScheduledExecutorService; in fact, many scheduled tasks in RocketMQ are built the same way. I only just realized why: RebalanceService supports being woken up actively, so its task can run early. When a Consumer comes online, it triggers load balancing actively precisely by waking up the RebalanceService thread. start() calls rebalanceImmediately():

public void rebalanceImmediately() {
    this.rebalanceService.wakeup();
}
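The key property is a wait that can be cut short. Below is a minimal sketch of a loop that sleeps up to a fixed interval but returns early when `wakeup()` is called, which is what makes `rebalanceImmediately()` possible. It is our own simplified model built on a plain monitor and `TimeUnit.timedWait`; RocketMQ's `ServiceThread.waitForRunning` is implemented differently (with its own latch class), so treat `WakeableLoopSketch`, `rounds()` and `shutdown()` as hypothetical names. A wake-up that arrives before the thread starts waiting is not lost: the `notified` flag makes the next wait return immediately.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class WakeableLoopSketch implements Runnable {
    private final Object lock = new Object();
    private boolean notified = false;              // guarded by lock
    private volatile boolean stopped = false;
    private final long waitIntervalMillis;
    private final AtomicInteger rounds = new AtomicInteger();

    public WakeableLoopSketch(long waitIntervalMillis) {
        this.waitIntervalMillis = waitIntervalMillis;
    }

    // Plays the role of rebalanceImmediately(): cut the current wait short
    public void wakeup() {
        synchronized (lock) {
            notified = true;
            lock.notifyAll();
        }
    }

    public void shutdown() {
        stopped = true;
        wakeup();
    }

    public int rounds() {
        return rounds.get();
    }

    // Waits until the interval elapses or wakeup() is called, whichever comes first
    private void waitForRunning() throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(waitIntervalMillis);
        synchronized (lock) {
            while (!notified) { // the loop also absorbs spurious wake-ups
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    break;
                }
                TimeUnit.NANOSECONDS.timedWait(lock, remaining);
            }
            notified = false; // consume the signal so the next round waits again
        }
    }

    @Override
    public void run() {
        while (!stopped) {
            try {
                waitForRunning();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            if (stopped) {
                break;
            }
            rounds.incrementAndGet(); // stands in for mqClientFactory.doRebalance()
        }
    }

    public static void main(String[] args) throws InterruptedException {
        WakeableLoopSketch loop = new WakeableLoopSketch(20_000); // 20 s, like the default interval
        Thread thread = new Thread(loop, "rebalance-sketch");
        thread.start();
        long begin = System.nanoTime();
        loop.wakeup(); // no need to wait 20 s for the first round
        while (loop.rounds() == 0) {
            Thread.sleep(1);
        }
        System.out.printf("first round after %d ms%n", (System.nanoTime() - begin) / 1_000_000);
        loop.shutdown();
        thread.join();
    }
}
```

In `main`, the first round runs right after `wakeup()` rather than after the 20 s interval, and `shutdown()` sets the stop flag and wakes the thread so it exits without waiting out the interval. A plain ScheduledExecutorService has no equivalent of "run the next periodic execution now", which is the design reason given above.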

Broker notification

Having the Broker notify Consumers that load balancing is needed is more involved, but it only adds a few RPC calls and network transfers.

Whenever a DefaultMQPushConsumerImpl instance calls start(), it always sends a heartbeat to the Broker afterward. The call stack is as follows:
DefaultMQPushConsumerImpl.start()
  -> MQClientInstance.sendHeartbeatToAllBrokerWithLock()
  -> MQClientInstance.sendHeartbeatToAllBroker()
  -> MQClientAPIImpl.sendHearbeat()

Immediately after the Consumer starts, it sends a heartbeat packet to inform the Broker.

public int sendHearbeat(
    String addr, HeartbeatData heartbeatData, long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
    /* Yet another RPC remote call: build a HEART_BEAT request */
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
    request.setLanguage(clientConfig.getLanguage());
    request.setBody(heartbeatData.encode());
    RemotingCommand response = this.remotingClient.invokeSync(
        addr, request, timeoutMillis
    );
    if (response.getCode() == ResponseCode.SUCCESS) {
        return response.getVersion();
    }
    throw new MQBrokerException(response.getCode(), response.getRemark());
}

From RequestCode.HEART_BEAT we can tell that the RPC processor is ClientManageProcessor, and the call chain is ClientManageProcessor.heartBeat() -> ConsumerManager.registerConsumer().

Here is the key code:

public boolean registerConsumer(String group, ClientChannelInfo clientChannelInfo,
    ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
    Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable
) {
    /* Look up this group's ConsumerGroupInfo (creation of a missing entry omitted) */
    ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);

    /* A new consumer brings a new SocketChannel; r1 is true when the channel is new */
    boolean r1 = consumerGroupInfo.updateChannel(
        clientChannelInfo, consumeType, messageModel, consumeFromWhere
    );

    /* Determine whether the subscription information has changed */
    boolean r2 = consumerGroupInfo.updateSubscription(subList);

    if (r1 || r2) {
        if (isNotifyConsumerIdsChangedEnable) {
            /* Trigger a ConsumerGroupEvent.CHANGE event */
            this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
        }
    }
    return r1 || r2;
}

The listener here is DefaultConsumerIdsChangeListener. Its handle() method shows that a CHANGE event results in a call to Broker2Client.notifyConsumerIdsChanged() for each channel in the group:

public void notifyConsumerIdsChanged(Channel channel, String consumerGroup) {
    /* Build the RPC request header */
    NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
    requestHeader.setConsumerGroup(consumerGroup);
    /* Build the RPC request object */
    RemotingCommand request = RemotingCommand.createRequestCommand(
        RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader
    );

    try {
        /* One-way RPC (fire-and-forget) with a 10 ms timeout */
        this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
    } catch (Exception e) {
        log.error("notifyConsumerIdsChanged exception, group={}", consumerGroup, e);
    }
}

On the client side, this RPC request is ultimately handled by ClientRemotingProcessor.notifyConsumerIdsChanged():

public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    NotifyConsumerIdsChangedRequestHeader requestHeader = (NotifyConsumerIdsChangedRequestHeader) request
        .decodeCommandCustomHeader(NotifyConsumerIdsChangedRequestHeader.class);
    /* Trigger load balancing: the same rebalanceImmediately() that start() calls */
    this.mqClientFactory.rebalanceImmediately();
    return null;
}
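To tie the Broker-notification path together, here is a minimal single-process model of it: a heartbeat either adds a new channel (r1) or changes the subscription (r2), and if either is true every channel in the group is notified, and each notified client then calls `rebalanceImmediately()`. Everything here is hypothetical and simplified: channels are plain client ids mapped to `Runnable` callbacks, subscriptions are topic-name sets, and there is no network, so `ConsumerGroupNotifySketch` is not RocketMQ's `ConsumerManager`.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class ConsumerGroupNotifySketch {
    // clientId -> callback standing in for "send NOTIFY_CONSUMER_IDS_CHANGED over this channel"
    private final Map<String, Runnable> channels = new LinkedHashMap<>();
    private Set<String> topics = new HashSet<>();

    // r1: true when this client id has not been seen before (a new channel)
    private boolean updateChannel(String clientId, Runnable onNotify) {
        return channels.putIfAbsent(clientId, onNotify) == null;
    }

    // r2: true when the subscribed topic set differs from the stored one
    private boolean updateSubscription(Set<String> subTopics) {
        if (topics.equals(subTopics)) {
            return false;
        }
        topics = new HashSet<>(subTopics);
        return true;
    }

    // Called on every heartbeat; returns true when the group was notified
    public boolean registerConsumer(String clientId, Set<String> subTopics,
                                    Runnable onNotify, boolean notifyEnabled) {
        // Evaluate both updates first, so a new channel never skips the subscription update
        boolean r1 = updateChannel(clientId, onNotify);
        boolean r2 = updateSubscription(subTopics);
        if ((r1 || r2) && notifyEnabled) {
            // CHANGE event: notify every channel in the group, not just the new one
            channels.values().forEach(Runnable::run);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        ConsumerGroupNotifySketch group = new ConsumerGroupNotifySketch();
        Set<String> subs = Set.of("TopicTest");
        // Each callback stands in for that client calling rebalanceImmediately()
        group.registerConsumer("c1", subs, () -> System.out.println("c1 rebalanceImmediately"), true);
        group.registerConsumer("c1", subs, () -> { }, true); // repeated heartbeat: nothing changed
        group.registerConsumer("c2", subs, () -> System.out.println("c2 rebalanceImmediately"), true);
    }
}
```

Running `main` prints `c1 rebalanceImmediately` once for the first heartbeat, nothing for the repeated one, and then both `c1 rebalanceImmediately` and `c2 rebalanceImmediately` when c2 joins: a new member makes every existing member rebalance as well, since the queue split changes for all of them.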