The fourth article in 2021

Description of content

This article focuses on the Namespace of the routing center in RocketMQ, and then tries to understand the design principle from a source code perspective.

Namespace Routing Center

Namespace is the routing center for RocketMQ. Namespace exists to store all Broker and Topic information so that producers and consumers can register and update routing information via heartbeat.

In fact, Namespace functions much like a registry in a microservices architecture, distributed services SOA architecture. For example, older versions of Kafka register cluster information with Zookeeper. But instead of using third-party tools, RocketMQ developed Namespace as a routing service. Namespaces can also be clustered, but they are stateless nodes, and there is no Master/Slave concept.

When talking about routing centers, we need to understand several roles:

  1. Namespace Routing Center
  2. Broker message server
  3. Producer producers
  4. Consumer consumers

The following picture shows the relationship between them

Arrow directions represent the direction in which each character depends on the action.

  1. First, the Broker registers with a Namespace
  2. Producer gets the address of the Broker from Namespace and accesses the Broker according to the rules
  3. The Consumer retrieves the Broker’s address from the Namespace and accesses the Broker according to the rules

As the manager of the Broker,Namespace checks for availability at regular intervals. Moreover, message producers and consumers are not directly notified when routing changes. This is to reduce the complexity of the overall architecture

As a manager, the Namespace can also be deployed in multiple ways to maintain high availability. In addition,Namespace does not communicate with Namespace, that is, the information between the two is different at a certain time, but it does not affect the message sending. Architecture brief and available. This is the highlight of RocketMQ’s design, and one we can learn from!

Below, I will use the source code, a preliminary explanation of the following content:

  1. The design of the Namespace
  2. Broker management, including join and drop, heartbeat
  3. How is a Namespace guaranteed to be highly available

The Namespace to start

In the RocketMQ source directory there is a namesrv, which is the Namespace source package. And the corresponding code entry, is NamesrvStartup NamesrvStartup is to start the class. The main function is to parse the parameters from the command line, configure namesrv and Netty configuration file parsing, configure the log context, configure NamesrvController and finally start.

NamesrvStartup the first focused method createNamesrvController. This method is used for parameter configuration, such as obtaining the parameters of the startup command line, reading the default configuration from the configuration file, etc.

The first step is to create a CommandLine and convert it into a CommandLine object in RocketMQ

    Options options = ServerUtil.buildCommandlineOptions(new Options());
     commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
Copy the code

Then fill in the corresponding NamesrvConfig object and NettyServerConfig object based on the parameters.

    // get the parameter c
    if (commandLine.hasOption('c')) {
        String file = commandLine.getOptionValue('c');
        if(file ! =null) {
            InputStream in = new BufferedInputStream(new FileInputStream(file));
            properties = new Properties();
            properties.load(in);
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);

            namesrvConfig.setConfigStorePath(file);

            System.out.printf("load config properties file OK, %s%n", file); in.close(); }}// get the p argument
    if (commandLine.hasOption('p')) {
        InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
        MixAll.printObjectProperties(console, namesrvConfig);
        MixAll.printObjectProperties(console, nettyServerConfig);
        System.exit(0);
    }
    // Turn properties into objects
    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
Copy the code

The RocketMq directory is then retrieved to prepare the location where the log is generated

    // instantiate the log context
    LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
    JoranConfigurator configurator = new JoranConfigurator();
    configurator.setContext(lc);
    lc.reset();
    configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
    // Internal loggerFactory
    log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    // Prints parameters
    MixAll.printObjectProperties(log, namesrvConfig);
    MixAll.printObjectProperties(log, nettyServerConfig);
    // Instantiate and configure NamesrvController
    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
    // Remember that all configurations should not be ignored
    controller.getConfiguration().registerConfig(properties);
Copy the code

After configuration, NamesrvController is started. First, initialization. The action of initialize is a bit more complicated. Mainly for the

  1. KvConfigManager load
  2. RemotingServer loads and starts
  3. Register the handler (handler refers to the logic that processes the request to access the current computer).
  4. Start two timers: scan inactive brokers and print kvConfigManager parameters
  5. Start FileWatchService for SSL/TLS

The KvConfigManager#load method starts loading the corresponding parameters

    / / 1.
    content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());
    if(content ! =null) {
        KVConfigSerializeWrapper kvConfigSerializeWrapper =
            KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);
        / / 2.
        if (null! = kvConfigSerializeWrapper) {this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable());
            log.info("load KV config table OK"); }}Copy the code

①, get kv path from namesrvconfig; Code ②, read path to obtain parameters and add them to kv storage;

Next, RemotingServer loads and starts. RemotingServer is responsible for receiving registered network traffic from the Broker. It is in Remoting, RocketMQ’s generic abstract component package.

    // Instantiate the remote server
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
    // instantiate the thread pool
    this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
Copy the code

Then start both timers

    // Timed sweep is not active Broker
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run(a) {
            // Scan all active brokers
            NamesrvController.this.routeInfoManager.scanNotActiveBroker(); }},5.10, TimeUnit.SECONDS);
     // Print KV
     this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run(a) {
            NamesrvController.this.kvConfigManager.printAllPeriodically(); }},1.10, TimeUnit.MINUTES);
Copy the code

The principle of scanning inactive/disconnected brokers is simple. The current time is subtracted from the last time checked. If the time exceeds the threshold, the corresponding Channel of the Broker is retrieved.

The last thing to do is deal with SSL/TLS. To clarify,RocketMQ needs to define paths :CertPath, KeyPath, and TrustCertPath addresses for writing files. An error will be reported if there is no permission.

    fileWatchService = new FileWatchService(
        new String[] {
            TlsSystemConfig.tlsServerCertPath,
            TlsSystemConfig.tlsServerKeyPath,
            TlsSystemConfig.tlsServerTrustCertPath
        },
        new FileWatchService.Listener() {
            // Certificate change, key change
            boolean certChanged, keyChanged = false;
            @Override
            public void onChanged(String path) {
                // If path is the same as ttlServer path
                if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                    log.info("The trust certificate changed, reload the ssl context");
                    reloadServerSslContext();
                }
                if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                    certChanged = true;
                }
                if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                    keyChanged = true;
                }
                // Start if all changes are made. Reload context
                if (certChanged && keyChanged) {
                    log.info("The certificate and private key changed, reload the ssl context");
                    certChanged = keyChanged = false; reloadServerSslContext(); }}// Load the loadSslContext
            private void reloadServerSslContext(a) { ((NettyRemotingServer) remotingServer).loadSslContext(); }});Copy the code

The action for NamesrvController#initialize is now complete and NamesrvController#start is started.

    // Get netty's remoteServer
    this.remotingServer.start();
    // The file monitoring service is enabled
    if (this.fileWatchService ! =null) {
        this.fileWatchService.start();
    }
Copy the code

Since RemoteServer is already configured, you can start it directly. The bootstrap setup is similar to the Netty setup, with a focus on ServerBootstrap configuration

    ServerBootstrap childHandler =
    this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
        .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
         / / 1.
        .option(ChannelOption.SO_BACKLOG, 1024)
        .option(ChannelOption.SO_REUSEADDR, true)
        .option(ChannelOption.SO_KEEPALIVE, false)
        / / 2.
        .childOption(ChannelOption.TCP_NODELAY, true)
        .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
        .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
        .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
        / / 3.
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                    .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                    .addLast(defaultEventExecutorGroup,
                        encoder,
                        new NettyDecoder(),
                        new IdleStateHandler(0.0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), connectionManageHandler, serverHandler ); }});Copy the code

Code ①, configure BACKLOGg, configure REUSEADDR, configure KEEPALIVE. Code ②, configure NoDelay, configure SNDBUF, configure RCVBUF, these parameters are TCP to send and receive optimization. If NoDelay is enabled,TCP will use Nagle algorithm to send data, which can make full use of the buffer, improve the quality of the request, but will slightly slow down the timeliness of transmission; We can see that the handler that added the processing logic is handshakeHandler, NettyDecoder, connectionManageHandler, serverHandler Heart processing logic);

To improve performance, the system determines whether to enable the Netty memory pool feature.

     / / isServerPooledByteBufAllocatorEnable set parameters
    if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
        childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }
Copy the code

Finally, a timer timed scan timeout request is initiated. The principle of scanning is based on the difference between the request start time and the current time, if the maximum threshold is exceeded, the thread pool is asynchronously destroyed.

    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run(a) {
            try {
                NettyRemotingServer.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e); }}},1000 * 3.1000);
Copy the code

Once RemoteServer is done, go to fileWatchService.

    if (this.fileWatchService ! =null) {
        this.fileWatchService.start();
    }
Copy the code

FileWatchService listens for specified files

    this.watchFiles = new ArrayList<>();
    this.fileCurrentHash = new ArrayList<>();
    // Listen to the file
    for (int i = 0; i < watchFiles.length; i++) {
        if (StringUtils.isNotEmpty(watchFiles[i]) && new File(watchFiles[i]).exists()) {
            // Add file and file Hash result code
            this.watchFiles.add(watchFiles[i]);
            this.fileCurrentHash.add(hash(watchFiles[i])); }}Copy the code

The Hash code is used to analyze whether the file has changed. It is then processed in FileWatchService#run

    while (!this.isStopped()) {
        try {
            // Set the scheduled listening duration
            this.waitForRunning(WATCH_INTERVAL);
            // Loop over the list of files
            for (int i = 0; i < watchFiles.size(); i++) {
                String newHash;
                try {
                    newHash = hash(watchFiles.get(i));
                } catch (Exception ignored) {
                    log.warn(this.getServiceName() + " service has exception when calculate the file hash. ", ignored);
                    continue;
                }
                // If the Hash is changed, the file is changed
                if(! newHash.equals(fileCurrentHash.get(i))) { fileCurrentHash.set(i, newHash); listener.onChanged(watchFiles.get(i)); }}}}Copy the code

If the file changes, the onChanged function is called

    if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
        log.info("The trust certificate changed, reload the ssl context");
        reloadServerSslContext();
    }
    if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
        certChanged = true;
    }
    if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
        keyChanged = true;
    }
    // Start if all changes are made. Reload context
    if (certChanged && keyChanged) {
        log.info("The certificate and private key changed, reload the ssl context");
        certChanged = keyChanged = false;
        reloadServerSslContext();
    }
Copy the code

At this point,Namspace has been successfully started.

The management of the Broker

Broker management includes Broker information, registration, and culling.

The Broker information

RouteInfoManager is the core class that manages the Broker for Namespace. It is in RouteInfoManager under rounteinfo package of Namesrv. Here are the member attributes of the class.

    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
Copy the code

The following is an explanation of the member parameters:

  1. topicQueueTable, message queue routes messages, and message sending is balanced according to the routing table
  2. brokerAddrTableBroker base information. The cluster name, brokerName, and primary and secondary Broker addresses
  3. clusterAddrTableBroker cluster information, which stores the names of all brokers in each cluster
  4. brokerLiveTableBroker status information, heartbeat packets are updated
  5. filterServerTable, a list of FilterServers for class pattern message filtering

There are several data structures involved in the member property: QueueData, BrokerData, and BrokerLiveInfo. They have the following description:

  1. A Topic has multiple message queues. By default, a Broker creates four read queues and four write columns for each Topic.
  2. Multiple brokers in a cluster with the same BrokerName form a master-slave schema.
  3. LastUpdateTimestamp in BrokerLiveInfo holds the time of the last heartbeat;

Suppose we have a cluster with 2 master and 2 slave RocketMQ schemas. At run time the corresponding parameters are:

The first is the master-slave relationship

{
    cluster: c1,
    brokerName: broker-a,
    brokerId: 0
}
{
    cluster: c1,
    brokerName: broker-a,
    brokerId: 1
}
Copy the code

The second master-slave relationship

{
    cluster: c1,
    brokerName: broker-b,
    brokerId: 0
}
{
    cluster: c1,
    brokerName: broker-b,
    brokerId: 1
}
Copy the code

Then when run, the corresponding topicQueueTable is:

topicQueueTable: {
    topic1: [
        {
            brokerName: "broker-a",
            readQueueNums: 4,
            writeQueueNums: 4,
            perm: 6,
            topicSynFlag: false
        },
        {
            brokerName: "broker-b",
            readQueueNums: 4,
            writeQueueNums: 4,
            perm: 6,
            topicSynFlag: false
        }
    ],
    
    topic2: ... 
}
Copy the code

Then the corresponding brokerAddrTable is:

brokerAddrTable: {
    "broker-a": {
        cluster: "c1",
        brokerName: "broker-a",
        brokerAddrs: {
            0:"192.168.1.1",
            1:"192.168.1.2 instead"}},"broker-b": {
        cluster: "c1",
        brokerName: "broker-b",
        brokerAddrs: {
            0:"192.168.1.3",
            1:"192.168.1.4"}}}Copy the code

Then the corresponding brokerLiveTable is:

brokerLiveTable: {
   "192.168.1.1": {
       lastUpdateTimestamp: 1623587820994,
       dataVersion: 0xxie,
       channel: channelObj,
       haServerAddr: "192.168.1.2 instead"
   },
   "192.168.1.2 instead": {
       lastUpdateTimestamp: 1623587123994,
       dataVersion: 0xxi1,
       channel: channelObj,
       haServerAddr: "192.168.1.2 instead"
   },
   "192.168.1.3": {
       lastUpdateTimestamp: 1624558123994,
       dataVersion: 0xxes,
       channel: channelObj,
       haServerAddr: v
   },
   "192.168.1.4": {
       lastUpdateTimestamp: 1624558232343,
       dataVersion: 0xevs,
       channel: channelObj,
       haServerAddr: "192.168.1.2 instead"}}Copy the code

The Broker is registered

Broker registers time nodes with Nameserver periodically both at start and after start. Broker registration is a two-step process:

  1. The Broker sends a registration request
  2. Namesrv processes registration requests

The Broker startup code entry point is located at org. Apache. Rocketmq. Broker. BrokerStartup. Start. In order to avoid some of the tedious configuration code, I write a call chain here.

BrokerStartup#start
BrokerController#start
BrokerController#registerBrokerAll
BrokerController#doRegisterBrokerAll
Copy the code

The doRegisterBrokerAll method basically sends a request to register. First get the NamesRV address list and construct the request header

    final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if(nameServerAddressList ! =null && nameServerAddressList.size() > 0) {
        // Construct the request
        final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
        requestHeader.setBrokerAddr(brokerAddr);
        requestHeader.setBrokerId(brokerId);
        requestHeader.setBrokerName(brokerName);
        requestHeader.setClusterName(clusterName);
        requestHeader.setHaServerAddr(haServerAddr);
        requestHeader.setCompressed(compressed);
        // Construct the Broker's request parameters
        RegisterBrokerBody requestBody = new RegisterBrokerBody();
        requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
        requestBody.setFilterServerList(filterServerList);
        final byte[] body = requestBody.encode(compressed);
        final int bodyCrc32 = UtilAll.crc32(body);
        requestHeader.setBodyCrc32(bodyCrc32);
Copy the code

It then loops through the namesrv address list and sends a request for registration

    final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
    for (final String namesrvAddr : nameServerAddressList) {
        brokerOuterExecutor.execute(new Runnable() {
            @Override
            public void run(a) {
                try {
                    // Register each NameServer address circularly
                    RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
                    if(result ! =null) {
                        registerBrokerResultList.add(result);
                    }

                    log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                } 
                / /...}}); }Copy the code

Above, the broker sends the registration request. Namesrv needs to handle broker registration requests.

DefaultRequestProcessor is a request processor that determines which events to process based on the code of the external request. When the requestCode = 103, will be called DefaultRequestProcessor# registerBrokerWithFilterServer or DefaultRequestProcessor# registerBroker method. The former excludes unregistered brokers based on a filter list, while the latter can be registered directly.

The registerBroker method verifies that the request is valid

    if(! checksum(ctx, request, requestHeader)) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("crc32 not match");
        return response;
    }
Copy the code

Then get the RouteInfoManager of Namesrv and update the broker information into it

    RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
        requestHeader.getClusterName(),
        requestHeader.getBrokerAddr(),
        requestHeader.getBrokerName(),
        requestHeader.getBrokerId(),
        requestHeader.getHaServerAddr(),
        topicConfigWrapper,
        null,
        ctx.channel()
    );
Copy the code

The last set HaServerAddress/MasterAddress, return the result of Reponse in construction and filling parameters

    responseHeader.setHaServerAddr(result.getHaServerAddr());
    responseHeader.setMasterAddr(result.getMasterAddr());
    / / a
    byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
    response.setBody(jsonValue);
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
Copy the code

Let’s focus on the RouteInfoManager#registerBroker method. Because information needs to be written, it is locked first

    this.lock.writeLock().lockInterruptibly();
Copy the code

Checks for the presence of broker information. If no, add one.

    boolean registerFirst = false;
    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
    if (null == brokerData) {
        registerFirst = true;
        brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
        this.brokerAddrTable.put(brokerName, brokerData);
    }
Copy the code

Check whether the broker is registered for the first time by checking whether broker information exists or whether the address of the broker previously exists in broker information

    String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
    registerFirst = registerFirst || (null == oldAddr);
Copy the code

If the broker is Master and the topic configuration of the broker changes or is registered for the first time, the topic routing metadata needs to be created or updated to populate the topicQueueTable, which automatically registers routing information for the default topic. This contains routing information for mixall.default_topic. When a message producer sends a topic, routing information for mixall.default_topic is returned if the topic is not created and BrokerConfig’s autoCreateTopicEnable is true.

    if (null! = topicConfigWrapper && MixAll.MASTER_ID == brokerId) {if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
            || registerFirst) {
            ConcurrentMap<String, TopicConfig> tcTable =
                topicConfigWrapper.getTopicConfigTable();
            if(tcTable ! =null) {
                for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                    this.createAndUpdateQueueData(brokerName, entry.getValue()); }}}}Copy the code

Add broker survival information

    BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
        new BrokerLiveInfo(
            System.currentTimeMillis(),
            topicConfigWrapper.getDataVersion(),
            channel,
            haServerAddr));
    if (null == prevBrokerLiveInfo) {
        log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
    }
Copy the code

Register a list of FilterServer addresses for brokers that have multiple FilterServer message filtering servers associated with one broker

    if(filterServerList ! =null) {
        if (filterServerList.isEmpty()) {
            this.filterServerTable.remove(brokerAddr);
        } else {
            this.filterServerTable.put(brokerAddr, filterServerList); }}Copy the code

If the broker is a slave node, you need to look up the node information for the Master of the broker and update the corresponding masterAddr property.

    if(MixAll.MASTER_ID ! = brokerId) { String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if(masterAddr ! =null) {
            BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
            if(brokerLiveInfo ! =null) { result.setHaServerAddr(brokerLiveInfo.getHaServerAddr()); result.setMasterAddr(masterAddr); }}}Copy the code

Routing is destroyed

Route destruction can be done in two ways:

  1. Periodic scans weed out brokers without heartbeats
  2. Destroy if the broker exits normally

The core idea of both methods is to remove routing information from Namesrv, including topicQueuetable, brokerAddrTable, brokerLiveTable, and filterServerTable. So both methods have common code.

The entry to the timer scan is in the NamesrvController#initialize method

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run(a) {
            // Scan for brokers that are no longer active
            NamesrvController.this.routeInfoManager.scanNotActiveBroker(); }},5.10, TimeUnit.SECONDS);
Copy the code

The scanNotActiveBroker method closes the channel of the broker and calls routeInfoManager#onChannelDestroy to remove the channel from route management.

    try {
        / / 1.
        this.lock.writeLock().lockInterruptibly();
        this.brokerLiveTable.remove(brokerAddrFound);
        this.filterServerTable.remove(brokerAddrFound);
        String brokerNameFound = null;
        boolean removeBrokerName = false;
        / / 2.
        Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
            this.brokerAddrTable.entrySet().iterator();
        while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
            BrokerData brokerData = itBrokerAddrTable.next().getValue();

            Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
            while (it.hasNext()) {
                Entry<Long, String> entry = it.next();
                Long brokerId = entry.getKey();
                String brokerAddr = entry.getValue();
                if (brokerAddr.equals(brokerAddrFound)) {
                    brokerNameFound = brokerData.getBrokerName();
                    it.remove();
                    log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                        brokerId, brokerAddr);
                    break; }}if (brokerData.getBrokerAddrs().isEmpty()) {
                removeBrokerName = true;
                itBrokerAddrTable.remove();
                log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed", brokerData.getBrokerName()); }}/ / 3.
        if(brokerNameFound ! =null && removeBrokerName) {
            Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<String, Set<String>> entry = it.next();
                String clusterName = entry.getKey();
                Set<String> brokerNames = entry.getValue();
                boolean removed = brokerNames.remove(brokerNameFound);
                if (removed) {
                    log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                        brokerNameFound, clusterName);

                    if (brokerNames.isEmpty()) {
                        log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                            clusterName);
                        it.remove();
                    }

                    break; }}}/ / 4.
        if (removeBrokerName) {
            Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
                this.topicQueueTable.entrySet().iterator();
            while (itTopicQueueTable.hasNext()) {
                Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
                String topic = entry.getKey();
                List<QueueData> queueDataList = entry.getValue();

                Iterator<QueueData> itQueueData = queueDataList.iterator();
                while (itQueueData.hasNext()) {
                    QueueData queueData = itQueueData.next();
                    if (queueData.getBrokerName().equals(brokerNameFound)) {
                        itQueueData.remove();
                        log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed", topic, queueData); }}if (queueDataList.isEmpty()) {
                    itTopicQueueTable.remove();
                    log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed", topic); }}}}finally {
        / / 5.
        this.lock.writeLock().unlock();
    }
Copy the code

Code 1, lock, remove code 2 from brokerLiveTable and filterServerTable, get the corresponding brokerData, remove address from brokerAddrs of brokerData the address of the broker; If brokerAddrs are empty the corresponding brokerData code needs to be removed ③, get the corresponding brokerNames from clusterAddrTable and remove them; If brokerNames are empty, the cluster code for brokerName (④) needs to be removed from clusterAddrTable and the data code (⑤) for brokerName needs to be removed from topicQueueTable, freeing the lock

The code entry for the normal exit of the broker is that DefaultRequestProcessor calls the DefaultRequestProcessor#unregisterBroker method when the DefaultRequestProcessor receives RequestCode=104.

    public void unregisterBroker( final String clusterName, final String brokerAddr,
        final String brokerName, final long brokerId) {
        try {
            try {
                / / 1.
                this.lock.writeLock().lockInterruptibly();
                BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr);
                log.info("unregisterBroker, remove from brokerLiveTable {}, {}", brokerLiveInfo ! =null ? "OK" : "Failed",
                    brokerAddr
                );
                / / 2.
                this.filterServerTable.remove(brokerAddr);
                / / 3.
                boolean removeBrokerName = false;
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                if (null! = brokerData) { String addr = brokerData.getBrokerAddrs().remove(brokerId); log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}", addr ! =null ? "OK" : "Failed",
                        brokerAddr
                    );

                    if (brokerData.getBrokerAddrs().isEmpty()) {
                        this.brokerAddrTable.remove(brokerName);
                        log.info("unregisterBroker, remove name from brokerAddrTable OK, {}",
                            brokerName
                        );

                        removeBrokerName = true; }}/ / 4.
                if (removeBrokerName) {
                    Set<String> nameSet = this.clusterAddrTable.get(clusterName);
                    if(nameSet ! =null) {
                        boolean removed = nameSet.remove(brokerName);
                        log.info("unregisterBroker, remove name from clusterAddrTable {}, {}",
                            removed ? "OK" : "Failed",
                            brokerName);

                        if (nameSet.isEmpty()) {
                            this.clusterAddrTable.remove(clusterName);
                            log.info("unregisterBroker, remove cluster from clusterAddrTable {}", clusterName ); }}this.removeTopicByBrokerName(brokerName); }}finally {
                / / 5.
                this.lock.writeLock().unlock(); }}/ /...
    }
Copy the code

Code ①, lock, remove broker live table data ②, remove filter service data ③, obtain the corresponding brokerData, and attempt to remove the address from brokerAddr. If brokerAddr is empty after removal, the entire broker is no longer in service code 4, remove the corresponding broker from the cluster. If the broker cluster is empty after removal, remove the cluster from code ⑤ to unlock it

Routing discovery

Namesrv does not actively notify clients when Topic routing changes. Therefore, the client needs to periodically pull and fetch new routing information. The client by calling a DefaultRequestProcessor# getTopicRouteInfoFromNameServer method to get the latest routing information.

    / / 1.
    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
    if(topicRouteData ! =null) {
        // whether to enable sequential consumption
        if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
            String orderTopicConf =
                this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                    requestHeader.getTopic());
            topicRouteData.setOrderTopicConf(orderTopicConf);
        }
        / / 3.
        byte[] content = topicRouteData.encode();
        response.setBody(content);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
Copy the code

If the Topic routing information is not empty, the configuration parameter code (3) is obtained from the KvManager for “sequential consumption”. Finally, the information is backfilled into the request result

At the end of the article

In this article, the Namespace of RocketMQ will be discussed from the source code, which will provide some ideas for future wheel building.

end