Boot method

public static void main(String[] args) {
Copy the code

The core component BrokerController is created

// ToDo: k1-> Create the core component BrokerController
public static BrokerController createBrokerController(String[] args) {...// ToDo: k1-> core configuration information for the Broker
    final BrokerConfig brokerConfig = new BrokerConfig();
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    final NettyClientConfig nettyClientConfig = new NettyClientConfig();
    // TLS is security related
        String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
    // Listening port of the netty service
    // Broker stores some configuration of messages
    final MessageStoreConfig messageStoreConfig = newMessageStoreConfig(); .// ToDo: k2-> brokerId set to -1 to determine whether to manage master/slave synchronization and CommitLog conditions based on Dledger technology
    if (messageStoreConfig.isEnableDLegerCommitLog()) {
        brokerConfig.setBrokerId(-1); }...// ToDo: k1-> Create BrokerController
    final BrokerController controller = new BrokerController(
    // remember all configs to prevent discard
    // BrokerController initialization
    boolean initResult = controller.initialize();

Copy the code

BrokerController initialization

BrokerController properties

BrokerController has many properties, pick up a few introductions.

public BrokerController(// Four core componentsfinal BrokerConfig brokerConfig,
    final NettyServerConfig nettyServerConfig,
    final NettyClientConfig nettyClientConfig,
    final MessageStoreConfig messageStoreConfig
) {
    this.brokerConfig = brokerConfig;
    this.nettyServerConfig = nettyServerConfig;
    this.nettyClientConfig = nettyClientConfig;
    this.messageStoreConfig = messageStoreConfig;
    // ToDo: k2->Broker components corresponding to various functions
    // Manage Consumer offset
    this.consumerOffsetManager = new ConsumerOffsetManager(this);
    // Manage Topic configuration
    this.topicConfigManager = new TopicConfigManager(this);
    // Process the Consumer pull message request
    this.pullMessageProcessor = new PullMessageProcessor(this);
    this.pullRequestHoldService = new PullRequestHoldService(this);
    this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService); . }Copy the code

BrokerController initialization

This focuses on the loading of Broker configuration file information, the creation of thread pools for related components, and some scheduled tasks.

// ToDo: k1-> Initialization of BrokerController
public boolean initialize(a) throws CloneNotSupportedException {
    // Load the disk configuration information, which is used in MessageStoreConfig
    boolean result = this.topicConfigManager.load();

    result = result && this.consumerOffsetManager.load();
    result = result && this.subscriptionGroupManager.load();
    result = result && this.consumerFilterManager.load();

    if (result) {
        try {
            // Message storage management component that manages messages on disk
            this.messageStore =
                new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
            // If Dledger is started, dledger-related components are initialized
            if (messageStoreConfig.isEnableDLegerCommitLog()) {
                DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore); ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler); }... }catch (IOException e) {
            result = false;
            log.error("Failed to initialize", e); }}// Load the disk file
    result = result && this.messageStore.load();
    The Broker is both a server (receiving requests from producers and consumers) and a client (sending messages to NameServer and consumers).
    if (result) {
        // The thread pool that handles the Consumer's pull request
        this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
            1000 * 60,
            new ThreadFactoryImpl("PullMessageThread_")); .// Periodically count Broker tasks
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            public void run(a) {
                try {
                } catch (Throwable e) {
                    log.error("schedule record error.", e);
        }, initialDelay, period, TimeUnit.MILLISECONDS);
        // Set a timer to persist the Consumer offset to disk
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            public void run(a) {
                try {
                } catch (Throwable e) {
                    log.error("schedule persist consumerOffset error.", e); }}},1000 * 10.this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
        // Persist the Consumer's filter, from which the Consumer's filter is pushed down to the Broker
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            public void run(a) {
                try {
                } catch (Throwable e) {
                    log.error("schedule persist consumer filter error.", e); }}},1000 * 10.1000 * 10, TimeUnit.MILLISECONDS); .// Perform Commitlog dispatch tasks on a scheduled basis
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            public void run(a) {
                try {
          "dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
                } catch (Throwable e) {
                    log.error("schedule dispatchBehindBytes error.", e); }}},1000 * 10.1000 * 60, TimeUnit.MILLISECONDS);
        // Set the address list of NameServer
        if (this.brokerConfig.getNamesrvAddr() ! =null) {
  "Set user specified name server address: {}".this.brokerConfig.getNamesrvAddr());
        } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                public void run(a) {
                    try {
                        // Obtain the NameServer address periodically
                    } catch (Throwable e) {
                        log.error("ScheduledTask fetchNameServerAddr exception", e); }}},1000 * 10.1000 * 60 * 2, TimeUnit.MILLISECONDS); }...// Initialize, use spI-loaded services, extensible
    return result;
Copy the code

The initialization is complete and the system starts

Start starts

The whole method is tailored to do one thing: start the Broker. The various strange components contained in the Broker are then started in start. Part of the code is as follows:

/ / start the Broker
public void start(a) throws Exception {
    // Start the message store component
    if (this.messageStore ! =null) {
    // Start the Netty service so that it can receive requests
    if (this.remotingServer ! =null) {

    if (this.fastRemotingServer ! =null) {
    // A service component related to the file
    if (this.fileWatchService ! =null) {
    // ToDo: k2-> brokerOuterAPI is understood as a Netty client that sends out requests, such as heartbeats
    if (this.brokerOuterAPI ! =null) {

    // ToDo: k2-> Broker core heartbeat registration task, can be understood in depth
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        public void run(a) {
            try {
                    BrokerController.this.registerBrokerAll(true.false, brokerConfig.isForceRegister());
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e); }}// The task interval defaults to 30 seconds
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS); . }Copy the code

This is the entire process that the Broker starts. Its overall structure is as follows:

Next, take a closer look at the process of Broker heartbeat registration

The Broker is registered

The first half of the code is the configuration for the Topic. The Broker needs to be registered.

public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) { ... // ToDo: K1 -> Key code, determine whether registration is required, Then call doRegisterBrokerAll is really registered Broker if (forceRegister | | needRegister (this) brokerConfig) getBrokerClusterName (), this.getBrokerAddr(), this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId(), this.brokerConfig.getRegisterBrokerTimeoutMills())) { doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);  }}Copy the code

Register a real implementation with code logic

// ToDo: k2-> Register the real core code
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
    TopicConfigSerializeWrapper topicConfigWrapper) {
    // Broker requests to all nameservers, so registration returns a List
    List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
    // If the result is greater than 0, the first value in the list is taken
    if (registerBrokerResultList.size() > 0) {
        RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
        if(registerBrokerResult ! =null) {
            // Address of the primary node
            if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() ! =null) {
            // Address of the slave node

            if (checkOrderConfig) {
                this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable()); }}}}Copy the code

Encapsulates a request packet and sends a request to NameServer.

Obtain service list information, encapsulate packets, register:

// ToDo: k1: get all list information of NameServer
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();

final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
    brokerOuterExecutor.execute(new Runnable() {
                // Register with NameServer
                RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
                // Register complete, local cache
                if(result ! =null) { registerBrokerResultList.add(result); }... }Copy the code

Make a request to netty:

. / / encapsulate a web request RemotingCommand request. = RemotingCommand createRequestCommand (RequestCode. REGISTER_BROKER requestHeader); // Where the network request is actually sent, The remotingClient is a NettyClinet RemotingCommand response = this. RemotingClient. InvokeSync (namesrvAddr, request, timeoutMills); // Assert response! = null;Copy the code

Registration completed