Create NamesrvController

I am here are intercepted part of the code, combing the general logic, specific details of the part need to read the source code

NamesrvStartup

    public static NamesrvController main0(String[] args) {
        try {
            NamesrvController controller = createNamesrvController(args);
            start(controller);
        }
Copy the code
public static NamesrvController start(final NamesrvController controller) throws Exception {
    boolean initResult = controller.initialize();
    controller.start();
    return controller;
}
Copy the code
    public void start(a) throws Exception {
        // To actually start nameserver, this is the start that calls the remotingServer interface
        this.remotingServer.start();
        if (this.fileWatchService ! =null) {
            this.fileWatchService.start(); }}Copy the code

RemotingService

public interface RemotingService {
    // The start implementation is divided into two classes, one is the client side, one is the server side, both with the help of netty to complete
    void start(a);

    void shutdown(a);

    void registerRPCHook(RPCHook rpcHook);
}
Copy the code

Initialize NameServerController

public boolean initialize(a) {

    this.kvConfigManager.load();

    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

    this.registerProcessor();

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        // A scan is performed every 10 seconds to detect all brokers that have gone offline, with the first delay of five seconds
        @Override
        public void run(a) {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker(); }},5.10, TimeUnit.SECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run(a) {
            NamesrvController.this.kvConfigManager.printAllPeriodically(); }},1.10, TimeUnit.MINUTES);
    return true;
}
Copy the code

RouteInfoManager

// The expiration time is two minutes
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
// Scan out all brokers that have fallen offline
public void scanNotActiveBroker(a) {
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, BrokerLiveInfo> next = it.next();
        long last = next.getValue().getLastUpdateTimestamp();
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            RemotingUtil.closeChannel(next.getValue().getChannel());
            it.remove();
            log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel()); }}}Copy the code

Broker is registered with nameserver

NamesrvController

    private void registerProcessor(a) {
        if (namesrvConfig.isClusterTest()) {

            this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
                this.remotingExecutor);
        } else {
            // Nameservier's default requests are registered and handed over to NettyServer for processing
            this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor); }}Copy the code

DefaultRequestProcessor

        switch (request.getCode()) {
            case RequestCode.PUT_KV_CONFIG:
                return this.putKVConfig(ctx, request);
            case RequestCode.GET_KV_CONFIG:
                return this.getKVConfig(ctx, request);
            case RequestCode.DELETE_KV_CONFIG:
                return this.deleteKVConfig(ctx, request);
            case RequestCode.QUERY_DATA_VERSION:
                return queryBrokerTopicConfig(ctx, request);
                // Register broker requests with nameserver
            case RequestCode.REGISTER_BROKER:
                Version brokerVersion = MQVersion.value2Version(request.getVersion());
                if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                    return this.registerBrokerWithFilterServer(ctx, request);
                } else {
                // Register broker to nameserver
                    return this.registerBroker(ctx, request);
                }
Copy the code
/ / call the namesrvController. GetRouteInfoManager () registerBroker method really broker in the nameserver registration
        RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
            requestHeader.getClusterName(),
            requestHeader.getBrokerAddr(),
            requestHeader.getBrokerName(),
            requestHeader.getBrokerId(),
            requestHeader.getHaServerAddr(),
            topicConfigWrapper,
            null,
            ctx.channel()
        );
Copy the code
    public RegisterBrokerResult registerBroker(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final Channel channel) {
        RegisterBrokerResult result = new RegisterBrokerResult();
        try {
            try {
            // Only one thread can access it at a time
                this.lock.writeLock().lockInterruptibly();
ClusterName = clusterName = clusterName = clusterName = clusterName = clusterName = clusterName = clusterName = clusterName = clusterName
                Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
                if (null == brokerNames) {
                    brokerNames = new HashSet<String>();
                    this.clusterAddrTable.put(clusterName, brokerNames);
                }
                // Add brokers to a cluster
                brokerNames.add(brokerName);

                boolean registerFirst = false;
Brokerdata is retrieved according to BrokerName
BrokerAddrTable holds the detailed routing information for all brokers
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                If the broker registers for the first time, brokerDate is null, and a BrokerData is new, putting routing information into brokerAddrTable
                if (null == brokerData) {
                    registerFirst = true;
                    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                    this.brokerAddrTable.put(brokerName, brokerData);
                }
                Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
                //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
                //The same IP:PORT must only have one record in brokerAddrTable
                Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<Long, String> item = it.next();
                    if (null! = brokerAddr && brokerAddr.equals(item.getValue()) && brokerId ! = item.getKey()) { it.remove(); } } String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr); registerFirst = registerFirst || (null == oldAddr);

                if (null! = topicConfigWrapper && MixAll.MASTER_ID == brokerId) {if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                        || registerFirst) {
                        ConcurrentMap<String, TopicConfig> tcTable =
                            topicConfigWrapper.getTopicConfigTable();
                        if(tcTable ! =null) {
                            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                                this.createAndUpdateQueueData(brokerName, entry.getValue()); }}}}// This is the core processing logic of the broker heartbeat
// By default a new BrokerLiveInfo is put to brokerLiveTable every 30 seconds, overriding the last heartbeat time
//BrokerLiveInfo This System.currentTimemillis (), the current timestamp is the broker's latest heartbeat time
                BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                    new BrokerLiveInfo(
                        System.currentTimeMillis(),
                        topicConfigWrapper.getDataVersion(),
                        channel,
                        haServerAddr));
                if (null == prevBrokerLiveInfo) {
                    log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
                }

                if(filterServerList ! =null) {
                    if (filterServerList.isEmpty()) {
                        this.filterServerTable.remove(brokerAddr);
                    } else {
                        this.filterServerTable.put(brokerAddr, filterServerList); }}if(MixAll.MASTER_ID ! = brokerId) { String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if(masterAddr ! =null) {
                        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                        if(brokerLiveInfo ! =null) { result.setHaServerAddr(brokerLiveInfo.getHaServerAddr()); result.setMasterAddr(masterAddr); }}}}finally {
                this.lock.writeLock().unlock(); }}catch (Exception e) {
            log.error("registerBroker Exception", e);
        }

        return result;
    }
Copy the code