Preface

Namesrv is the simplest part of RocketMQ, so we start with it: when gnawing on a bone, pick the softest piece first. The source code quoted here is from version 4.8.0. The author's knowledge is limited, so criticism and corrections are welcome.

Terminology

NameSrv (Name Server)

NameSrv acts as the provider of message routing information. A producer or consumer can query NameSrv to find the list of Broker IP addresses that correspond to a given topic. Multiple Namesrv instances form a cluster, but the instances are independent of each other and do not exchange information.

(Source: the RocketMQ official website)

Guiding questions

From the introduction to Namesrv on the official website quoted above, we can raise the following three questions.

  • How does Namesrv learn the state of the Brokers that producers and consumers depend on?
  • How do producers and consumers use Namesrv to obtain routing information?
  • How can multiple Namesrv instances cluster independently of each other without information exchange to ensure high availability?

Looking for answers in the source code

Let’s take a look at how RocketMQ is implemented from a source point of view. When we open the Namesrv source package, we find that Namesrv doesn’t have a lot of code.

(1) Let's start with an overview of the classes, listed below.

  • org.apache.rocketmq.namesrv.kvconfig.KVConfigManager: provides methods for loading Namesrv configurations into memory and modifying configurations.

  • org.apache.rocketmq.namesrv.kvconfig.KVConfigSerializeWrapper: provides serialization for KVConfigManager. Its serialization and deserialization methods are inherited from org.apache.rocketmq.remoting.protocol.RemotingSerializable.

  • org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor: Used to handle requests from consumers or producers (clients) and brokers.

  • org.apache.rocketmq.namesrv.routeinfo.BrokerHousekeepingService: Listens for Broker status.

  • org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager: Stores the message routing information of the RocketMQ cluster.

  • org.apache.rocketmq.namesrv.NamesrvController: The main controller of Namesrv.

  • org.apache.rocketmq.namesrv.NamesrvStartup: Namesrv startup entry.

(2) Next, we analyze the source code starting from the Namesrv startup process (some non-critical code, such as log printing and exception handling, is omitted).

// org.apache.rocketmq.namesrv.NamesrvStartup#main0

public static NamesrvController main0(String[] args) {
    try {
        // ①
        NamesrvController controller = createNamesrvController(args);
        // ②
        start(controller);
        // ③
        String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }
    return null;
}


① : Create the main Namesrv controller.

② : Start the Namesrv controller.

③ : This log is printed when Namesrv is successfully started.

// org.apache.rocketmq.namesrv.NamesrvStartup#start

public static NamesrvController start(final NamesrvController controller) throws Exception {

    if (null == controller) {
        throw new IllegalArgumentException("NamesrvController is null");
    }
    // ①
    boolean initResult = controller.initialize();
    if (!initResult) {
        // ②
        controller.shutdown();
        System.exit(-3);
    }
    // ③
    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            controller.shutdown();
            return null;
        }
    }));
    // ④
    controller.start();
    return controller;
}

① : Initializes NamesrvController.

② : If initialization fails, shut down the NamesrvController and exit the process.

③ : Register a JVM shutdown hook so that the NamesrvController is shut down just before the JVM exits normally.

④ : Start the NamesrvController once initialization has succeeded.
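
The initialize, register hook, start ordering used in NamesrvStartup#start can be tried in isolation. The following is a minimal sketch and not RocketMQ code: `Service` and `ShutdownHookSketch` are hypothetical names, a plain `Thread` stands in for RocketMQ's `ShutdownHookThread`, and a failed initialization throws an exception here instead of calling `System.exit(-3)`.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for NamesrvController; not part of RocketMQ.
class Service {
    private final AtomicBoolean running = new AtomicBoolean(false);

    boolean initialize() {
        return true;
    }

    void start() {
        running.set(true);
    }

    // Idempotent: returns true only for the call that actually stops the service.
    boolean shutdown() {
        return running.compareAndSet(true, false);
    }

    boolean isRunning() {
        return running.get();
    }
}

class ShutdownHookSketch {
    // Same ordering as NamesrvStartup#start: initialize, register the hook, then start.
    static Service startService(Service service) {
        if (service == null) {
            throw new IllegalArgumentException("service is null");
        }
        if (!service.initialize()) {
            service.shutdown();
            // Assumption: throw instead of System.exit(-3) so the sketch is easy to test.
            throw new IllegalStateException("initialization failed");
        }
        // The hook runs when the JVM shuts down normally (for example on SIGTERM).
        Runtime.getRuntime().addShutdownHook(new Thread(service::shutdown, "shutdown-hook"));
        service.start();
        return service;
    }

    public static void main(String[] args) {
        Service service = startService(new Service());
        System.out.println("running=" + service.isRunning());
    }
}
```

Because `shutdown()` is idempotent in this sketch, it does no harm if the hook fires after the service has already been stopped explicitly.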

// org.apache.rocketmq.namesrv.NamesrvController#initialize
public boolean initialize() {

    // ①
    this.kvConfigManager.load();

    // Remoting (network communication) setup; covered later.
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
    // ②
    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
    // ③
    this.registerProcessor();
    // ④
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);
    // ⑤
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);
    return true;
}

① : Load the Namesrv configuration into memory.

② : remotingExecutor is the thread pool that executes requests from clients (producers and consumers) and brokers; the requests themselves are handled by DefaultRequestProcessor.

③ : Bind DefaultRequestProcessor to remotingExecutor.

④ : scheduledExecutorService is the thread pool for scheduled tasks. After an initial delay of 5 seconds, it scans every 10 seconds to check whether the known brokers are still alive.

⑤ : The same scheduled-task thread pool prints the Namesrv configuration every 10 minutes, after an initial delay of 1 minute.
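
The two scheduled tasks use `scheduleAtFixedRate(task, initialDelay, period, unit)` from the JDK. As a small, self-contained illustration of that call shape (not RocketMQ code; `FixedRateSketch` and `runPeriodically` are hypothetical names, and millisecond intervals are used only so the example finishes quickly):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class FixedRateSketch {
    // Schedules a periodic task and reports whether it fired `runs` times before the timeout.
    static boolean runPeriodically(int runs, long initialDelayMs, long periodMs, long timeoutMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(runs);
        try {
            // Same argument order as in NamesrvController#initialize:
            // task, initial delay, period, time unit.
            scheduler.scheduleAtFixedRate(latch::countDown, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Stop the scheduler so the JVM can exit.
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runPeriodically(3, 50, 100, 2000));
    }
}
```

One detail worth keeping in mind: if a task scheduled this way throws an exception, the JDK suppresses all subsequent executions of that task, so periodic tasks usually catch their own exceptions.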

So far, we have analyzed the startup process of Namesrv.

Core code analysis

// org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest
// Namesrv exposed interface (handles requests sent by clients (consumers and producers) and brokers)
// The code for this method is relatively simple and will not be analyzed in detail here
 public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException
       
// org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager

    // ① A Broker is considered down if no heartbeat has been received from it within two minutes
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    // ② Read/write lock guarding the tables below
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // ③ Mapping between topics and queues
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    // ④ Mapping between Broker name and Broker data; a master Broker and its slaves share the same Broker name
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    // ⑤ Mapping between RocketMQ cluster name and Broker names. Each master/slave group has one Broker name.
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    // ⑥ Information about the Brokers that are currently alive
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    // ⑦ Filter servers used for message filtering
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

⑥ : This one deserves some elaboration. Namesrv scans all brokers every 10 seconds (through the scheduledExecutorService scheduled-task thread pool described above). It decides whether a Broker is alive by checking whether the difference between the current system time and the lastUpdateTimestamp of the Broker's most recent heartbeat packet is greater than BROKER_CHANNEL_EXPIRED_TIME. This data is not real-time, so a client cannot learn the state of a Broker in real time.
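
The expiry check described above can be sketched as follows. This is a simplified illustration, not the RocketMQ implementation: `LivenessScanSketch` is a hypothetical class, the table stores only a timestamp per broker address instead of a full BrokerLiveInfo, the current time is passed in as a parameter so the behaviour is easy to test, and no locking or channel cleanup is shown.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class LivenessScanSketch {
    // Same value as BROKER_CHANNEL_EXPIRED_TIME: two minutes in milliseconds.
    static final long EXPIRED_MS = 1000 * 60 * 2;

    // brokerAddr -> timestamp of the last heartbeat (simplified stand-in for brokerLiveTable)
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    // Called whenever a heartbeat (registration) arrives from a broker.
    void onHeartbeat(String brokerAddr, long nowMs) {
        lastHeartbeat.put(brokerAddr, nowMs);
    }

    // Removes every broker whose last heartbeat is older than EXPIRED_MS
    // and returns the addresses that were removed.
    List<String> scanNotActive(long nowMs) {
        List<String> removed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + EXPIRED_MS < nowMs) {
                String addr = entry.getKey();
                it.remove();
                removed.add(addr);
            }
        }
        return removed;
    }

    boolean isAlive(String brokerAddr) {
        return lastHeartbeat.containsKey(brokerAddr);
    }
}
```

With a 10-second scan interval and a 2-minute expiry, a broker that stops sending heartbeats can remain in the table for up to roughly 2 minutes and 10 seconds, which is why the data is not real-time.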

// org.apache.rocketmq.namesrv.routeinfo.BrokerLiveInfo
class BrokerLiveInfo {
    private long lastUpdateTimestamp;
    // Version number used to address the ABA problem (look the term up if it is unfamiliar)
    private DataVersion dataVersion;
    private Channel channel;
    private String haServerAddr;
    // ...
    }
// org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker
public RegisterBrokerResult registerBroker(
    final String clusterName, final String brokerAddr,
    final String brokerName, final long brokerId,
    final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList, final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            // A write lock is required here because the containers updated below are not thread-safe
            this.lock.writeLock().lockInterruptibly();

            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            if (null == brokerNames) {
                brokerNames = new HashSet<String>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            brokerNames.add(brokerName);

            boolean registerFirst = false;

            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null == brokerData) {
                registerFirst = true;
                brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                this.brokerAddrTable.put(brokerName, brokerData);
            }
            Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
            //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
            //The same IP:PORT must only have one record in brokerAddrTable
            Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
            while (it.hasNext()) {
                Entry<Long, String> item = it.next();
                if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                    it.remove();
                }
            }

            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
            registerFirst = registerFirst || (null == oldAddr);

            if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                    || registerFirst) {
                    ConcurrentMap<String, TopicConfig> tcTable =
                        topicConfigWrapper.getTopicConfigTable();
                    if (tcTable != null) {
                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                            this.createAndUpdateQueueData(brokerName, entry.getValue());
                        }
                    }
                }
            }

            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                new BrokerLiveInfo(
                    System.currentTimeMillis(),
                    topicConfigWrapper.getDataVersion(),
                    channel,
                    haServerAddr));
            if (null == prevBrokerLiveInfo) {
                log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
            }

            if (filterServerList != null) {
                if (filterServerList.isEmpty()) {
                    this.filterServerTable.remove(brokerAddr);
                } else {
                    this.filterServerTable.put(brokerAddr, filterServerList);
                }
            }

            if (MixAll.MASTER_ID != brokerId) {
                String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (masterAddr != null) {
                    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                    if (brokerLiveInfo != null) {
                        result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                        result.setMasterAddr(masterAddr);
                    }
                }
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    }
    return result;
}

The code above is fairly easy to follow: it updates the containers described earlier. On a Broker's first registration the entries are created; on later registrations the existing entries are updated in place.
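
To make the locking and the "same address appears only once" rule easier to see, here is a stripped-down sketch of the same idea. It is an illustration under stated assumptions, not the RocketMQ implementation: `BrokerRegistrySketch` is a hypothetical class, it keeps only one table (broker name to id/address pairs), it expects a non-null address, and it uses `lock()` rather than `lockInterruptibly()` to keep the example short.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class BrokerRegistrySketch {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // brokerName -> (brokerId -> address). Plain HashMaps, so every access goes through the lock.
    private final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();

    // Registers an address (must be non-null) and returns true
    // if this (brokerName, brokerId) pair had no address before.
    boolean register(String brokerName, long brokerId, String brokerAddr) {
        lock.writeLock().lock();
        try {
            Map<Long, String> addrs = brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>());
            // The same address may be recorded only once: drop it if it is stored under another id
            // (this is what happens when a slave is switched to master).
            addrs.entrySet().removeIf(e -> brokerAddr.equals(e.getValue()) && e.getKey() != brokerId);
            return addrs.put(brokerId, brokerAddr) == null;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Returns a copy of the addresses for a broker name; readers share the read lock.
    Map<Long, String> lookup(String brokerName) {
        lock.readLock().lock();
        try {
            Map<Long, String> addrs = brokerAddrTable.get(brokerName);
            return addrs == null ? new HashMap<>() : new HashMap<>(addrs);
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Returning a copy from `lookup` means callers never touch the shared maps outside the lock, at the cost of one small allocation per read.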

Conclusion

From the source code above we can find the answers to the three questions we posed at the beginning.

① : Namesrv uses a combination of timed scanning and the heartbeat information actively reported by each Broker to determine whether the Broker is alive.

② : Producers and consumers send requests that are handled by the org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest method, which returns the routing information maintained by the org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager class.

③ : The Namesrv module does not provide any interface for communication between Namesrv instances, and each Namesrv maintains a complete copy of the Broker cluster information. The RocketMQ cluster can therefore continue to work as long as at least one Namesrv is available.
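
Because every Namesrv instance holds the full routing data, a client only needs one reachable instance. The sketch below illustrates that idea with a simple try-each-in-order loop. It is an assumption made for illustration: the source analyzed here does not show how the real RocketMQ client picks a Namesrv, and `NamesrvFailoverSketch`, `fetchRoute` and the `fetch` function are hypothetical names.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

class NamesrvFailoverSketch {
    // Tries each name server address in order and returns the first route obtained.
    // `fetch` is a hypothetical lookup that returns null or throws when an instance is unreachable.
    static Optional<String> fetchRoute(List<String> namesrvAddrs, Function<String, String> fetch) {
        for (String addr : namesrvAddrs) {
            try {
                String route = fetch.apply(addr);
                if (route != null) {
                    return Optional.of(route);
                }
            } catch (RuntimeException e) {
                // This instance is unavailable; move on to the next one.
            }
        }
        // No instance answered.
        return Optional.empty();
    }
}
```

Since the instances do not synchronize with each other, two of them may briefly return different routing data; this sketch simply accepts whichever instance answers first.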