sequence

In this paper, we study the rocketmq AllocateMessageQueueAveragely

AllocateMessageQueueStrategy

Rocketmq – the client – 4.5.2 – sources jar! /org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java

public interface AllocateMessageQueueStrategy {

    /**
     * Allocating by consumer id
     *
     * @param consumerGroup current consumer group
     * @param currentCID current consumer id
     * @param mqAll message queue set in current topic
     * @param cidAll consumer set in current consumer group
     * @return The allocate result of given strategy
     */
    List<MessageQueue> allocate(
        final String consumerGroup,
        final String currentCID,
        final List<MessageQueue> mqAll,
        final List<String> cidAll
    );

    /**
     * Algorithm name
     *
     * @return The strategy name
     */
    String getName();
}
Copy the code
  • Defines the allocate AllocateMessageQueueStrategy, getName method

AllocateMessageQueueAveragely

Rocketmq – the client – 4.5.2 – sources jar! /org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java

public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
    private final InternalLogger log = ClientLogger.getLog();

    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        if (currentCID == null || currentCID.length() < 1) {
            throw new IllegalArgumentException("currentCID is empty");
        }
        if (mqAll == null || mqAll.isEmpty()) {
            throw new IllegalArgumentException("mqAll is null or mqAll empty");
        }
        if (cidAll == null || cidAll.isEmpty()) {
            throw new IllegalArgumentException("cidAll is null or cidAll empty");
        }

        List<MessageQueue> result = new ArrayList<MessageQueue>();
        if(! cidAll.contains(currentCID)) { log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                consumerGroup,
                currentCID,
                cidAll);
            return result;
        }

        int index = cidAll.indexOf(currentCID);
        int mod = mqAll.size() % cidAll.size();
        int averageSize =
            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }

    @Override
    public String getName() {
        return "AVG"; }}Copy the code
  • AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy interface, its return getName AVG, its the allocate method first calculates the index (cidAll.indexOf(currentCID)), and then compute mod(mqAll.size() % cidAll.size())
  • Then calculate averageSize, if mqall.size () is less than or equal to cidall.size () then set to 1, otherwise set to cidall.size ()mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size()
  • Finally calculate startIndex((mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod) and range (Math.min(averageSize, mqAll.size() - startIndex)); Then go through the loop by range, subscript(startIndex + i) % mqAll.size()Add the message to result

summary

AllocateMessageQueueAveragely AllocateMessageQueueStrategy interface is achieved, the getName return the AVG, the method of the allocate cycle according to the range, Add the message subscript (startIndex + I) % mqall.size () to result

doc

  • AllocateMessageQueueAveragely