Boot method

org.apache.rocketmq.broker.BrokerStartup#main

public static void main(String[] args) {
    start(createBrokerController(args));
}

Creating the core component: BrokerController

org.apache.rocketmq.broker.BrokerStartup#createBrokerController

// ToDo: k1-> Create the core component BrokerController
public static BrokerController createBrokerController(String[] args) {
    ...
    // ToDo: k1-> Core configuration objects for the Broker
    final BrokerConfig brokerConfig = new BrokerConfig();
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    final NettyClientConfig nettyClientConfig = new NettyClientConfig();
    // TLS is security related
    nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
        String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
    // Listening port of the netty service
    nettyServerConfig.setListenPort(10911);
    // Configuration for how the Broker stores messages
    final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
    ...
    // ToDo: k2-> When the DLedger-based CommitLog (which also handles master/slave synchronization) is enabled, set brokerId to -1
    if (messageStoreConfig.isEnableDLegerCommitLog()) {
        brokerConfig.setBrokerId(-1);
    }
    ...
    // ToDo: k1-> Create BrokerController
    final BrokerController controller = new BrokerController(
        brokerConfig,
        nettyServerConfig,
        nettyClientConfig,
        messageStoreConfig);
    // remember all configs to prevent discard
    controller.getConfiguration().registerConfig(properties);
    // BrokerController initialization
    boolean initResult = controller.initialize();
    ...
    return controller;
}
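The TLS flag in createBrokerController uses a common idiom: read a JVM system property and fall back to a computed default when the property is not set. The following is a minimal sketch of that idiom only. The property name `demo.tls.enable` and the `TlsMode` enum declared here are stand-ins for illustration, not RocketMQ's own definitions.

```java
class TlsFlagDemo {
    // Hypothetical stand-in for RocketMQ's TlsMode; declared here only for the sketch.
    enum TlsMode { DISABLED, PERMISSIVE, ENFORCING }

    // Hypothetical property name, used only in this sketch.
    static final String TLS_ENABLE = "demo.tls.enable";

    // Returns the property value if it is set; otherwise true only when the mode is ENFORCING.
    static boolean useTls(TlsMode mode) {
        String fallback = String.valueOf(mode == TlsMode.ENFORCING);
        return Boolean.parseBoolean(System.getProperty(TLS_ENABLE, fallback));
    }

    public static void main(String[] args) {
        System.clearProperty(TLS_ENABLE);
        System.out.println(useTls(TlsMode.PERMISSIVE)); // false: property unset, mode not ENFORCING
        System.out.println(useTls(TlsMode.ENFORCING));  // true: property unset, default kicks in
        System.setProperty(TLS_ENABLE, "true");
        System.out.println(useTls(TlsMode.PERMISSIVE)); // true: explicit property wins
    }
}
```

An explicitly set property always overrides the mode-derived default, which is why the default is passed as the second argument of System.getProperty rather than checked separately.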

BrokerController initialization

BrokerController properties

BrokerController has many fields; here are a few of the important ones.

public BrokerController(
    // Four core components
    final BrokerConfig brokerConfig,
    final NettyServerConfig nettyServerConfig,
    final NettyClientConfig nettyClientConfig,
    final MessageStoreConfig messageStoreConfig
) {
    this.brokerConfig = brokerConfig;
    this.nettyServerConfig = nettyServerConfig;
    this.nettyClientConfig = nettyClientConfig;
    this.messageStoreConfig = messageStoreConfig;
    // ToDo: k2->Broker components corresponding to various functions
    // Manage Consumer offset
    this.consumerOffsetManager = new ConsumerOffsetManager(this);
    // Manage Topic configuration
    this.topicConfigManager = new TopicConfigManager(this);
    // Process the Consumer pull message request
    this.pullMessageProcessor = new PullMessageProcessor(this);
    this.pullRequestHoldService = new PullRequestHoldService(this);
    this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
    ...
}

BrokerController initialization

org.apache.rocketmq.broker.BrokerController#initialize

This method mainly loads the Broker's persisted configuration from disk, creates the thread pools used by the related components, and schedules a number of periodic tasks.

// ToDo: k1-> Initialization of BrokerController
public boolean initialize() throws CloneNotSupportedException {
    // Load the configuration persisted on disk (topics, consumer offsets, subscription groups, consumer filters)
    boolean result = this.topicConfigManager.load();

    result = result && this.consumerOffsetManager.load();
    result = result && this.subscriptionGroupManager.load();
    result = result && this.consumerFilterManager.load();

    if (result) {
        try {
            // Message storage management component that manages messages on disk
            this.messageStore =
                new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                    this.brokerConfig);
            // If Dledger is started, dledger-related components are initialized
            if (messageStoreConfig.isEnableDLegerCommitLog()) {
                DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
                ((DLedgerCommitLog) ((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
            }
            ...
        } catch (IOException e) {
            result = false;
            log.error("Failed to initialize", e);
        }
    }
    // Load the message store files from disk
    result = result && this.messageStore.load();
    // The Broker is both a server (receiving requests from producers and consumers) and a client (sending requests to the NameServer and to consumers).
    if (result) {
...
        // The thread pool that handles the Consumer's pull request
        this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getPullMessageThreadPoolNums(),
            this.brokerConfig.getPullMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.pullThreadPoolQueue,
            new ThreadFactoryImpl("PullMessageThread_"));
        ...
        // Periodically record Broker statistics
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.getBrokerStats().record();
                } catch (Throwable e) {
                    log.error("schedule record error.", e);
                }
            }
        }, initialDelay, period, TimeUnit.MILLISECONDS);
        // Set a timer to persist the Consumer offset to disk
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.consumerOffsetManager.persist();
                } catch (Throwable e) {
                    log.error("schedule persist consumerOffset error.", e);
                }
            }
        }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
        // Periodically persist consumer filter data (filters that Consumers push down to the Broker)
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.consumerFilterManager.persist();
                } catch (Throwable e) {
                    log.error("schedule persist consumer filter error.", e);
                }
            }
        }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
        ...
        // Periodically log how many bytes dispatching lags behind the CommitLog
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
                } catch (Throwable e) {
                    log.error("schedule dispatchBehindBytes error.", e);
                }
            }
        }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
        // Set the address list of NameServer
        if (this.brokerConfig.getNamesrvAddr() != null) {
            this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
            log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
        } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                @Override
                public void run() {
                    try {
                        // Obtain the NameServer address periodically
                        BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                    } catch (Throwable e) {
                        log.error("ScheduledTask fetchNameServerAddr exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }
        ...
        // Initialize transaction, ACL, and RPC hook support; these services are loaded via SPI and are extensible
        initialTransaction();
        initialAcl();
        initialRpcHooks();
    }
    return result;
}
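Every scheduled task in initialize wraps its body in try/catch (Throwable). This matters because of a documented property of ScheduledExecutorService.scheduleAtFixedRate: if any execution of the task throws, subsequent executions are suppressed. The sketch below illustrates the guard with a task that fails on its first run; the class and method names are hypothetical and nothing here is RocketMQ code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class GuardedScheduleDemo {
    // Schedules a task that fails on its first execution and reports whether it
    // still ran at least `runs` times before the 5-second timeout.
    static boolean survivesFailure(int runs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger count = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(runs);
        try {
            scheduler.scheduleAtFixedRate(() -> {
                try {
                    if (count.incrementAndGet() == 1) {
                        throw new IllegalStateException("simulated persist failure");
                    }
                } catch (Throwable e) {
                    // Swallowed here (real code would log) so the schedule is not cancelled.
                } finally {
                    done.countDown();
                }
            }, 0, 10, TimeUnit.MILLISECONDS);
            return done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(survivesFailure(3));
    }
}
```

Without the inner catch, the first exception would end the periodic task silently, which for the Broker would mean, for example, that consumer offsets stop being persisted.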

The initialization is complete and the system starts

The start method

org.apache.rocketmq.broker.BrokerStartup#start, which delegates to org.apache.rocketmq.broker.BrokerController#start

The whole method does one thing: start the Broker. Each of the components the Broker contains is started inside start. Part of the code is as follows:

// Start the Broker
public void start() throws Exception {
    // Start the message store component
    if (this.messageStore != null) {
        this.messageStore.start();
    }
    // Start the Netty server so that it can receive requests
    if (this.remotingServer != null) {
        this.remotingServer.start();
    }

    if (this.fastRemotingServer != null) {
        this.fastRemotingServer.start();
    }
    // A service component that watches files
    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }
    // ToDo: k2-> brokerOuterAPI can be understood as a Netty client that sends outbound requests, such as heartbeats
    if (this.brokerOuterAPI != null) {
        this.brokerOuterAPI.start();
    }

    // ToDo: k2-> Broker core heartbeat registration task, can be understood in depth
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        }
    // The task interval defaults to 30 seconds
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
    ...
}
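The period expression Math.max(10000, Math.min(period, 60000)) clamps the configured registration period into the range 10 to 60 seconds, whatever value is configured. A small worked example of that arithmetic follows; the class and method names are hypothetical.

```java
class RegisterPeriodDemo {
    // Clamp the configured period (in milliseconds) into [10000, 60000],
    // mirroring Math.max(10000, Math.min(period, 60000)) from the start method.
    static long effectivePeriodMillis(long configuredMillis) {
        return Math.max(10000, Math.min(configuredMillis, 60000));
    }

    public static void main(String[] args) {
        System.out.println(effectivePeriodMillis(1000));   // 10000: too small, raised to the floor
        System.out.println(effectivePeriodMillis(30000));  // 30000: within range, unchanged
        System.out.println(effectivePeriodMillis(120000)); // 60000: too large, capped
    }
}
```

So a heartbeat is sent at least once a minute and at most once every ten seconds, after an initial delay of ten seconds.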

That is the entire Broker startup process. Its overall structure is as follows:
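As a rough summary of the flow walked through so far (create the controller, initialize it, then start it), here is a minimal sketch. The `Lifecycle` interface and the `boot` and `trace` helpers are hypothetical names for illustration; only the ordering reflects the code quoted in this article.

```java
import java.util.ArrayList;
import java.util.List;

class BootSequenceDemo {
    // Hypothetical stand-in for a controller with the two phases discussed above.
    interface Lifecycle {
        boolean initialize();
        void start();
    }

    // Initialize first; start only when initialization succeeded.
    static boolean boot(Lifecycle controller) {
        if (!controller.initialize()) {
            return false; // abort startup
        }
        controller.start();
        return true;
    }

    // Records the order in which the phases run for a given initialization outcome.
    static List<String> trace(boolean initOk) {
        List<String> steps = new ArrayList<>();
        boot(new Lifecycle() {
            @Override public boolean initialize() { steps.add("initialize"); return initOk; }
            @Override public void start() { steps.add("start"); }
        });
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(trace(true));  // [initialize, start]
        System.out.println(trace(false)); // [initialize]
    }
}
```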

Next, take a closer look at the Broker's heartbeat registration process.

Broker registration

org.apache.rocketmq.broker.BrokerController#registerBrokerAll

The first half of the method deals with Topic configuration. The part that matters here decides whether the Broker needs to register.

public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
    ...
    // ToDo: k1-> Key code: decide whether registration is needed, then call doRegisterBrokerAll, which actually registers the Broker
    if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
        this.getBrokerAddr(),
        this.brokerConfig.getBrokerName(),
        this.brokerConfig.getBrokerId(),
        this.brokerConfig.getRegisterBrokerTimeoutMills())) {
        doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
    }
}
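The condition forceRegister || needRegister(...) relies on Java's short-circuit evaluation: when forceRegister is true, needRegister is never called. The timeout argument passed to needRegister suggests it may involve a remote call, but that is an inference, not something the quoted code shows. The sketch below demonstrates only the short-circuit behaviour, with a hypothetical counter standing in for the cost of the check.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

class RegisterDecisionDemo {
    // Same shape as the condition in registerBrokerAll: the check runs only when not forced.
    static boolean shouldRegister(boolean forceRegister, BooleanSupplier needRegister) {
        return forceRegister || needRegister.getAsBoolean();
    }

    // Returns how many times the check was evaluated for the given force flag.
    static int checksPerformed(boolean forceRegister) {
        AtomicInteger checks = new AtomicInteger();
        shouldRegister(forceRegister, () -> {
            checks.incrementAndGet(); // hypothetical stand-in for a costly check
            return false;
        });
        return checks.get();
    }

    public static void main(String[] args) {
        System.out.println(checksPerformed(true));  // 0: forced, check skipped
        System.out.println(checksPerformed(false)); // 1: not forced, check evaluated
    }
}
```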

org.apache.rocketmq.broker.BrokerController#doRegisterBrokerAll

This is where the registration is actually carried out:

// ToDo: k2-> Core code that actually performs the registration
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
    TopicConfigSerializeWrapper topicConfigWrapper) {
    // The Broker sends the request to every NameServer, so registration returns a List
    List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
...
    // If at least one result came back, use the first one in the list
    if (registerBrokerResultList.size() > 0) {
        RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
        if (registerBrokerResult != null) {
            // HA server address of the master node
            if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
                this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
            }
            // Master address used by the slave synchronization component
            this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());

            if (checkOrderConfig) {
                this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
            }
        }
    }
}

Next, the Broker builds a request packet and sends it to the NameServer.

org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll

Get the NameServer address list, build the request, and register with each NameServer:

// ToDo: k1-> Get the address list of all NameServers
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();

final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
    brokerOuterExecutor.execute(new Runnable() {
...
                // Register with NameServer
                RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                // Registration succeeded: keep the result in the local list
                if (result != null) {
                    registerBrokerResultList.add(result);
                }
...
}
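The fragment above fans the registration out to every NameServer in parallel and uses a CountDownLatch sized to the address list to wait for all of them. Here is a self-contained sketch of that pattern under stated assumptions: the `registerAll` helper, its String result type, the pool size, and the sample addresses are all hypothetical, and the thread-safe result list is a choice made for this sketch rather than a claim about RocketMQ's implementation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class FanOutRegisterDemo {
    // Calls `register` once per address in parallel and collects the non-null results.
    static List<String> registerAll(List<String> addresses, Function<String, String> register, long timeoutMillis) {
        List<String> results = new CopyOnWriteArrayList<>();
        if (addresses.isEmpty()) {
            return results;
        }
        ExecutorService executor = Executors.newFixedThreadPool(Math.min(4, addresses.size()));
        CountDownLatch latch = new CountDownLatch(addresses.size());
        try {
            for (String addr : addresses) {
                executor.execute(() -> {
                    try {
                        String result = register.apply(addr);
                        if (result != null) {
                            results.add(result);
                        }
                    } catch (Exception e) {
                        // One failing server must not block the others (real code would log).
                    } finally {
                        latch.countDown(); // always count down, on success or failure
                    }
                });
            }
            // Wait until every task has finished or the timeout elapses.
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("ns-a:9876", "ns-b:9876", "ns-c:9876");
        // The hypothetical register function returns null for ns-b to simulate a failed registration.
        List<String> ok = registerAll(servers, addr -> addr.startsWith("ns-b") ? null : "ok@" + addr, 3000);
        System.out.println(ok.size());
    }
}
```

Counting down in a finally block is the important detail: if a failed registration skipped the countdown, the caller would always wait for the full timeout.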

org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBroker

Send the request through Netty:

...
// Build the network request
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
...
// This is where the network request is actually sent; remotingClient is a Netty client
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
assert response != null;

Registration completed