Welcome to the public number [sharedCode] committed to mainstream middleware source code analysis, personal website: www.shared-code.com/

preface

The main purpose of this article is to introduce the need to instantiate MqClientInstance when the producer or consumer starts, as the instance responsible for the communication with Borker, in which many scheduled tasks are initialized, heartbeat, nameServer address updates, and topic routing information updates

The source entry

org.apache.rocketmq.client.impl.factory.MQClientInstance

public void start(a) throws MQClientException {
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null= =this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // the interface that is responsible for network communication is called.
                    this.mQClientAPIImpl.start();
                    // Initialize scheduled tasks
                    this.startScheduledTask();
                    // Start the pullService for consumers. We will explain more about consumers
                    this.pullMessageService.start();
                    // Start the RebalanceService service for consumers
                    this.rebalanceService.start();
                    // Start a DefaultMQProducer whose groupName is CLIENT_INNER_PRODUCER,
                    // Used to send a failed consumption message back to the broker in the topic format %RETRY%ConsumerGroupName.
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK".this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.".null);
                default:
                    break; }}}Copy the code

This article focuses on the this.startscheduledTask () method

private void startScheduledTask(a) {
        if (null= =this.clientConfig.getNamesrvAddr()) {
            // Address service every two minutes (NameServer address)
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                @Override
                public void run(a) {
                    try {
                        MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                    } catch (Exception e) {
                        log.error("ScheduledTask fetchNameServerAddr exception", e); }}},1000 * 10.1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                        // Update routing information for all topics every 30 seconds with a delay of 10 milliseconds
            @Override
            public void run(a) {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e); }}},10.this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
                
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                        // The scheduled task will be executed 1 second later
            @Override
            public void run(a) {
                try {
                    // Remove offline brokers every 30 seconds
                    MQClientInstance.this.cleanOfflineBroker();
                    // Send heartbeat every 30 seconds
                    MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                } catch (Exception e) {
                    log.error("ScheduledTask sendHeartbeatToAllBroker exception", e); }}},1000.this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                        // Persist the consumer offSet
            @Override
            public void run(a) {
                try {
                    MQClientInstance.this.persistAllConsumerOffset();
                } catch (Exception e) {
                    log.error("ScheduledTask persistAllConsumerOffset exception", e); }}},1000 * 10.this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run(a) {
                try {
                    // Resize the consumer thread pool
                    MQClientInstance.this.adjustThreadPool();
                } catch (Exception e) {
                    log.error("ScheduledTask adjustThreadPool exception", e); }}},1.1, TimeUnit.MINUTES);
    }
Copy the code

NameServer addressing is performed every two minutes

This scheduled task is initialized only when nameServer is not actively configured on the client

if (null= =this.clientConfig.getNamesrvAddr()) {
            // Address service every two minutes (NameServer address)
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                @Override
                public void run(a) {
                    try {
                        MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                    } catch (Exception e) {
                        log.error("ScheduledTask fetchNameServerAddr exception", e); }}},1000 * 10.1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }
Copy the code

Defines a timed thread pool with a 10-second delay and a 2-minute interval between each execution

public String fetchNameServerAddr(a) {
        try {
            // Remote service acquisition
            String addrs = this.topAddressing.fetchNSAddr();
            if(addrs ! =null) {
                if(! addrs.equals(this.nameSrvAddr)) {
                    log.info("name server address changed, old=" + this.nameSrvAddr + ", new=" + addrs);
                    // Update local nameAddressList
                    this.updateNameServerAddressList(addrs);
                    this.nameSrvAddr = addrs;
                    returnnameSrvAddr; }}}catch (Exception e) {
            log.error("fetchNameServerAddr Exception", e);
        }
        return nameSrvAddr;
    }
Copy the code

In rocketMq, nameSrvAddr is automatically acquired at startup, and then updated every two minutes through a scheduled task. The current version does not support dynamic configuration for nameServer address of the interface, the default write dead this address: http://jmenv.tbsite.net:8080/rocketmq/nsaddr, we are interested in, change the source code can be used. Functional natural support

Update routing information for all topics every 30 seconds

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                        // Update routing information for all topics every 30 seconds with a delay of 10 milliseconds
            @Override
            public void run(a) {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e); }}},10.this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
Copy the code

10 milliseconds delay execution, enclosing clientConfig. GetPollNameServerInterval () the default value is 1000 * 30, once every 30 seconds

    private int pollNameServerInterval = 1000 * 30;
Copy the code
updateTopicRouteInfoFromNameServer
public void updateTopicRouteInfoFromNameServer(a) {
        Set<String> topicList = new HashSet<String>();

        // Consumer, Consumer
        {
            Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<String, MQConsumerInner> entry = it.next();
                MQConsumerInner impl = entry.getValue();
                if(impl ! =null) {
                    // Get the topic to which the consumer subscribed
                    Set<SubscriptionData> subList = impl.subscriptions();
                    if(subList ! =null) {
                        for (SubscriptionData subData : subList) {
                            topicList.add(subData.getTopic());
                        }
                    }
                }
            }
        }

        // He is a Producer
        {
            // Get the list of producers in the current system
            Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
            / / loop
            while (it.hasNext()) {
                // Get producer information
                Entry<String, MQProducerInner> entry = it.next();
                MQProducerInner impl = entry.getValue();
                if(impl ! =null) {
                    // Get the latest topic information. Put it inside the SetSet<String> lst = impl.getPublishTopicList(); topicList.addAll(lst); }}}// Get all topic information.
        for (String topic : topicList) {
            // Update topic information in batches
            this.updateTopicRouteInfoFromNameServer(topic); }}Copy the code

When producerTable is started, it calls the registerProducer method to register its own information.

The implementation class of MQProducerInner is DefaultMQProducerImpl. Each DefaultMQProducerImpl maintains a topicPublishInfoTable, which is an internal variable of type Map. Is responsible for storing information about the topic currently concerned by DefaultMQProducerImpl. Let’s look at the implementation of getPublishTopicList

@Override
    public Set<String> getPublishTopicList(a) {
        Set<String> topicList = new HashSet<String>();
        for (String key : this.topicPublishInfoTable.keySet()) {
            topicList.add(key);
        }

        return topicList;
    }
Copy the code

The above code, without too much explanation, simply puts the producer’s internal topic information into the Set and returns it

— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — – to highlight — — — — — — — — — — — — — — — — — — — — — — — — — — — — – ————————————————–

The above code is a lot, but it’s essentially two points

  1. Get all topics from consumers (subscriptions) and producers (publications), an action to collect topics
  2. After the access to the topic, to update cycle, call updateTopicRouteInfoFromNameServer method

This is the focus of this article

UpdateTopicRouteInfoFromNameServer implementation

This method belongs to the only way to update topic information, focus on a wave

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
                                                      DefaultMQProducer defaultMQProducer) {
        try {   
            // Start with a lock
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    // Topic Routing information
                    TopicRouteData topicRouteData;
                    if(isDefault && defaultMQProducer ! =null) {
                        // 1. Enter this code block, only through the default Topic, the default producucer to nameServer route information
                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                                1000 * 3);
                        // topicRouteData contains the default createTopicKey topic, which is used to build nonexistent topics
                        if(topicRouteData ! =null) {
                            for (QueueData data : topicRouteData.getQueueDatas()) {
                                // Set the number of topic queues. Default is 4
                                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                              // Set the read queue
                                data.setReadQueueNums(queueNums);
                                / / write queuedata.setWriteQueueNums(queueNums); }}}else {
                       // 2. In addition to the default case, the other case is to actually go to nameServer to get information about the current topic
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
                    // When topic information is not empty
                    if(topicRouteData ! =null) {
                        TopicRouteTable = topicRouteTable = topicRouteTable = topicRouteTable = topicRouteTable = topicRouteTable = topicRouteTable
                        TopicRouteData old = this.topicRouteTable.get(topic);
                        // Determine whether the new and old topic information is consistent and whether it needs to be updated
                        boolean changed = topicRouteDataIsChange(old, topicRouteData);
                        if(! changed) {// When comparing old and new information, this method goes back to determine whether the topic exists locally, does not exist, or needs updating
                            changed = this.isNeedUpdateTopicRouteInfo(topic);
                        } else {
                            log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                        }

                        if (changed) {
                            // Clone TopicRouteData
                            TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
                                                        // Get the borker collection for the current topic
                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                // Collect broker addresses
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }

                            // Update Pub info
                            {   // Update the published topic for producers
                                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);                            // Set the owning topic information
                                publishInfo.setHaveTopicRouterInfo(true);
                                Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQProducerInner> entry = it.next();
                                    MQProducerInner impl = entry.getValue();
                                    if(impl ! =null) {
                                        // Update topic informationimpl.updateTopicPublishInfo(topic, publishInfo); }}}// Update sub info
                            {
                                // Update subscription topic for consumers
                                Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                                Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQConsumerInner> entry = it.next();
                                    MQConsumerInner impl = entry.getValue();
                                    if(impl ! =null) {
                                        impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                    }
                                }
                            }
                            log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                            this.topicRouteTable.put(topic, cloneTopicRouteData);
                            return true; }}else {
                        log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic); }}catch (Exception e) {
                    // When a topic does not exist, it is caught by an exception, but the exception is not printed
                    if(! topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && ! topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) { log.warn("updateTopicRouteInfoFromNameServer Exception", e); }}finally {
                    this.lockNamesrv.unlock(); }}else {
                log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS); }}catch (InterruptedException e) {
            log.warn("updateTopicRouteInfoFromNameServer Exception", e);
        }

        return false;
    }
Copy the code
getTopicRouteInfoFromNameServer

Call remotingClient to get the remote interface and get information from nameServer.

public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
        boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
        GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
        requestHeader.setTopic(topic);

        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
                // Call the original interface
        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
        assertresponse ! =null;
        switch (response.getCode()) {
            // Topic does not exist
            case ResponseCode.TOPIC_NOT_EXIST: {
                // Whether topic is allowed to not exist && topic is not equal to TBW102
                if(allowTopicNotExist && ! topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) { log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
                }

                break;
            }
            / / the topic
            case ResponseCode.SUCCESS: {
                byte[] body = response.getBody();
                if(body ! =null) {
                    returnTopicRouteData.decode(body, TopicRouteData.class); }}default:
                break;
        }
                // Throw an exception and get caught by the outermost program
        throw new MQClientException(response.getCode(), response.getRemark());
    }
Copy the code

Conclusion:

Methods updateTopicRouteInfoFromNameServer is called a total of three places

  1. When publishing a message, there may be two calls to retrieve topic information, the source code is as follows
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
                // Get topic information locally
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        // Local does not exist
        if (null== topicPublishInfo || ! topicPublishInfo.ok()) {// Build an empty topic into local memory
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            // Call the method to update the topic information, which is explained in the whole article above
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }
                / / the topic whether having the right routing information | | current topic queue information is normal
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
                // When all of the above fails, i.e. topic does not exist. Go to the default topic update method,
                Queue, filter, borkerNodes
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true.this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            returntopicPublishInfo; }}Copy the code

There are two places will call to updateTopicRouteInfoFromNameServer above,

The first place is that when you first use this topic, the local topic information does not exist, so you need to get it from nameServer normally

In the second place is not normal when acquiring the topic (usually the topic does not exist), need go updateTopicRouteInfoFromNameServer calls by default.

  1. The third local call is the scheduled task call, which updates the local topic information but does not follow the default logic for a while
/** * 1. Call * 2. The value is */ every 30 seconds
public boolean updateTopicRouteInfoFromNameServer(final String topic) {
        return updateTopicRouteInfoFromNameServer(topic, false.null);
    };
/** * If topic does not exist, the method will be called directly with isDefault = true, defaultMQProducer! =null */
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
                                                      DefaultMQProducer defaultMQProducer)
Copy the code

Conclusion: actually here, everybody may updateTopicRouteInfoFromNameServer isDefault parameters in method a little don’t understand, a diagram to explain

Note: The broker determines whether a topic supports automatic creation based on the autoCreateTopicEnable property when sending a message