Note: This series of source code analysis is based on RocketMq 4.8.0, gitee Repository link: gitee.com/funcy/rocke… .

In the last article, we looked at the startup process of NameServer, which ended up with NameServer starting a Netty service. In this article, we’ll look at how the Netty service handles requests.

1. Processing service requestsChannelHandler: serverHandler

The NamesrvController can handle Broker/Producer/Consumer requests. The serverHandler is the ChannelHandler that handles these requests. Is NettyRemotingServer.Net tyServerHandler (NettyServerHandler NettyRemotingServer inner classes), the code is as follows:

class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) 
            throws Exception { processMessageReceived(ctx, msg); }}Copy the code

NettyRemotingAbstract#processMessageReceived

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) 
        throws Exception {
    final RemotingCommand cmd = msg;
    if(cmd ! =null) {
        switch (cmd.getType()) {
            case REQUEST_COMMAND:
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND:
                processResponseCommand(ctx, cmd);
                break;
            default:
                break; }}}Copy the code

REQUEST_COMMAND NettyRemotingAbstract#processRequestCommand

public void processRequestCommand(final ChannelHandlerContext ctx, 
        final RemotingCommand cmd) {
    final Pair<NettyRequestProcessor, ExecutorService> matched 
        = this.processorTable.get(cmd.getCode());
    final Pair<NettyRequestProcessor, ExecutorService> pair = 
        null == matched ? this.defaultRequestProcessor : matched;
    final int opaque = cmd.getOpaque();

    if(pair ! =null) {
        Runnable run = new Runnable() {
            @Override
            public void run(a) {
                try{ doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); .if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
                        AsyncNettyRequestProcessor processor = 
                                (AsyncNettyRequestProcessor)pair.getObject1();
                        processor.asyncProcessRequest(ctx, cmd, callback);
                    } else {
                        NettyRequestProcessor processor = pair.getObject1();
                        // Process the requestRemotingCommand response = processor.processRequest(ctx, cmd); callback.callback(response); }}catch(Throwable e) { ... }}}; .try {
            // Asynchronous processing
            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
            pair.getObject2().submit(requestTask);
        } catch(RejectedExecutionException e) { ... }}else{... }}Copy the code

The main process of this method is to get the Pair, encapsulate the processing operation as a Runnable object, and then commit the Runnable object to the thread pool.

What is this Pair object? Remember the remotingExecutor we created in the NamesrvController#initialize method, which was eventually registered with the defaultRequestProcessor property for NettyRemotingServer:

@Override
public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) {
    this.defaultRequestProcessor 
            = new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);
}
Copy the code

The thread pool from the defaultRequestProcessor, pain.getobject2 (), is remotingExecutor. The resulting processor for pg.getobject1 () is the DefaultRequestProcessor.

The remotingExecutor thread pool is used to process remote requests.

The processing logic for the remote command is in the Runnable#run method:

public void run(a) {
    try{ doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); .// Process the request
        if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
            AsyncNettyRequestProcessor processor = 
                    (AsyncNettyRequestProcessor)pair.getObject1();
            processor.asyncProcessRequest(ctx, cmd, callback);
        } else {
            NettyRequestProcessor processor = pair.getObject1();
            // Process the requestRemotingCommand response = processor.processRequest(ctx, cmd); callback.callback(response); }}catch(Throwable e) { ... }}Copy the code

The code distinguishes between synchronous and asynchronous requests and actually ends up in the DefaultRequestProcessor#processRequest method:

public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {...switch (request.getCode()) {
        ...
        / / query dataVersion
        case RequestCode.QUERY_DATA_VERSION:
            return queryBrokerTopicConfig(ctx, request);
        // Register broker messages
        case RequestCode.REGISTER_BROKER:
            Version brokerVersion = MQVersion.value2Version(request.getVersion());
            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                return this.registerBrokerWithFilterServer(ctx, request);
            } else {
                return this.registerBroker(ctx, request);
            }
        // Unregister the broker when it is disconnected
        case RequestCode.UNREGISTER_BROKER:
            return this.unregisterBroker(ctx, request);
        // Get routing information based on topic
        case RequestCode.GET_ROUTEINFO_BY_TOPIC:
            return this.getRouteInfoByTopic(ctx, request);
        // Omit processing of other messages.default:
            break;
    }
    return null;
}
Copy the code

This method is used to process network requests, processing request messages will be more, here we only focus on the following types of messages:

  • To obtainbrokerVersion information: RequestcodeforQUERY_DATA_VERSION, used to querybrokerVersion information of
  • brokerRegister: requestcodeforREGISTER_BROKER.brokerWhen it starts, it registers its own information tonameServer
  • brokerLogout: requestcodeforUNREGISTER_BROKER.brokerBefore it stops, it sends a messagenameServerI’m going to close
  • To obtaintopicRouting information: Based ontopicGet the routing informationtopicThe correspondingbroker,messageQueueinformation

Let’s take a look at the implementation of these messages.

2 brokerRegister and deregister messages

When the broker starts, a registration message is sent to the NamerServer. Before the broker closes, a closure message is sent to NameServer.

We first see registration message, broker register message method for DefaultRequestProcessor# registerBrokerWithFilterServer, we directly see important code:

public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {...// Process registration
    RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker( requestHeader.getClusterName(), requestHeader.getBrokerAddr(), requestHeader.getBrokerName(), requestHeader.getBrokerId(), requestHeader.getHaServerAddr(), registerBrokerBody.getTopicConfigSerializeWrapper(), registerBrokerBody.getFilterServerList(), ctx.channel()); . }Copy the code

RouteInfoManager#registerBroker method is called. Before we analyze this method, we will first understand the basic information of RouteInfoManager. Several important member variables of RouteInfoManager are as follows:

public class RouteInfoManager {
    /** topic -> List<QueueData> */
    private final HashMap<String, List<QueueData>> topicQueueTable;
    /** brokerName -> BrokerData */
    private final HashMap<String, BrokerData> brokerAddrTable;
    /** clusterName -> brokerName */
    private final HashMap<String, Set<String>> clusterAddrTable;
    /** brokerAddr -> BrokerLiveInfo */
    private final HashMap<String, BrokerLiveInfo> brokerLiveTable;

    // omit other methods. }Copy the code

As mentioned, NameServer is a very simple Topic routing registry, and this HashMap is the key to implementing the Registry for NameServer!

  1. TopicQueueTable: holds the relationship between topics and queues. The value type is List, indicating that a topic can have multiple queues. QueueData member variables are as follows:

    public class QueueData implements Comparable<QueueData> {
     // The name of the borker
     private String brokerName;
     / / to read and write
     private int readQueueNums;
     private int writeQueueNums;
     private int perm;
     private inttopicSynFlag; . }Copy the code
  2. BrokerAddrTable: Records broker details, key is the broker name, value is the broker details, BrokerData member variables are as follows:

    public class BrokerData implements Comparable<BrokerData> {
     // Name of the cluster
     private String cluster;
     / / the name of the broker
     private String brokerName;
     // The server address for borkerId. A brokerName can have multiple Broker servers
     private HashMap<Long, String> brokerAddrs;
    }
    Copy the code
  3. ClusterAddrTable: Cluster information that saves the brokerName for the cluster name

  4. BrokerLiveTable: Live broker information, key for broker address, value for specific broker server, BrokerLiveInfo member variables as follows:

    class BrokerLiveInfo {
     // Last heartbeat update time
     private long lastUpdateTimestamp;
     private DataVersion dataVersion;
     // Indicates the network connection channel, provided by Netty
     private Channel channel;
     // High availability service address
     privateString haServerAddr; . }Copy the code

With this in mind, we can look back at the RouteInfoManager#registerBroker method and see that registration is simply putting data into the above hashmaps:

public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            this.lock.writeLock().lockInterruptibly();
            // Get brokerNames according to clusterName
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            if (null == brokerNames) {
                brokerNames = new HashSet<String>();
                // Pay attention to the put operation, which operates on clusterAddrTable
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            brokerNames.add(brokerName);

            boolean registerFirst = false;
            // Get the specified brokerData according to brokerName
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null == brokerData) {
                registerFirst = true;
                brokerData = new BrokerData(clusterName, 
                    brokerName, new HashMap<Long, String>());
                // brokerAddrTable is brokerAddrTable
                this.brokerAddrTable.put(brokerName, brokerData);
            }
            Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
            // Delete the original records of the secondary node if the primary node is switched from the secondary node
            Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
            while (it.hasNext()) {
                Entry<Long, String> item = it.next();
                if (null! = brokerAddr && brokerAddr.equals(item.getValue()) && brokerId ! = item.getKey()) { it.remove(); } } String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr); registerFirst = registerFirst || (null == oldAddr);

            if (null! = topicConfigWrapper && MixAll.MASTER_ID == brokerId) {if (this.isBrokerTopicConfigChanged(brokerAddr, 
                        topicConfigWrapper.getDataVersion()) || registerFirst) {
                    ConcurrentMap<String, TopicConfig> tcTable =
                        topicConfigWrapper.getTopicConfigTable();
                    if(tcTable ! =null) {
                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                            // Add topic, which operates on topicQueueTable
                            this.createAndUpdateQueueData(brokerName, entry.getValue()); }}}}// Add the live broker to the brokerLiveTable structure and operate on brokerLiveTable
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                newBrokerLiveInfo( System.currentTimeMillis(), topicConfigWrapper.getDataVersion(), channel, haServerAddr)); . }finally {
            this.lock.writeLock().unlock(); }}catch (Exception e) {
        log.error("registerBroker Exception", e);
    }

    return result;
}
Copy the code

This makes it clear what this method does: wrap the information reported by the broker into these hashmaps.

The logout operation is the opposite of the registration operation. It removes the broker information from the hashMap using RouteInfoManager#unregisterBroker. The code does remove the hashMap, so I won’t analyze it here.

3 according totopicObtaining Routing Information

When a producer or consumer starts, it needs to get routing information from NameServer based on topic. DefaultRequestProcessor#getRouteInfoByTopic processes the message

public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final GetRouteInfoRequestHeader requestHeader = (GetRouteInfoRequestHeader) request
            .decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);

    // Get topic information
    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager()
            .pickupTopicRouteData(requestHeader.getTopic());
    // omit others. }Copy the code

This method calls the RouteInfoManager#pickupTopicRouteData method, again RouteInfoManager, and here we can imagine getting the topic routing information, RouteInfoManager HashMap: RouteInfoManager HashMap

 public TopicRouteData pickupTopicRouteData(final String topic) {...try {
            try {
                this.lock.readLock().lockInterruptibly();
                // topicQueueTable
                List<QueueData> queueDataList = this.topicQueueTable.get(topic);
                if(queueDataList ! =null) {... }}finally {
                this.lock.readLock().unlock(); }}catch(Exception e) { ... }... }Copy the code

TopicQueueTable = topicQueueTable = topicQueueTable = topicQueueTable = topicQueueTable = topicQueueTable = topicQueueTable = topicQueueTable = topicQueueTable;

Query 4.brokerVersion information

Before the broker registers with nameServer, it sends a message with the QUERY_DATA_VERSION code to determine whether the version number has changed. The code for this message is as follows:

public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {...switch (request.getCode()) {
        ...
        / / query dataVersion
        case RequestCode.QUERY_DATA_VERSION:
            returnqueryBrokerTopicConfig(ctx, request); . . }Copy the code

Enter the DefaultRequestProcessor#queryBrokerTopicConfig method as follows:

public RemotingCommand queryBrokerTopicConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {...// Key code: determine whether the version has changed
    Boolean changed = this.namesrvController.getRouteInfoManager()
        .isBrokerTopicConfigChanged(requestHeader.getBrokerAddr(), dataVersion);
    if(! changed) {// If there is no change, update the last report time to the current time
        this.namesrvController.getRouteInfoManager()
            .updateBrokerInfoUpdateTimestamp(requestHeader.getBrokerAddr());
    }

    DataVersion nameSeverDataVersion = this.namesrvController.getRouteInfoManager().
            queryBrokerTopicConfig(requestHeader.getBrokerAddr());
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);

    // Returns the current version of nameServer
    if(nameSeverDataVersion ! =null) {
        response.setBody(nameSeverDataVersion.encode());
    }
    responseHeader.setChanged(changed);
    return response;
}
Copy the code

This method consists of three operations:

1. The judgebrokerWhether the version has changed

This part is to determine whether the reported version number is consistent with the version number saved by NameServer:

public boolean isBrokerTopicConfigChanged(final String brokerAddr, 
        final DataVersion dataVersion) {
    // Continue the query
    DataVersion prev = queryBrokerTopicConfig(brokerAddr);
    return null== prev || ! prev.equals(dataVersion); }Copy the code

This method is the judgment logic, just a simple equals operation. Continue with the DatSpanning query:

public DataVersion queryBrokerTopicConfig(final String brokerAddr) {
    BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);
    if(prev ! =null) {
        return prev.getDataVersion();
    }
    return null;
}
Copy the code

Again, the RouteInfoManager hashMap operation, which ends up retrieving the version saved by NameServer from brokerLiveTable.

2. Perform operations when the version is changed

If the version has not changed, update the current time to the latest reported time.

public void updateBrokerInfoUpdateTimestamp(final String brokerAddr) {
    BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);
    if(prev ! =null) { prev.setLastUpdateTimestamp(System.currentTimeMillis()); }}Copy the code

Again on the RouteInfoManager hashmaps, note that the value of the BrokerLiveInfo#lastUpdateTimestamp member variable is updated to the current time only when DataVersion has not changed.

So when DataVersion changes, the value of BrokerLiveInfo#lastUpdateTimestamp is not updated? No, if datuncomfortable changes, BrokerLiveInfo#lastUpdateTimestamp will be changed in the registration request.

3. Query the current version

To query the current version number, use RouteInfoManager#queryBrokerTopicConfig, in the 1. It is used to determine whether the broker version has changed and will not be covered here.

5. Scheduled task: CheckbrokerWhether to live

In the previous analysis of NamesrvController#initialize, we mentioned that this method starts a timed task:

// Enable a scheduled task to scan brokers every 10 seconds to remove inactive brokers
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run(a) {
        NamesrvController.this.routeInfoManager.scanNotActiveBroker(); }},5.10, TimeUnit.SECONDS);
Copy the code

It calls the method RouteInfoManager#scanNotActiveBroker as follows:

public void scanNotActiveBroker(a) {
    // brokerLiveTable: brokerLiveTable is used to hold active brokers, which are found inactive and removed
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, BrokerLiveInfo> next = it.next();
        // Last heartbeat time
        long last = next.getValue().getLastUpdateTimestamp();
        // Determine whether to survive according to the heartbeat time. The timeout period is 2 minutes
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            RemotingUtil.closeChannel(next.getValue().getChannel());
            / / remove
            it.remove();
            // Handle channel closure. This method handles removal of other Hashmaps
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel()); }}}Copy the code

This method first traverses brokerLiveTable, then determines the last time each BrokerLiveInfo was reported, determines if it timed out, and if the last time BrokerLiveInfo was reported is more than 2 minutes away from the current brokerLiveTable, then removes it from brokerLiveTable. The RouteInfoManager#onChannelDestroy method is then called to remove brokers from other hashmaps.

6. Summary

This paper analyzes the processing of request messages by NameServer. NameServer uses Netty to communicate. The ChannelHandler for processing broker, producer and consumer request messages is NettyServerHandler. The final processing method is DefaultRequestProcessor#processRequest, which will handle many requests. We focus on the process of registering/unregistering broker messages, obtaining topic routing messages, and obtaining broker version information.

Registering/unregistering broker messages, getting topic routing messages, and getting broker version information are all ultimately handled in the RouteInfoManager class, which has several very important member variables of type HashMap as follows:

  1. topicQueueTable: Store and savetopicwithQueueThe relationship between,valueA type ofList, indicating atopicYou can have more than onequeue
  2. brokerAddrTableRecord:brokerSpecific information about,keyforbrokerThe name,valueforbrokerSpecific information
  3. clusterAddrTable: Indicates the cluster information. Save the information corresponding to the cluster namebrokerName
  4. brokerLiveTable: livebrokerInformation,keyforbrokerThe address,valueFor a specificbrokerThe server

These member variables are why NameServer is called a registry. To register/unregister brokers, you put or remove the broker information to these hashmaps. Getting the topic routing message is getting the broker/messageQueue and so on from the topicQueueTable.

NameServer’s “registration,” “discovery,” “heartbeat,” etc., are all operations on the hashMap member variables RouteInfoManager.

Well, that’s the end of this article, and we’ll start next with broker analysis.


Limited to the author’s personal level, there are inevitable mistakes in the article, welcome to correct! Original is not easy, commercial reprint please contact the author to obtain authorization, non-commercial reprint please indicate the source.

In this paper, starting from WeChat number public road, Java technology link: the original mp.weixin.qq.com/s/VPFfsqp_H…

If you liked this article, we want more source code analysis (currently has completed the spring/springboot mybatis/tomcat source code analysis), welcome to the attention of the public, let us together in a world of technology road!