1. Overview of RocketMQ architecture

1.1 Logical Deployment Diagram

(Image from the Internet)

1.2 Core Components

As can be seen from the figure above, RocketMQ consists of four core components: NameServer, Broker, Producer, and Consumer. Each of them is briefly described below:

NameServer: NameServer acts as the routing information provider. Producers and consumers use NameServer to look up the list of Broker IP addresses for each Topic. Multiple NameServer instances form a cluster, but they are independent of each other and do not exchange information.

Broker: The role that stores and forwards messages. A Broker server in RocketMQ receives and stores the messages sent by producers and serves consumers' pull requests. The Broker server also stores message-related metadata, including consumer groups, consumption progress offsets, and topic and queue information.

Producer: Responsible for producing messages; this is usually done by the business system. A message producer sends the messages generated in the business application to the Broker server. RocketMQ provides multiple delivery modes: synchronous, asynchronous, ordered, and one-way. Both synchronous and asynchronous sending require the Broker to return an acknowledgement, whereas one-way sending does not.

Consumer: Responsible for consuming messages; usually a backend system consumes them asynchronously. A message consumer pulls messages from the Broker server and delivers them to the application. From the application's perspective, two forms of consumption are provided: pull consumption and push consumption.

In addition to the four core components mentioned above, the concept of Topic comes up several times below:

Topic: Represents a category of messages. Each Topic contains a number of messages, and each message belongs to exactly one Topic. The Topic is RocketMQ's basic unit of message subscription. A Topic can be sharded across multiple Broker clusters, and each Topic shard contains multiple queues, as shown in the following figure:
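To complement the figure, here is a minimal, hypothetical Java model of that layout (the class and method names are illustrative and are not RocketMQ classes): a Topic is split into shards on several Brokers, each shard holds a number of queues, and so each queue a client works with is identified by its Broker and queue ID.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model for illustration only; these are not RocketMQ classes.
class TopicLayout {
    // One shard of a Topic, hosted on a single Broker.
    static class Shard {
        final String brokerName;
        final int queueCount;

        Shard(String brokerName, int queueCount) {
            this.brokerName = brokerName;
            this.queueCount = queueCount;
        }
    }

    private final String topic;
    // brokerName -> the shard of this Topic hosted on that Broker
    private final Map<String, Shard> shards = new LinkedHashMap<>();

    TopicLayout(String topic) {
        this.topic = topic;
    }

    String topic() {
        return topic;
    }

    void addShard(String brokerName, int queueCount) {
        shards.put(brokerName, new Shard(brokerName, queueCount));
    }

    // Total number of queues across all shards of the Topic.
    int totalQueues() {
        int total = 0;
        for (Shard s : shards.values()) {
            total += s.queueCount;
        }
        return total;
    }

    // Every queue of the Topic as "brokerName/queueId".
    List<String> queueIds() {
        List<String> ids = new ArrayList<>();
        for (Shard s : shards.values()) {
            for (int q = 0; q < s.queueCount; q++) {
                ids.add(s.brokerName + "/" + q);
            }
        }
        return ids;
    }
}
```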

1.3 Design Concept

RocketMQ follows a topic-based publish/subscribe model. Its core functions are message sending, message storage, and message consumption. The overall design favors simplicity and puts performance first.

  • NameServer replaces ZooKeeper (ZK) as the registry. NameServer instances do not communicate with each other, and the cluster tolerates minute-level inconsistencies in routing information. NameServer is lightweight.

  • A memory-mapping mechanism is used to achieve efficient storage I/O and high throughput.

  • It accepts a deliberate trade-off rather than aiming for a flawless design: an ACK ensures that each message is consumed at least once, but if an ACK is lost the message may be consumed again. This is allowed by design, and handling duplicates is left to the consumer.
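Since duplicates are left to the consumer, consumer logic is usually made idempotent. The sketch below assumes every message carries a unique business key (the `msgKey` parameter and the `IdempotentHandler` class are hypothetical, not part of the RocketMQ client API) and simply skips keys it has already processed:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical idempotent handler; not part of the RocketMQ client API.
class IdempotentHandler {
    // Keys of messages that have already been processed (in memory only in this sketch).
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();
    // Counts how many times the business logic actually ran.
    private final AtomicInteger sideEffects = new AtomicInteger();

    // Returns true if the message was processed, false if it was a duplicate delivery.
    boolean handle(String msgKey, String body) {
        // add() is atomic and returns false when the key is already present.
        if (!processedKeys.add(msgKey)) {
            return false;
        }
        // Business logic runs at most once per key.
        sideEffects.incrementAndGet();
        return true;
    }

    int sideEffectCount() {
        return sideEffects.get();
    }
}
```

The sketch records the key before running the business logic, so a failure during processing would still mark the message as done; a production version would typically record the key only after the logic succeeds, and keep keys in durable storage with an expiry rather than in memory.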

This article focuses on NameServer. Let’s take a look at how NameServer is started and how routing is managed.

2. NameServer architecture design

In Chapter 1, we briefly introduced NameServer as a more lightweight replacement for ZK, acting as the registry that provides routing information. So how is routing information managed? Let's start with the following image:

The figure above describes the core principles of NameServer's route registration, route removal, and route discovery.

Route registration: When a Broker server starts, it sends a heartbeat packet to every NameServer in the NameServer cluster, and after that it sends a heartbeat every 30 seconds to tell each NameServer that it is still alive. When a NameServer receives a heartbeat packet from a Broker, it records the Broker's information together with the time at which the latest heartbeat packet was received.

Route removal: NameServer maintains a long-lived connection to each Broker and receives a heartbeat packet from each Broker every 30 seconds. Every 10 seconds it scans brokerLiveTable and checks whether more than 120 seconds have passed since a Broker's last heartbeat. If so, that Broker is considered unavailable and its information is removed from the routing table.

Route discovery: Route discovery is not real-time. When routes change, NameServer does not push the new routes to clients; instead, producers periodically pull the latest routing information. This keeps the NameServer implementation simple, while a fault-tolerance mechanism on the sending side keeps message sending highly available when routes change. (This will be covered in a follow-up article on Producer message sending.)

High availability: NameServer achieves high availability by deploying multiple NameServer servers, which do not communicate with each other. As a result, the data on different NameServer servers may differ for a while after routing information changes; the sending side's fault-tolerance mechanism nevertheless keeps message sending highly available. This reflects NameServer's goal of simplicity and efficiency.
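Because NameServer instances share nothing, a client only needs one of them to answer. The following sketch illustrates that idea with a hypothetical `RouteSource` interface standing in for one NameServer: the client asks each instance in turn and uses the first route it gets back. It models the principle only and is not RocketMQ's client code.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical abstraction over one NameServer instance; not a RocketMQ API.
interface RouteSource {
    // Returns the Broker addresses for a topic; throws if the server cannot be reached.
    List<String> lookup(String topic) throws Exception;
}

class FailoverRouteClient {
    private final List<RouteSource> nameServers;

    FailoverRouteClient(List<RouteSource> nameServers) {
        this.nameServers = nameServers;
    }

    // Tries each independent NameServer in order; the first successful answer wins.
    Optional<List<String>> findRoute(String topic) {
        for (RouteSource ns : nameServers) {
            try {
                return Optional.of(ns.lookup(topic));
            } catch (Exception e) {
                // This instance is down or timed out; the others hold their own copy of the routes.
            }
        }
        return Optional.empty();
    }
}
```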

3. Startup process

How does NameServer get started?

Since this is a source-code walkthrough, let's look at the code entry point: org.apache.rocketmq.namesrv.NamesrvStartup#main(String[] args), which actually calls the main0() method.

The code is as follows:

public static NamesrvController main0(String[] args) {
    try {
        // Create the NamesrvController
        NamesrvController controller = createNamesrvController(args);
        // Initialize and start the NamesrvController
        start(controller);
        String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }
    return null;
}

Starting NameServer from the main method involves two main steps: first create a NamesrvController, then initialize and start it. Let's look at each in turn.

3.1 Sequence Diagram

Before we read the code in detail, let’s use a sequence diagram to get an idea of the overall process, as shown below:

3.2 Create NamesrvController

Let’s start with the core code, as follows:

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
    // Set the version number to the current version
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    //PackageConflictDetect.detectFastjson();
    // Build an org.apache.commons.cli.Options object and add the -h and -n options: -h prints help information, -n specifies namesrvAddr
    Options options = ServerUtil.buildCommandlineOptions(new Options());
    // Initialize the commandLine and add the -c -p parameter to options. -c specifies the nameserver configuration file path and -p specifies the configuration information to be printed
    commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
    if (null == commandLine) {
        System.exit(-1);
        return null;
    }
    // NameServer configuration class: business parameters
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    // Netty server configuration class, network parameters
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    // Set the port number of nameserver
    nettyServerConfig.setListenPort(9876);
    // If -c is given on the command line, read the configuration file at that path and assign its values to NamesrvConfig and NettyServerConfig
    if (commandLine.hasOption('c')) {
        String file = commandLine.getOptionValue('c');
        if (file != null) {
            InputStream in = new BufferedInputStream(new FileInputStream(file));
            properties = new Properties();
            properties.load(in);
            // Populate the configuration objects via reflection
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);
            // Remember the configuration file path
            namesrvConfig.setConfigStorePath(file);

            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
        }
    }
    // -p means "print the configuration": the properties of NamesrvConfig and NettyServerConfig are printed,
    // e.g. ./mqnamesrv -c configFile -p prints the currently loaded configuration properties
    if (commandLine.hasOption('p')) {
        InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
        MixAll.printObjectProperties(console, namesrvConfig);
        MixAll.printObjectProperties(console, nettyServerConfig);
        // Print parameters command does not need to start the Nameserver service, just print parameters
        System.exit(0);
    }
    // Parse the command-line arguments and load them into namesrvConfig
    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
    // Check ROCKETMQ_HOME; it must not be empty
    if (null == namesrvConfig.getRocketmqHome()) {
        System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
        System.exit(-2);
    }
    // Initialize the logback logger factory; RocketMQ uses logback for log output by default
    LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
    JoranConfigurator configurator = new JoranConfigurator();
    configurator.setContext(lc);
    lc.reset();
    configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

    log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    MixAll.printObjectProperties(log, namesrvConfig);
    MixAll.printObjectProperties(log, nettyServerConfig);
    // Create the NamesrvController
    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

    // Copy the global properties into NamesrvController's Configuration.allConfigs
    // remember all configs to prevent discard
    controller.getConfiguration().registerConfig(properties);

    return controller;
}

As the comments on each line show, creating a NamesrvController consists of two main steps:

Step1: Read the configuration from the command line (and from the configuration file, if -c is given) and assign the values to NamesrvConfig and NettyServerConfig.

Step2: Construct an instance of NamesrvController according to the configuration classes NamesrvConfig and NettyServerConfig.
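Step1 relies on MixAll.properties2Object, which the code comments describe as reflection-based. Below is a simplified sketch of that technique, assuming the convention that a setter `setXxx` maps to a property key `xxx` (this naming rule, the supported types, and the `DemoConfig` class are assumptions for illustration, not RocketMQ's implementation):

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Properties;

// Simplified reflection-based binder in the spirit of MixAll.properties2Object (not its actual code).
final class PropertiesBinder {
    private PropertiesBinder() {}

    // Copies matching properties into the target's public single-argument setters.
    static void bind(Properties props, Object target) {
        for (Method m : target.getClass().getMethods()) {
            String name = m.getName();
            if (!name.startsWith("set") || name.length() <= 3 || m.getParameterCount() != 1) {
                continue;
            }
            // "setListenPort" -> property key "listenPort"
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String raw = props.getProperty(key);
            if (raw == null) {
                continue; // not configured: keep the object's default value
            }
            Class<?> type = m.getParameterTypes()[0];
            Object value;
            if (type == String.class) {
                value = raw;
            } else if (type == int.class || type == Integer.class) {
                value = Integer.parseInt(raw.trim());
            } else if (type == long.class || type == Long.class) {
                value = Long.parseLong(raw.trim());
            } else if (type == boolean.class || type == Boolean.class) {
                value = Boolean.parseBoolean(raw.trim());
            } else {
                continue; // other parameter types are out of scope for this sketch
            }
            try {
                m.invoke(target, value);
            } catch (IllegalAccessException | InvocationTargetException e) {
                throw new IllegalStateException("cannot set property " + key, e);
            }
        }
    }
}

// Hypothetical configuration class standing in for NettyServerConfig.
class DemoConfig {
    private int listenPort = 9876;
    private String rocketmqHome;

    public void setListenPort(int listenPort) { this.listenPort = listenPort; }
    public int getListenPort() { return listenPort; }
    public void setRocketmqHome(String rocketmqHome) { this.rocketmqHome = rocketmqHome; }
    public String getRocketmqHome() { return rocketmqHome; }
}
```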

NamesrvConfig holds NameServer's business parameters, and NettyServerConfig holds its network parameters.

NamesrvConfig

NettyServerConfig

Apache Commons CLI is an open-source command-line parsing library that helps developers quickly define startup commands, organize command arguments, print help output, and so on.

3.3 Initialization and Startup

After the NamesrvController instance is created, initialize and start the NameServer.

The code entry is NamesrvController#initialize.

public boolean initialize() {
    // Load kvConfig.json under kvConfigPath and put its entries into KVConfigManager#configTable
    this.kvConfigManager.load();
    // Create a Netty server from nettyServerConfig.
    // brokerHousekeepingService is instantiated in the NamesrvController constructor. It implements ChannelEventListener,
    // handles Broker connection events, and is mainly used to maintain RouteInfoManager's brokerLiveTable
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
    // Create the thread pool that processes Netty network requests; the default number of threads is 8
    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
    // Register the Netty server's request processor: ClusterTestRequestProcessor if clusterTest is enabled,
    // otherwise DefaultRequestProcessor
    this.registerProcessor();
    // Schedule a task that starts after 5 seconds and scans RouteInfoManager#brokerLiveTable every 10 seconds for inactive Brokers
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);
    // Schedule a task that starts after 1 minute and prints the KV configuration every 10 minutes
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);
    // RocketMQ can secure data transmission by enabling TLS. If TLS is enabled, register a listener that reloads the SslContext
    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
        // Register a listener to reload SslContext
        try {
            fileWatchService = new FileWatchService(
                new String[] {
                    TlsSystemConfig.tlsServerCertPath,
                    TlsSystemConfig.tlsServerKeyPath,
                    TlsSystemConfig.tlsServerTrustCertPath
                },
                new FileWatchService.Listener() {
                    boolean certChanged, keyChanged = false;
                    @Override
                    public void onChanged(String path) {
                        if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                            log.info("The trust certificate changed, reload the ssl context");
                            reloadServerSslContext();
                        }
                        if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                            certChanged = true;
                        }
                        if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                            keyChanged = true;
                        }
                        if (certChanged && keyChanged) {
                            log.info("The certificate and private key changed, reload the ssl context");
                            certChanged = keyChanged = false;
                            reloadServerSslContext();
                        }
                    }

                    private void reloadServerSslContext() {
                        ((NettyRemotingServer) remotingServer).loadSslContext();
                    }
                });
        } catch (Exception e) {
            log.warn("FileWatchService created error, can't load the certificate dynamically");
        }
    }
    return true;
}

The code above is NameServer's initialization process. As the comments show, it consists of five main steps:

  • Step1: Load the KV configuration and write it into KVConfigManager's configTable property;

  • Step2: Create the Netty server;

  • Step3: Create the thread pool that processes Netty network requests;

  • Step4: Schedule the Broker liveness check: after an initial 5-second delay, check every 10 seconds whether each Broker is still alive;

  • Step5: Schedule the KV configuration printer: after an initial 1-minute delay, print the KV configuration every 10 minutes.
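Steps 4 and 5 both use the JDK's ScheduledExecutorService#scheduleAtFixedRate(task, initialDelay, period, unit). This standalone sketch uses the same call with millisecond values so that it finishes quickly; the task is a stand-in counter, not scanNotActiveBroker or printAllPeriodically.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Demonstrates the "initial delay, then fixed period" pattern used during NameServer initialization.
final class FixedRateDemo {
    private FixedRateDemo() {}

    // Schedules a counting task, lets it run for about runForMillis, then returns how often it ran.
    static int countRuns(long initialDelayMillis, long periodMillis, long runForMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        try {
            // Same shape as scheduleAtFixedRate(task, 5, 10, TimeUnit.SECONDS) in initialize(),
            // but in milliseconds so the demo is fast.
            scheduler.scheduleAtFixedRate(runs::incrementAndGet, initialDelayMillis, periodMillis, TimeUnit.MILLISECONDS);
            Thread.sleep(runForMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return runs.get();
    }
}
```

One JDK behavior worth keeping in mind: if any run of a task scheduled with scheduleAtFixedRate throws an exception, its later runs are suppressed. That is one reason periodic tasks such as the Broker registration task in section 4.2.1 wrap their body in try/catch.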

A common technique used by the RocketMQ development team is to shut NameServer down gracefully with a JVM shutdown hook, so that the controller's shutdown logic runs before the JVM process exits.

Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
    @Override
    public Void call() throws Exception {
        controller.shutdown();
        return null;
    }
}));

NameServer is started by calling the start function. The code is simple: it starts the Netty server created during initialization and, if TLS is enabled, the certificate file watcher. We won't go into the details of remotingServer.start(), which mostly performs Netty setup.

public void start() throws Exception {
    // Start the Netty server
    this.remotingServer.start();
    // If TLS is enabled, start watching the certificate files
    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }
}

4. Route management

We learned at the beginning of Chapter 2 that NameServer is a lightweight registry that provides routing information to Topic producers and consumers and manages routing information and Broker nodes, covering route registration, route removal, and route discovery.

This chapter analyzes, from the source code, how NameServer manages routing information. The core code lives mainly in org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager.

4.1 Routing Metadata

Before looking at how routing information is managed, we need to know what routing metadata NameServer stores and how it is structured.

Looking at the code, we can see that the routing metadata is maintained mainly through five attributes:

private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

Let’s expand on each of these five attributes in turn.

4.1.1 TopicQueueTable

Description: Topic message queue routing information. When messages are sent, load balancing is performed according to this routing table.

Data structure: A HashMap whose key is the Topic name and whose value is a list of QueueData objects. As we saw in Chapter 1, a Topic has multiple queues. QueueData's data structure is as follows:

Example data structure:

topicQueueTable:{
    "topic1": [{"brokerName": "broker-a", "readQueueNums": 4, "writeQueueNums": 4, "perm": 6, "topicSynFlag": 0},
               {"brokerName": "broker-b", "readQueueNums": 4, "writeQueueNums": 4, "perm": 6, "topicSynFlag": 0}]
}

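The perm field is a bit mask. Assuming the bit layout of RocketMQ's PermName constants (read = 4, write = 2, inherit = 1; verify against the version you use), perm 6 means the queues are both readable and writable. A small decoder sketch, whose string format is its own:

```java
// Bit values assumed to mirror RocketMQ's PermName constants; verify against your version.
final class Perm {
    static final int PERM_READ = 1 << 2;  // 4
    static final int PERM_WRITE = 1 << 1; // 2
    static final int PERM_INHERIT = 1;    // 1

    private Perm() {}

    static boolean isReadable(int perm) {
        return (perm & PERM_READ) == PERM_READ;
    }

    static boolean isWriteable(int perm) {
        return (perm & PERM_WRITE) == PERM_WRITE;
    }

    // Renders a perm value such as 6 as "RW-" (read, write, inherit).
    static String describe(int perm) {
        return (isReadable(perm) ? "R" : "-")
            + (isWriteable(perm) ? "W" : "-")
            + ((perm & PERM_INHERIT) == PERM_INHERIT ? "X" : "-");
    }
}
```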
4.1.2 BrokerAddrTable

Description: Basic Broker information, including the BrokerName, the cluster name, and the master and slave Broker addresses.

Data structure: A HashMap whose key is the BrokerName and whose value is a BrokerData object. BrokerData's data structure is as follows (read it together with the Broker master/slave logical diagram below):

Broker master/slave logical diagram:

Example data structure:

brokerAddrTable:{
    "broker-a": {
        "cluster": "c1", "brokerName": "broker-a",
        "brokerAddrs": {0: "192.168.1.1:10000", 1: "192.168.1.2:10000"}
    },
    "broker-b": {
        "cluster": "c1", "brokerName": "broker-b",
        "brokerAddrs": {0: "192.168.1.3:10000", 1: "192.168.1.4:10000"}
    }
}
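In brokerAddrs, key 0 is the master and any other key is a slave (the registration code later in this article compares brokerId with MixAll.MASTER_ID for the same purpose). The hypothetical helper below, a simplified stand-in for BrokerData rather than the real class, returns the master's address and, as one possible policy, falls back to the lowest-numbered slave when no master is registered:

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simplified stand-in for BrokerData; not the RocketMQ class.
class SimpleBrokerData {
    static final long MASTER_ID = 0L; // key 0 denotes the master, as in the brokerAddrs example

    final String cluster;
    final String brokerName;
    // brokerId -> "ip:port"; a TreeMap keeps the IDs in ascending order
    final TreeMap<Long, String> brokerAddrs = new TreeMap<>();

    SimpleBrokerData(String cluster, String brokerName) {
        this.cluster = cluster;
        this.brokerName = brokerName;
    }

    // Returns the master address, or the lowest-numbered slave if no master is registered.
    String selectAddr() {
        String master = brokerAddrs.get(MASTER_ID);
        if (master != null) {
            return master;
        }
        Map.Entry<Long, String> first = brokerAddrs.firstEntry();
        return first == null ? null : first.getValue();
    }
}
```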

4.1.3 ClusterAddrTable

Description: Broker cluster information, storing the names of all Brokers in each cluster.

Data structure: A HashMap whose key is the cluster name and whose value is a Set of BrokerNames.

Example data structure:

clusterAddrTable:{
    "c1": ["broker-a", "broker-b"]
}

4.1.4 BrokerLiveTable

Description: Broker liveness information. NameServer updates a Broker's entry every time it receives a heartbeat packet from that Broker.

Data structure: A HashMap whose key is the Broker address and whose value is a BrokerLiveInfo object. BrokerLiveInfo's data structure is as follows:

Example data structure:

brokerLiveTable:{
    "192.168.1.1:10000": {
            "lastUpdateTimestamp": 1518270318980,
            "dataVersion": versionObj1,
            "channel": channelObj,
            "haServerAddr": ""
    },
    "192.168.1.2:10000": {
            "lastUpdateTimestamp": 1518270318980,
            "dataVersion": versionObj1,
            "channel": channelObj,
            "haServerAddr": "192.168.1.1:10000"
    },
    "192.168.1.3:10000": {
            "lastUpdateTimestamp": 1518270318980,
            "dataVersion": versionObj1,
            "channel": channelObj,
            "haServerAddr": ""
    },
    "192.168.1.4:10000": {
            "lastUpdateTimestamp": 1518270318980,
            "dataVersion": versionObj1,
            "channel": channelObj,
            "haServerAddr": "192.168.1.3:10000"
    }
}

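The removal rule from Chapter 2 operates on this table: each heartbeat refreshes lastUpdateTimestamp, and the periodic scan drops any Broker whose last heartbeat is more than 120 seconds old. A simplified, single-threaded sketch (the class and method names are hypothetical, and this is not RouteInfoManager's code); the current time is passed in explicitly so the logic is easy to test:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Simplified model of heartbeat tracking; not RocketMQ's RouteInfoManager.
class BrokerLiveTracker {
    static final long BROKER_EXPIRED_MILLIS = 120_000L; // 120 seconds

    // brokerAddr -> time (ms) of the last heartbeat received
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    // Route registration: record or refresh the time of the latest heartbeat.
    void onHeartbeat(String brokerAddr, long nowMillis) {
        lastHeartbeat.put(brokerAddr, nowMillis);
    }

    // Route removal: meant to be called periodically (every 10 s in NameServer);
    // drops every Broker whose last heartbeat is older than the timeout and returns their addresses.
    List<String> scanNotActive(long nowMillis) {
        List<String> removed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() + BROKER_EXPIRED_MILLIS < nowMillis) {
                it.remove();
                removed.add(e.getKey());
            }
        }
        return removed;
    }

    boolean isAlive(String brokerAddr) {
        return lastHeartbeat.containsKey(brokerAddr);
    }
}
```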
4.1.5 filterServerTable

Description: The list of FilterServers on each Broker. A Consumer that uses FilterServer-based filtering pulls data through a FilterServer, which is registered with the Broker.

Data structure: A HashMap whose key is the Broker address and whose value is the list of FilterServer addresses.

4.2 Route Registration

Route registration is implemented through the heartbeat function between Broker and NameServer. There are two main steps:

Step1:

When the Broker starts, it sends heartbeat packets to all NameServers in the cluster, and then repeats this every 30 seconds (the default is 30 seconds; the configured interval is clamped to between 10 and 60 seconds).

Step2:

When NameServer receives a heartbeat packet, it updates topicQueueTable, brokerAddrTable, brokerLiveTable, clusterAddrTable, and filterServerTable.

Let’s analyze these two steps separately.

4.2.1 Broker Sending Heartbeat Packets

The core logic for sending heartbeat packets is part of the Broker startup logic, whose entry point is org.apache.rocketmq.broker.BrokerController#start. This article focuses on the heartbeat logic and lists only the core code for sending heartbeat packets:

1) Create a scheduled task that registers the Broker. It first runs 10 seconds after startup and then repeats every 30 seconds by default; the period comes from BrokerConfig#getRegisterNameServerPeriod() (default 30 seconds) and is clamped to between 10 and 60 seconds.

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
        } catch (Throwable e) {
            log.error("registerBrokerAll Exception", e);
        }
    }
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
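The period argument Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)) is what keeps the interval between 10 and 60 seconds. Pulled out into a standalone helper (the class and method names are hypothetical), the clamp behaves like this:

```java
// Isolates the clamping expression used when scheduling the Broker registration task.
final class RegisterPeriod {
    static final long MIN_MILLIS = 10_000L;
    static final long MAX_MILLIS = 60_000L;

    private RegisterPeriod() {}

    // Values below 10 s become 10 s, values above 60 s become 60 s, anything in between is kept.
    static long effectivePeriodMillis(long configuredMillis) {
        return Math.max(MIN_MILLIS, Math.min(configuredMillis, MAX_MILLIS));
    }
}
```

So the default of 30 000 ms is used unchanged, a configured 1 000 ms is raised to 10 seconds, and 120 000 ms is lowered to 60 seconds.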

2) After the Topic configuration and version number are wrapped up, the actual route registration takes place. It is implemented in org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll; the core code is as follows:

public List<RegisterBrokerResult> registerBrokerAll(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final boolean oneway,
    final int timeoutMills,
    final boolean compressed) {
    final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
    // Get the NameServer address list
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
        // Start: build the request header, which carries the Broker's information
        final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
        requestHeader.setBrokerAddr(brokerAddr);
        requestHeader.setBrokerId(brokerId);
        requestHeader.setBrokerName(brokerName);
        requestHeader.setClusterName(clusterName);
        requestHeader.setHaServerAddr(haServerAddr);
        requestHeader.setCompressed(compressed);
        // Build the request body, which carries the topic configuration and filterServerList
        RegisterBrokerBody requestBody = new RegisterBrokerBody();
        requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
        requestBody.setFilterServerList(filterServerList);
        final byte[] body = requestBody.encode(compressed);
        final int bodyCrc32 = UtilAll.crc32(body);
        requestHeader.setBodyCrc32(bodyCrc32);
        // End of request header/body construction
        // Register with every NameServer in parallel on a thread pool
        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
        for (final String namesrvAddr : nameServerAddressList) {
            brokerOuterExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // The method that actually performs the registration
                        RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                        if (result != null) {
                            // Collect the information returned by the NameServer
                            registerBrokerResultList.add(result);
                        }

                        log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                    } catch (Exception e) {
                        log.warn("registerBroker Exception, {}", namesrvAddr, e);
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }
        try {
            // Wait for all registrations to finish, but at most timeoutMills
            countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
        }
    }

    return registerBrokerResultList;
}

The code above is also fairly simple: it first builds the request header and request body, then registers with every NameServer in parallel on a thread pool, waiting on a CountDownLatch for at most timeoutMills.
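The same fan-out pattern (one task per NameServer, a CopyOnWriteArrayList for results, and a CountDownLatch awaited with a timeout) can be reproduced with plain JDK classes. In this sketch the network call is replaced by a hypothetical `register` function that returns a result or throws; as in the original, a failing server is skipped and does not block the others:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Standalone version of the "register with every NameServer in parallel" pattern.
final class ParallelRegistrar {
    private ParallelRegistrar() {}

    static List<String> registerAll(List<String> nameServers,
                                    Function<String, String> register,
                                    ExecutorService executor,
                                    long timeoutMillis) {
        List<String> results = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(nameServers.size());
        for (String addr : nameServers) {
            executor.execute(() -> {
                try {
                    String result = register.apply(addr);
                    if (result != null) {
                        results.add(result);
                    }
                } catch (RuntimeException e) {
                    // One failing NameServer must not affect registration with the others.
                } finally {
                    latch.countDown(); // always count down, even on failure
                }
            });
        }
        try {
            // Wait for all servers, but never longer than the timeout.
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return results;
    }
}
```

Note that await returns after the timeout even if some servers have not answered, so the returned list can be partial; CopyOnWriteArrayList keeps any late additions from those tasks thread-safe.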

The request header type is RegisterBrokerRequestHeader, which mainly contains the following fields:

The request body type is RegisterBrokerBody, which contains the following fields:

3) The actual route registration is implemented by the registerBroker method; the core code is as follows:

private RegisterBrokerResult registerBroker(
    final String namesrvAddr,
    final boolean oneway,
    final int timeoutMills,
    final RegisterBrokerRequestHeader requestHeader,
    final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException {
    // Create the request. Note RequestCode.REGISTER_BROKER: the network processor on the NameServer side uses the request code to decide which business logic handles the request
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
    request.setBody(body);
    // Network transmission is based on Netty
    if (oneway) {
        // One-way call: the NameServer returns no result, so nothing is returned here
        try {
            this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
        } catch (RemotingTooMuchRequestException e) {
            // Ignore
        }
        return null;
    }
    // Synchronous call: register with the NameServer and wait for its response
    RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            // Decode the returned responseHeader
            RegisterBrokerResponseHeader responseHeader =
                (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
            // Rewrap the returned result, updating masterAddr and haServerAddr
            RegisterBrokerResult result = new RegisterBrokerResult();
            result.setMasterAddr(responseHeader.getMasterAddr());
            result.setHaServerAddr(responseHeader.getHaServerAddr());
            if(response.getBody() ! =null) {
                result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
            }
            return result;
        }
        default:
            break;
    }
    throw new MQBrokerException(response.getCode(), response.getRemark(), requestHeader == null ? null : requestHeader.getBrokerAddr());
}

The Broker and NameServer communicate over Netty. When a Broker sends a registration request to NameServer, it puts the request code RequestCode.REGISTER_BROKER into the request. This is a common network-dispatch pattern: RocketMQ defines a requestCode for each kind of request, and the network processor on the server side dispatches each request to the corresponding business logic according to its requestCode.
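Request-code dispatch can be modeled as a map from code to handler. The codes and handlers below are hypothetical placeholders rather than RocketMQ's real RequestCode values, and the class is neither NettyRemotingServer nor DefaultRequestProcessor; it only shows the lookup-by-code idea:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Minimal request-code dispatcher; codes and payloads are illustrative only.
class RequestDispatcher {
    static final int REGISTER_BROKER = 1; // hypothetical code, not RocketMQ's real value
    static final int GET_ROUTE = 2;       // hypothetical code

    // requestCode -> handler that turns a request body into a response body
    private final Map<Integer, Function<String, String>> handlers = new HashMap<>();

    void register(int code, Function<String, String> handler) {
        handlers.put(code, handler);
    }

    // Selects the handler by code, much like the server-side processor switching on request.getCode().
    String dispatch(int code, String body) {
        Function<String, String> handler = handlers.get(code);
        if (handler == null) {
            return "unsupported request code: " + code;
        }
        return handler.apply(body);
    }
}
```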

4.2.2 NameServer Processing Heartbeat Packets

After the Broker sends a registration heartbeat packet, NameServer processes it according to the requestCode in the packet. NameServer's default network processor is DefaultRequestProcessor:

public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    if (ctx != null) {
        log.debug("receive request, {} {} {}",
                  request.getCode(),
                  RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                  request);
    }
    switch (request.getCode()) {
        ......
        // If the code is RequestCode.REGISTER_BROKER, register the Broker
        case RequestCode.REGISTER_BROKER:
            Version brokerVersion = MQVersion.value2Version(request.getVersion());
            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                return this.registerBrokerWithFilterServer(ctx, request);
            } else {
                return this.registerBroker(ctx, request);
            }
        ......
        default:
            break;
    }
    return null;
}

The processor checks the requestCode; if it is RequestCode.REGISTER_BROKER, the request is handled as a Broker registration. Which method is called depends on the Broker's version number. Taking versions at or above V3_0_11 as an example, registerBrokerWithFilterServer is called, and it performs the registration in three main steps:

Step1:

Parse the requestHeader and verify the request body's CRC32 checksum to determine whether the data is intact;

Step2:

Parse the request body to obtain the Topic configuration (and the filterServerList);

Step3:

Call RouteInfoManager#registerBroker to register the Broker;
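Step1's integrity check recomputes a CRC32 over the received body and compares it with the bodyCrc32 value that the Broker put in the request header. Here is a sketch of that comparison using java.util.zip.CRC32. RocketMQ computes the value with its own UtilAll.crc32 helper, whose exact output may differ from the raw value used here, so the only assumption is that sender and receiver use the same function:

```java
import java.util.zip.CRC32;

// Sketch of a body checksum check; uses the JDK CRC32, not RocketMQ's UtilAll.crc32.
final class BodyChecksum {
    private BodyChecksum() {}

    // Sender side: compute the checksum to place in the request header.
    static int crc32(byte[] body) {
        CRC32 crc = new CRC32();
        crc.update(body, 0, body.length);
        return (int) crc.getValue(); // keep the low 32 bits, like an int header field
    }

    // Receiver side: recompute over the received body and compare with the header value.
    static boolean verify(byte[] body, int expectedCrc32) {
        return crc32(body) == expectedCrc32;
    }
}
```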

The core registration logic is implemented by RouteInfoManager#registerBroker. The core code is as follows:

public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            // Take the write lock to prevent concurrent writes to the routing tables in RouteInfoManager
            this.lock.writeLock().lockInterruptibly();
            // Get the names of all Brokers in the cluster from clusterAddrTable by clusterName
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            // If the cluster has no Broker set yet, create one; then add this brokerName to it
            if (null == brokerNames) {
                brokerNames = new HashSet<String>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            brokerNames.add(brokerName);

            boolean registerFirst = false;
            // Try to get brokerData from brokerAddrTable by brokerName
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null == brokerData) {
                // No brokerData yet: create one, put it into brokerAddrTable, and set registerFirst to true
                registerFirst = true;
                brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                this.brokerAddrTable.put(brokerName, brokerData);
            }
            // Update brokerAddrs in brokerData
            Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
            // brokerId will be 0 in case the master hangs and the slave becomes master, and the old brokerAddr needs to be removed
            //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
            //The same IP:PORT must only have one record in brokerAddrTable
            Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
            while (it.hasNext()) {
                Entry<Long, String> item = it.next();
                if (null! = brokerAddr && brokerAddr.equals(item.getValue()) && brokerId ! = item.getKey()) { it.remove(); }}// brokerAddrs are updated to determine if the broker is registered for the first time based on the oldAddr returned
            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
            registerFirst = registerFirst || (null == oldAddr);
​
            // If the Broker is Master and the Topic configuration of the Broker changes or is registered for the first time, the Topic routing metadata needs to be created or updated to populate the topicQueueTable
            if (null! = topicConfigWrapper && MixAll.MASTER_ID == brokerId) {if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                    || registerFirst) {
                    ConcurrentMap<String, TopicConfig> tcTable =
                        topicConfigWrapper.getTopicConfigTable();
                    if(tcTable ! =null) {
                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                            // Create or update Topic routing metadata
                            this.createAndUpdateQueueData(brokerName, entry.getValue()); }}}}/ / update BrokerLivelnfo BrokeLivelnfo is perform routing delete important basis
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                                                                         new BrokerLiveInfo(
                                                                             System.currentTimeMillis(),
                                                                             topicConfigWrapper.getDataVersion(),
                                                                             channel,
                                                                             haServerAddr));
            if (null == prevBrokerLiveInfo) {
                log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
            }
      // Register the Broker's filterServer address list
            if(filterServerList ! =null) {
                if (filterServerList.isEmpty()) {
                    this.filterServerTable.remove(brokerAddr);
                } else {
                    this.filterServerTable.put(brokerAddr, filterServerList); }}// If the Broker is a slave node, look up the node information of the Broker Master and update the corresponding masterAddr attribute
            if(MixAll.MASTER_ID ! = brokerId) { String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if(masterAddr ! =null) {
                    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                    if(brokerLiveInfo ! =null) { result.setHaServerAddr(brokerLiveInfo.getHaServerAddr()); result.setMasterAddr(masterAddr); }}}}finally {
            this.lock.writeLock().unlock(); }}catch (Exception e) {
        log.error("registerBroker Exception", e);
    }
​
    return result;
}
Copy the code

From the source code above, registering a Broker breaks down into seven main steps:

  • Step1: Acquire the write lock to prevent concurrent writes to the routing tables in RouteInfoManager;

  • Step2: Check whether the Broker's cluster exists, create it if it does not, and add the Broker name to the cluster;

  • Step3: Maintain BrokerData;

  • Step4: If the Broker is a Master and its Topic configuration has changed or it is registering for the first time, create or update the Topic routing metadata and populate topicQueueTable;

  • Step5: Update BrokerLiveInfo;

  • Step6: Register the Broker's filterServer address list;

  • Step7: If the Broker is a slave, look up its Master, set the corresponding masterAddr (and HA server address) in the result, and return it to the Broker.
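To make the seven steps concrete, here is a heavily simplified, hypothetical model of registration. MiniRouteTable is not a RocketMQ class. It keeps only a cluster table, a brokerName-to-addresses table and a liveness table, all guarded by a ReentrantReadWriteLock. It omits Topic metadata (Step4) and filter servers (Step6).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, heavily simplified model of RouteInfoManager#registerBroker.
class MiniRouteTable {
    static final long MASTER_ID = 0L;

    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // clusterName -> brokerNames
    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();
    // brokerName -> (brokerId -> address)
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();
    // address -> last heartbeat timestamp
    final Map<String, Long> brokerLiveTable = new HashMap<>();

    /** Registers a broker; returns the master address for a slave, or null for a master or unknown master. */
    String register(String clusterName, String brokerName, long brokerId, String brokerAddr, long nowMillis) {
        lock.writeLock().lock(); // Step1: writers are serialized
        try {
            // Step2: make sure the cluster exists and contains this broker name
            clusterAddrTable.computeIfAbsent(clusterName, k -> new HashSet<>()).add(brokerName);

            // Step3: maintain brokerId -> address, keeping a single record per address
            Map<Long, String> addrs = brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>());
            Iterator<Map.Entry<Long, String>> it = addrs.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, String> e = it.next();
                if (brokerAddr.equals(e.getValue()) && brokerId != e.getKey()) {
                    it.remove(); // e.g. a slave re-registering as master with brokerId 0
                }
            }
            addrs.put(brokerId, brokerAddr);

            // Step5: refresh liveness information
            brokerLiveTable.put(brokerAddr, nowMillis);

            // Step7: a slave learns the address of its master
            return brokerId == MASTER_ID ? null : addrs.get(MASTER_ID);
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

Registering an address again with brokerId 0, as happens when a slave is promoted, leaves a single record for that address.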

4.3 Route Deletion

4.3.1 Trigger conditions

There are two trigger conditions for route deletion:

  • NameServer scans brokerLiveTable every 10s; if no heartbeat has been received from a Broker for 120s, it removes the Broker and closes the socket connection.

  • Route deletion is also triggered when a Broker shuts down.
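The first trigger can be sketched as a periodic scan. The class below is a hypothetical illustration, and its names are ours. It records a heartbeat timestamp per Broker address and, when scanned, drops entries older than 120s. schedule() runs the scan every 10s. In NameServer, expiry would also close the channel and clean up the route tables, which the sketch only marks with a comment.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the periodic liveness scan; not RocketMQ's actual class.
class BrokerLivenessScanner {
    static final long EXPIRE_MILLIS = 120_000L;     // no heartbeat for 120 s => broker considered dead
    static final long SCAN_PERIOD_MILLIS = 10_000L; // scan every 10 s

    // brokerAddr -> timestamp of the last heartbeat
    final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();

    void onHeartbeat(String brokerAddr, long nowMillis) {
        lastHeartbeat.put(brokerAddr, nowMillis);
    }

    /** Removes expired brokers and returns their addresses. */
    List<String> scanNotActive(long nowMillis) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() + EXPIRE_MILLIS < nowMillis) {
                it.remove();
                expired.add(e.getKey());
                // Here the real code would close the channel and clean the route tables.
            }
        }
        return expired;
    }

    void schedule(ScheduledExecutorService executor) {
        executor.scheduleAtFixedRate(() -> scanNotActive(System.currentTimeMillis()),
                SCAN_PERIOD_MILLIS, SCAN_PERIOD_MILLIS, TimeUnit.MILLISECONDS);
    }
}
```

Passing the current time into scanNotActive keeps the expiry rule testable without waiting for real time to pass.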

4.3.2 Source code analysis

In both cases the route-deletion logic is the same as that of RouteInfoManager#onChannelDestroy, whose core code is as follows:

public void onChannelDestroy(String remoteAddr, Channel channel) {
    String brokerAddrFound = null;
    if (channel != null) {
        try {
            try {
                // Acquire the read lock
                this.lock.readLock().lockInterruptibly();
                // Find the Broker address that owns this channel in brokerLiveTable
                Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
                    this.brokerLiveTable.entrySet().iterator();
                while (itBrokerLiveTable.hasNext()) {
                    Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
                    if (entry.getValue().getChannel() == channel) {
                        brokerAddrFound = entry.getKey();
                        break;
                    }
                }
            } finally {
                // Release the read lock
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }

    // If the Broker has already been removed from the live-broker table, use remoteAddr directly
    if (null == brokerAddrFound) {
        brokerAddrFound = remoteAddr;
    } else {
        log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
    }

    if (brokerAddrFound != null && brokerAddrFound.length() > 0) {

        try {
            try {
                // Acquire the write lock
                this.lock.writeLock().lockInterruptibly();
                // Remove this broker address from brokerLiveTable and filterServerTable
                this.brokerLiveTable.remove(brokerAddrFound);
                this.filterServerTable.remove(brokerAddrFound);
                String brokerNameFound = null;
                boolean removeBrokerName = false;
                Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
                    this.brokerAddrTable.entrySet().iterator();
                // Traverse brokerAddrTable
                while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
                    BrokerData brokerData = itBrokerAddrTable.next().getValue();

                    Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<Long, String> entry = it.next();
                        Long brokerId = entry.getKey();
                        String brokerAddr = entry.getValue();
                        // Find the brokerData containing this address and remove the address from it
                        if (brokerAddr.equals(brokerAddrFound)) {
                            brokerNameFound = brokerData.getBrokerName();
                            it.remove();
                            log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                                brokerId, brokerAddr);
                            break;
                        }
                    }

                    // If brokerData has no addresses left after the removal, remove the entire brokerData
                    if (brokerData.getBrokerAddrs().isEmpty()) {
                        removeBrokerName = true;
                        itBrokerAddrTable.remove();
                        log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                            brokerData.getBrokerName());
                    }
                }

                if (brokerNameFound != null && removeBrokerName) {
                    // Traverse clusterAddrTable
                    Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<String, Set<String>> entry = it.next();
                        String clusterName = entry.getKey();
                        Set<String> brokerNames = entry.getValue();
                        // Remove the brokerName found above
                        boolean removed = brokerNames.remove(brokerNameFound);
                        if (removed) {
                            log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                                brokerNameFound, clusterName);
                            // If the set is now empty, remove the entire cluster from clusterAddrTable
                            if (brokerNames.isEmpty()) {
                                log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                                    clusterName);
                                it.remove();
                            }

                            break;
                        }
                    }
                }

                if (removeBrokerName) {
                    Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
                        this.topicQueueTable.entrySet().iterator();
                    // Traverse topicQueueTable
                    while (itTopicQueueTable.hasNext()) {
                        Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
                        String topic = entry.getKey();
                        List<QueueData> queueDataList = entry.getValue();

                        Iterator<QueueData> itQueueData = queueDataList.iterator();
                        while (itQueueData.hasNext()) {
                            QueueData queueData = itQueueData.next();
                            // Remove the queue data of this brokerName under the Topic
                            if (queueData.getBrokerName().equals(brokerNameFound)) {
                                itQueueData.remove();
                                log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed", topic, queueData);
                            }
                        }

                        // If the Topic has no queue data left, remove the Topic from the table
                        if (queueDataList.isEmpty()) {
                            itTopicQueueTable.remove();
                            log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed", topic);
                        }
                    }
                }
            } finally {
                // Release the write lock
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }
}

The overall route deletion logic consists of six steps:

  • Step1: Acquire the read lock, find the Broker address that owns the channel in brokerLiveTable, and release the read lock; if the Broker has already been removed from the live-broker table, use remoteAddr directly;

  • Step2: Acquire the write lock and remove the Broker address from brokerLiveTable and filterServerTable;

  • Step3: Traverse brokerAddrTable, find the brokerData containing the Broker address, and remove that address from it; if brokerData has no addresses left afterwards, remove the entire brokerData;

  • Step4: Traverse clusterAddrTable and remove the brokerName found in Step3; if a cluster's set of broker names becomes empty, remove the whole cluster from clusterAddrTable;

  • Step5: Traverse topicQueueTable and remove the queue data belonging to that brokerName under each Topic; if a Topic has no queue data left (that is, it was served only by this Broker), remove the Topic from the table;

  • Step6: Release the write lock.
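The cascade in Steps 2 to 5 can be reduced to a small, hypothetical sketch over plain maps. RouteCleaner is not part of RocketMQ. QueueData is simplified to the name of the Broker hosting the queues, and locking is omitted. Each level is removed only when the level below it becomes empty.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the cascade in onChannelDestroy (not RocketMQ code).
class RouteCleaner {
    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();      // cluster -> brokerNames
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>(); // brokerName -> (brokerId -> addr)
    final Map<String, List<String>> topicQueueTable = new HashMap<>();      // topic -> brokerNames with queues
    final Map<String, Long> brokerLiveTable = new HashMap<>();              // addr -> last heartbeat

    void removeBroker(String brokerAddr) {
        // Step2: forget the liveness record of this address
        brokerLiveTable.remove(brokerAddr);

        // Step3: remove the address; drop the brokerName once it has no address left
        String brokerNameFound = null;
        boolean removeBrokerName = false;
        Iterator<Map.Entry<String, Map<Long, String>>> itBroker = brokerAddrTable.entrySet().iterator();
        while (itBroker.hasNext() && brokerNameFound == null) {
            Map.Entry<String, Map<Long, String>> entry = itBroker.next();
            if (entry.getValue().values().remove(brokerAddr)) {
                brokerNameFound = entry.getKey();
                if (entry.getValue().isEmpty()) {
                    removeBrokerName = true;
                    itBroker.remove();
                }
            }
        }
        if (!removeBrokerName) {
            return; // unknown address, or other addresses (e.g. a slave) still serve this brokerName
        }
        final String deadBroker = brokerNameFound;

        // Step4: remove the brokerName from its cluster; drop the cluster if it becomes empty
        Iterator<Set<String>> itCluster = clusterAddrTable.values().iterator();
        while (itCluster.hasNext()) {
            Set<String> brokerNames = itCluster.next();
            if (brokerNames.remove(deadBroker)) {
                if (brokerNames.isEmpty()) {
                    itCluster.remove();
                }
                break;
            }
        }

        // Step5: remove the broker's queues from every topic; drop topics left without queues
        Iterator<List<String>> itTopic = topicQueueTable.values().iterator();
        while (itTopic.hasNext()) {
            List<String> brokersWithQueues = itTopic.next();
            brokersWithQueues.removeIf(deadBroker::equals);
            if (brokersWithQueues.isEmpty()) {
                itTopic.remove();
            }
        }
    }
}
```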

As the steps above show, route deletion is fairly simple: it only manipulates the routing metadata structures. To understand this code better, read it alongside the description of the routing metadata structures in section 4.1.

4.4 Route Discovery

When routing information changes, NameServer does not actively push routing information to the client. Instead, NameServer waits for the client to periodically pull the latest routing information from NameServer. This design approach reduces the complexity of NameServer implementation.

4.4.1 Producer actively pulls

Once started, the producer launches a series of scheduled tasks, one of which periodically fetches Topic routing information from NameServer. The entry point is MQClientInstance#startScheduledTask(), and the core code is as follows:

private void startScheduledTask() {
    ...
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                // Update topic routing information from NameServer
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
    ...
}

/**
 * Get topic routing information from NameServer
 */
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
    boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException,
    RemotingSendRequestException, RemotingConnectException {
    ...
    // Send a request to NameServer with request code RequestCode.GET_ROUTEINFO_BY_TOPIC
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);
    ...
}

The producer communicates with NameServer over Netty, and the route requests it sends to NameServer carry the request code RequestCode.GET_ROUTEINFO_BY_TOPIC.
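Here is a minimal sketch of this pull model. RoutePoller is a hypothetical class, and a Supplier stands in for the GET_ROUTEINFO_BY_TOPIC request. It polls on a fixed schedule, like the scheduleAtFixedRate call above, and keeps the latest route only when it changes. The try/catch matters because ScheduledExecutorService suppresses all later runs of a periodic task once one run throws. This is presumably also why the RocketMQ task catches Exception.

```java
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical sketch of client-side route polling; fetchRoute stands in for the
// request sent to NameServer.
class RoutePoller<T> implements AutoCloseable {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final AtomicReference<T> current = new AtomicReference<>();
    private final Supplier<T> fetchRoute;

    RoutePoller(Supplier<T> fetchRoute) {
        this.fetchRoute = fetchRoute;
    }

    // First poll after 10 ms, then every periodMillis.
    void start(long periodMillis) {
        executor.scheduleAtFixedRate(this::pollOnce, 10, periodMillis, TimeUnit.MILLISECONDS);
    }

    // Pull once and keep the result only if it differs from the route already held.
    void pollOnce() {
        try {
            T latest = fetchRoute.get();
            if (latest != null && !Objects.equals(latest, current.get())) {
                current.set(latest);
            }
        } catch (RuntimeException e) {
            // Swallow and retry next period: an uncaught exception would cancel the periodic task.
        }
    }

    T route() {
        return current.get();
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

In RocketMQ the period comes from ClientConfig#getPollNameServerInterval, which defaults to 30 seconds as far as we know. Because the client only pulls, a route change reaches producers with a delay of up to one polling period.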

4.4.2 NameServer Returns routing information

After receiving the Producer's request, NameServer dispatches it according to its requestCode. As with registration, the request is handled by the default request processor, DefaultRequestProcessor, which ultimately delegates to RouteInfoManager#pickupTopicRouteData.

TopicRouteData structure

Before analyzing the source code, let's look at TopicRouteData, the data structure NameServer returns to the producer. It holds four fields: orderTopicConf, queueDatas, brokerDatas, and filterServerTable.

QueueData, BrokerData, and filterServerTable were described in section 4.1, where the routing metadata was introduced.
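For orientation, the sketch below mirrors TopicRouteData and the two types it nests as plain Java classes. It is a simplified, hypothetical model: getters, setters, encoding and some QueueData fields are omitted. masterAddr() is our own helper and relies on the convention that brokerId 0 is the master.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

// Simplified, hypothetical mirror of the route structures returned to the producer.
class RouteModel {

    static class QueueData {
        String brokerName;  // broker hosting the queues
        int readQueueNums;
        int writeQueueNums;
        int perm;           // read/write permission bits
        // other fields omitted
    }

    static class BrokerData {
        String cluster;
        String brokerName;
        HashMap<Long, String> brokerAddrs = new HashMap<>(); // brokerId -> address, 0 = master

        // Hypothetical helper: the master's address, or null if no master is registered
        String masterAddr() {
            return brokerAddrs.get(0L);
        }
    }

    static class TopicRouteData {
        String orderTopicConf;                                              // ordered-message config, may be null
        List<QueueData> queueDatas = new ArrayList<>();
        List<BrokerData> brokerDatas = new ArrayList<>();
        HashMap<String, List<String>> filterServerTable = new HashMap<>(); // brokerAddr -> filter servers
    }
}
```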

Source code analysis

Now that we know what the TopicRouteData returned to the producer looks like, let's see how RouteInfoManager#pickupTopicRouteData builds it.

public TopicRouteData pickupTopicRouteData(final String topic) {
    TopicRouteData topicRouteData = new TopicRouteData();
    boolean foundQueueData = false;
    boolean foundBrokerData = false;
    Set<String> brokerNameSet = new HashSet<String>();
    List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
    topicRouteData.setBrokerDatas(brokerDataList);

    HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
    topicRouteData.setFilterServerTable(filterServerMap);

    try {
        try {
            // Acquire the read lock
            this.lock.readLock().lockInterruptibly();
            // Get the list of queues for the topic from the topicQueueTable metadata
            List<QueueData> queueDataList = this.topicQueueTable.get(topic);
            if (queueDataList != null) {
                // Write the queue list into topicRouteData's queueDatas
                topicRouteData.setQueueDatas(queueDataList);
                foundQueueData = true;

                Iterator<QueueData> it = queueDataList.iterator();
                while (it.hasNext()) {
                    QueueData qd = it.next();
                    brokerNameSet.add(qd.getBrokerName());
                }
                // Iterate over the brokerNames extracted from the QueueData list
                for (String brokerName : brokerNameSet) {
                    // Fetch brokerData from brokerAddrTable by brokerName
                    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                    if (null != brokerData) {
                        // Clone the brokerData object and add it to topicRouteData's brokerDatas
                        BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(),
                            (HashMap<Long, String>) brokerData.getBrokerAddrs().clone());
                        brokerDataList.add(brokerDataClone);
                        foundBrokerData = true;
                        // Traverse brokerAddrs
                        for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                            // Fetch the filterServerList by brokerAddr and put it into topicRouteData's filterServerTable
                            List<String> filterServerList = this.filterServerTable.get(brokerAddr);
                            filterServerMap.put(brokerAddr, filterServerList);
                        }
                    }
                }
            }
        } finally {
            // Release the read lock
            this.lock.readLock().unlock();
        }
    } catch (Exception e) {
        log.error("pickupTopicRouteData Exception", e);
    }

    log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);

    if (foundBrokerData && foundQueueData) {
        return topicRouteData;
    }

    return null;
}

pickupTopicRouteData fills in the queueDatas, brokerDatas, and filterServerTable fields of TopicRouteData, but not orderTopicConf. Moving one level up, its caller DefaultRequestProcessor#getRouteInfoByTopic looks like this:

public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    ...
    // The method analyzed above, which builds the TopicRouteData object
    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());

    if (topicRouteData != null) {
        // Check whether NameServer's orderMessageEnable option is enabled
        if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
            // If enabled, read the ordered-message configuration from KVConfig by namespace and topic name
            String orderTopicConf =
                this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                    requestHeader.getTopic());
            // Set orderTopicConf
            topicRouteData.setOrderTopicConf(orderTopicConf);
        }

        byte[] content = topicRouteData.encode();
        response.setBody(content);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
    // If no topic route is found, the response code is TOPIC_NOT_EXIST
    response.setCode(ResponseCode.TOPIC_NOT_EXIST);
    response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
        + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
    return response;
}

Putting the two methods together, looking up a Topic route takes three steps:

  • Step1: Call RouteInfoManager#pickupTopicRouteData, which reads topicQueueTable, brokerAddrTable, and filterServerTable and fills queueDatas, brokerDatas, and filterServerTable respectively;

  • Step2: If ordered messages are enabled on NameServer (orderMessageEnable), read the Topic's ordered-message configuration from KVConfig and populate orderTopicConf;

  • Step3: If no routing information is found, return the response code ResponseCode.TOPIC_NOT_EXIST.
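The three steps can be condensed into one hypothetical lookup method. RouteLookup and its simplified tables are illustrative, not RocketMQ's API. The method takes the read lock so many producers can look up routes at once and copies broker addresses before returning. It returns null where NameServer would answer TOPIC_NOT_EXIST, and it attaches the ordered-message configuration only when the feature flag is on.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch combining pickupTopicRouteData and getRouteInfoByTopic.
// Queue data is reduced to the name of the broker hosting the queues.
class RouteLookup {
    static final class Route {
        String orderTopicConf;
        final List<String> queueBrokerNames = new ArrayList<>();
        final Map<String, Map<Long, String>> brokerAddrs = new HashMap<>();   // brokerName -> (id -> addr)
        final Map<String, List<String>> filterServerTable = new HashMap<>(); // brokerAddr -> filter servers
    }

    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    final Map<String, List<String>> topicQueueTable = new HashMap<>();
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();
    final Map<String, List<String>> filterServerTable = new HashMap<>();
    final Map<String, String> orderTopicKv = new HashMap<>(); // stand-in for KVConfig
    boolean orderMessageEnable;

    /** Returns the route, or null when it is unknown (the caller would answer TOPIC_NOT_EXIST). */
    Route lookup(String topic) {
        Route route = new Route();
        lock.readLock().lock(); // shared lock: concurrent lookups do not block each other
        try {
            // Step1: queues of the topic, then the brokers hosting them
            List<String> queues = topicQueueTable.get(topic);
            if (queues == null) {
                return null;
            }
            route.queueBrokerNames.addAll(queues);
            Set<String> brokerNames = new LinkedHashSet<>(queues);
            for (String brokerName : brokerNames) {
                Map<Long, String> addrs = brokerAddrTable.get(brokerName);
                if (addrs == null) {
                    continue;
                }
                // Copy so the caller never sees later changes to the shared table
                route.brokerAddrs.put(brokerName, new HashMap<>(addrs));
                for (String addr : addrs.values()) {
                    route.filterServerTable.put(addr, filterServerTable.get(addr));
                }
            }
        } finally {
            lock.readLock().unlock();
        }
        if (route.brokerAddrs.isEmpty()) {
            return null; // queue data without broker data counts as "not found"
        }
        // Step2: attach ordered-message config only when the feature is enabled
        if (orderMessageEnable) {
            route.orderTopicConf = orderTopicKv.get(topic);
        }
        return route;
    }
}
```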

V. Summary

This article has examined RocketMQ NameServer from a source code perspective, covering its startup process, route registration, route deletion, and route discovery. Knowing how NameServer is designed, we can also reflect on techniques in it that are worth learning. Here are two:

  • The startup process registers a JVM shutdown hook for graceful shutdown. This technique is worth reusing: when an application runs thread pools or long-lived threads, a shutdown hook can release resources or perform cleanup before the JVM exits, so the application shuts down gracefully.

  • Updating the routing tables requires a lock to prevent concurrent modification, and NameServer uses a read-write lock, which has finer granularity. Multiple message senders can read routes concurrently, which keeps message sending highly concurrent. At the same time, NameServer processes only one Broker heartbeat at a time, so multiple heartbeat requests are executed serially. This is a classic use case for a read-write lock.
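The shutdown-hook technique from the first point can be sketched as follows. GracefulService is a hypothetical class. It registers a hook with Runtime#addShutdownHook that stops a worker pool, waiting briefly for running tasks before interrupting them.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical service showing the graceful-shutdown pattern: a JVM shutdown hook
// stops the worker pool before the process exits.
class GracefulService {
    final ExecutorService workers = Executors.newFixedThreadPool(2);
    final Thread shutdownHook = new Thread(this::shutdown, "graceful-shutdown-hook");

    void start() {
        Runtime.getRuntime().addShutdownHook(shutdownHook);
    }

    void shutdown() {
        workers.shutdown(); // stop accepting new tasks
        try {
            if (!workers.awaitTermination(3, TimeUnit.SECONDS)) {
                workers.shutdownNow(); // interrupt tasks that did not finish in time
            }
        } catch (InterruptedException e) {
            workers.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
```

Hooks run on normal exit and on SIGTERM but not when the process is killed with SIGKILL, so they complement rather than replace other recovery mechanisms.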


Author: Ye Wenhao, Vivo Internet Server Team