Note: This source code analysis series is based on RocketMQ 4.8.0. Gitee repository link: gitee.com/funcy/rocke… .

In this article, we analyze how a RocketMQ producer sends messages.

The sample producer that sends a message is the org.apache.rocketmq.example.simple.Producer class; its code is as follows:

public class Producer {
    public static void main(String[] args) 
            throws MQClientException, InterruptedException {
        String nameServer = "localhost:9876";
        // 1. Create DefaultMQProducer
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr(nameServer);
        // 2. Start producer
        producer.start();
        for (int i = 0; i < 1; i++) {
            try {
                Message msg = new Message("TopicTest", "TagA", "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 3. Send the message
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}

The code above consists of three steps:

  1. Create the DefaultMQProducer object
  2. Start the producer
  3. Send a message

The analysis below follows these three steps.

1. DefaultMQProducer constructor

The DefaultMQProducer constructor code is as follows:

public DefaultMQProducer(final String producerGroup) {
    // Continue the call
    this(null, producerGroup, null);
}


/**
 * The constructor that is finally called
 */
public DefaultMQProducer(final String namespace, 
        final String producerGroup, RPCHook rpcHook) {
    this.namespace = namespace;
    this.producerGroup = producerGroup;
    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}

This constructor simply assigns a few fields and creates an instance of DefaultMQProducerImpl. Let’s continue with the constructor of DefaultMQProducerImpl:

public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
    this.defaultMQProducer = defaultMQProducer;
    this.rpcHook = rpcHook;
    // Queue to send asynchronously
    this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
    // Handle the thread pool that sends asynchronously
    this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.asyncSenderThreadPoolQueue,
            new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
                }
            });
}

This constructor also only assigns fields, including the queue and thread pool used for asynchronous sending, so nothing substantial happens here.
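To make the shape of that executor easier to see in isolation, here is a minimal sketch using only the JDK. The class name AsyncSenderPoolSketch and the helper methods newPool and workerThreadName are hypothetical and exist only for illustration; this is not RocketMQ code, just the same combination of a fixed-size pool, a bounded queue and a naming ThreadFactory.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class for illustration only; not part of RocketMQ.
final class AsyncSenderPoolSketch {

    // Builds a pool with core size == max size, a bounded work queue and named
    // threads, mirroring the shape of the executor in the constructor above.
    static ThreadPoolExecutor newPool(int queueCapacity) {
        int cores = Runtime.getRuntime().availableProcessors();
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(queueCapacity);
        ThreadFactory factory = new ThreadFactory() {
            private final AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "AsyncSenderExecutor_" + threadIndex.incrementAndGet());
            }
        };
        return new ThreadPoolExecutor(cores, cores, 1000 * 60, TimeUnit.MILLISECONDS, queue, factory);
    }

    // Runs one task on a fresh pool and returns the name of the worker thread that ran it.
    static String workerThreadName() {
        ThreadPoolExecutor pool = newPool(50000);
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            // Worker threads are non-daemon, so shut the pool down to let the JVM exit.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(workerThreadName());
    }
}
```

Two JDK behaviours are worth keeping in mind with this configuration: because the core and maximum sizes are equal, the keep-alive time only matters if allowCoreThreadTimeOut is enabled, and once the bounded queue is full the default rejection policy throws RejectedExecutionException.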

2. DefaultMQProducer#start: starting the producer

Next, let’s look at how the producer is started, in the DefaultMQProducer#start method:

public void start() throws MQClientException {
    this.setProducerGroup(withNamespace(this.producerGroup));
    // Call the start() method of defaultMQProducerImpl
    this.defaultMQProducerImpl.start();
    // Related to message trace, which we don't care about here
    if (null != traceDispatcher) {
        ...
    }
}

This method first calls DefaultMQProducerImpl#start and then handles the operations related to RocketMQ message trace, which we won’t cover in this article. Let’s focus on the DefaultMQProducerImpl#start(boolean) method:

public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;
            // Check some configuration information
            this.checkConfig();
            // Change the current instanceName to the current process ID
            if (!this.defaultMQProducer.getProducerGroup()
                    .equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }
            // Get the MQ instance
            this.mQClientFactory = MQClientManager.getInstance()
                .getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
            // Register this producer with the MQClientInstance
            boolean registerOK = mQClientFactory.registerProducer(
                this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException(...);
            }
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(),
                    new TopicPublishInfo());
            // Start the instance
            if (startFactory) {
                mQClientFactory.start();
            }
            log.info(...);
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException(...);
        default:
            break;
    }

    // Send heartbeat to all brokers
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    // Periodically scan the results of asynchronous requests
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                RequestFutureTable.scanExpiredRequest();
            } catch (Throwable e) {
                log.error("scan RequestFutureTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}

This method is not complicated and the relevant content has been annotated. Three methods are highlighted here:

  1. mQClientFactory.start(): the method executed is MQClientInstance#start. It starts several components, which we’ll examine later.
  2. mQClientFactory.sendHeartbeatToAllBrokerWithLock(): sends a heartbeat to all brokers. The method finally executed is MQClientAPIImpl#sendHearbeat:
    public int sendHearbeat(
        final String addr,
        final HeartbeatData heartbeatData,
        final long timeoutMillis
    ) throws RemotingException, MQBrokerException, InterruptedException {
        // The request code is HEART_BEAT
        RemotingCommand request = RemotingCommand
            .createRequestCommand(RequestCode.HEART_BEAT, null);
        request.setLanguage(clientConfig.getLanguage());
        request.setBody(heartbeatData.encode());
        // Synchronous invocation
        RemotingCommand response = this.remotingClient
            .invokeSync(addr, request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                return response.getVersion();
            }
            default:
                break;
        }
    
        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }

    This is communication with the broker, and the request code is HEART_BEAT. As we’ll see later in the analysis, the producer also communicates with the nameServer.

  3. Periodically scanning the results of asynchronous requests: the method finally executed is RequestFutureTable.scanExpiredRequest(). We will analyze this method when we look at how the producer sends asynchronous messages.
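For the third item, the general idea of a timer that periodically sweeps a table of pending requests can be sketched as follows. Everything here is an assumption made for illustration: the class ExpiredRequestScanSketch, the Pending type and the scanExpired method are hypothetical and do not reproduce RocketMQ's RequestFutureTable, whose actual logic is covered in the later article. Only the Timer schedule (first run after 3 seconds, then every second) is taken from the code above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch; names and structure are assumptions, not RocketMQ's RequestFutureTable.
final class ExpiredRequestScanSketch {

    // A pending request: its deadline and the future its caller is waiting on.
    static final class Pending {
        final long deadlineMillis;
        final CompletableFuture<String> future = new CompletableFuture<>();

        Pending(long deadlineMillis) {
            this.deadlineMillis = deadlineMillis;
        }
    }

    final Map<String, Pending> table = new ConcurrentHashMap<>();

    // Removes every entry whose deadline has passed, fails its future with a
    // TimeoutException, and returns the ids expired by this scan.
    List<String> scanExpired(long nowMillis) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Pending>> it = table.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Pending> entry = it.next();
            String id = entry.getKey();
            Pending pending = entry.getValue();
            if (pending.deadlineMillis <= nowMillis) {
                it.remove();
                pending.future.completeExceptionally(new TimeoutException("request " + id + " expired"));
                expired.add(id);
            }
        }
        return expired;
    }

    // Same schedule as in the article: first run after 3 s, then every 1 s.
    Timer startScanner() {
        Timer timer = new Timer("RequestScanner", true); // daemon thread
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    scanExpired(System.currentTimeMillis());
                } catch (Throwable e) {
                    e.printStackTrace();
                }
            }
        }, 1000 * 3, 1000);
        return timer;
    }

    public static void main(String[] args) {
        ExpiredRequestScanSketch sketch = new ExpiredRequestScanSketch();
        sketch.table.put("req-1", new Pending(System.currentTimeMillis() - 1));
        System.out.println(sketch.scanExpired(System.currentTimeMillis()));
    }
}
```

Passing the current time into scanExpired rather than reading the clock inside it is a choice made here so that the scan can be exercised without waiting for the timer.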

2.1 MQClientInstance#start: starting MQClientInstance

Next, let’s look at how MQClientInstance is started, in MQClientInstance#start:

public void start() throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // Get the address of nameServer
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start the remoting service; this method only configures the netty client
                // Note: 1. this is a netty client; 2. no connection is created here
                this.mQClientAPIImpl.start();
                // Start the scheduled tasks
                this.startScheduledTask();
                // Pull service, only for consumers
                this.pullMessageService.start();
                // Start the load balancing service, only for consumers
                this.rebalanceService.start();
                // Start the internal producer
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException(...);
            default:
                break;
        }
    }
}

What this method does is explained clearly in the comments, so let’s take a closer look at some of the above operations.
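One pattern worth isolating first is the state guard that both start methods share: the state is set to START_FAILED before any initialization runs and is only switched to RUNNING once everything has succeeded. The following is a minimal sketch of that idea. The class StartGuardSketch and its failDuringInit flag are hypothetical, and the handling of the RUNNING state follows the DefaultMQProducerImpl variant shown earlier (which throws), not the MQClientInstance variant.

```java
// Hypothetical sketch of the start-state guard; not RocketMQ code.
final class StartGuardSketch {

    enum ServiceState { CREATE_JUST, RUNNING, SHUTDOWN_ALREADY, START_FAILED }

    private ServiceState serviceState = ServiceState.CREATE_JUST;

    // Assumption for illustration: lets the caller simulate a failing init step.
    private final boolean failDuringInit;

    StartGuardSketch(boolean failDuringInit) {
        this.failDuringInit = failDuringInit;
    }

    synchronized void start() {
        switch (serviceState) {
            case CREATE_JUST:
                // Pessimistically mark the service as failed; only a complete
                // initialization flips the state to RUNNING.
                serviceState = ServiceState.START_FAILED;
                if (failDuringInit) {
                    throw new IllegalStateException("initialization failed");
                }
                serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new IllegalStateException("cannot start in state " + serviceState);
            default:
                break;
        }
    }

    synchronized ServiceState state() {
        return serviceState;
    }

    public static void main(String[] args) {
        StartGuardSketch guard = new StartGuardSketch(false);
        guard.start();
        System.out.println(guard.state());
    }
}
```

The benefit of writing START_FAILED up front is that an exception thrown halfway through initialization leaves the object in a state that refuses a second start, without needing a catch block to record the failure.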

1. mQClientAPIImpl.start(): Configures the Netty client

This ends up in the NettyRemotingClient#start method, whose code is as follows:

@Override
public void start() {
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyClientConfig.getClientWorkerThreads(),
        new ThreadFactory() {
            ...
        });
    // Use Bootstrap instead of ServerBootstrap to indicate that this is a Netty client
    Bootstrap handler = this.bootstrap
        .group(this.eventLoopGroupWorker)
        .channel(NioSocketChannel.class)
        .option(...)
        // omit various options
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                // omit pipeline assembly
                ...
            }
        });

    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            ...
        }
    }, 1000 * 3, 1000);

    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }
}

Two points are worth noting about this method:

  1. It uses Bootstrap rather than ServerBootstrap, which indicates that this is a Netty client
  2. No connection is created anywhere in the method

2. startScheduledTask(): Starts a scheduled task

Scheduled tasks are started in MQClientInstance#startScheduledTask, as follows:

private void startScheduledTask() {
    if (null == this.clientConfig.getNamesrvAddr()) {
        // Obtain the address of nameServer periodically
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                } catch (Exception e) {
                    log.error("ScheduledTask fetchNameServerAddr exception", e);
                }
            }
        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    }

    // Update topic routing information periodically
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

    // Send heartbeat messages periodically
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.cleanOfflineBroker();
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

    // Persist the consumers' consumption offsets, either to a local file or by pushing them to the broker
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

    // Adjust the number of threads in the thread pool
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.adjustThreadPool();
            } catch (Exception e) {
                log.error("ScheduledTask adjustThreadPool exception", e);
            }
        }
    }, 1, 1, TimeUnit.MINUTES);
}

There are five scheduled tasks:

  1. Periodically obtain the nameServer address. MQClientInstance#start calls MQClientAPIImpl#fetchNameServerAddr once at the beginning to obtain the nameServer address, and the same method is called again here
  2. Periodically update the topic routing information. This fetches routing information from the nameServer; we analyze it below
  3. Periodically send heartbeat messages to the brokers. In DefaultMQProducerImpl#start(boolean) we also saw a heartbeat being sent to the brokers; both places call the same method
  4. Persist the consumers’ consumption offsets. This only applies to consumers, and we will cover it when we analyze consumers
  5. Adjust the number of threads in the thread pool. Tracing this to the end shows that it has no actual effect, so we won’t go into it
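Each of the five task bodies wraps its work in try/catch. One reason this matters with ScheduledExecutorService#scheduleAtFixedRate is documented JDK behaviour: if an execution of the task throws, all subsequent executions are suppressed. The sketch below demonstrates the difference using only the JDK; the class ScheduledTaskSketch and the method runsWithin are hypothetical names, and the 10 ms delay and 50 ms period are arbitrary values chosen to keep the demo short.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; not RocketMQ code.
final class ScheduledTaskSketch {

    // Schedules a task that fails on every run, either guarded by try/catch or not,
    // and reports how many times it ran within the observation window.
    static int runsWithin(final boolean guarded, long windowMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        final AtomicInteger runs = new AtomicInteger();
        scheduler.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                runs.incrementAndGet();
                if (guarded) {
                    try {
                        throw new IllegalStateException("simulated failure");
                    } catch (Exception e) {
                        // A real task would log here; the schedule keeps running.
                    }
                } else {
                    // Escapes run(): the executor cancels all future executions.
                    throw new IllegalStateException("simulated failure");
                }
            }
        }, 10, 50, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(windowMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        scheduler.shutdownNow();
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("unguarded runs: " + runsWithin(false, 500));
        System.out.println("guarded runs:   " + runsWithin(true, 500));
    }
}
```

The unguarded variant runs once and then goes silent, which is the failure mode the try/catch in each task body avoids.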

Here we focus on how topic routing information is obtained. Tracing all the way down from MQClientInstance#updateTopicRouteInfoFromNameServer(), we arrive at MQClientAPIImpl#getTopicRouteInfoFromNameServer(…):

public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
        boolean allowTopicNotExist) throws MQClientException, InterruptedException, 
        RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
    GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
    requestHeader.setTopic(topic);
    // Send the request with the code GET_ROUTEINFO_BY_TOPIC
    RemotingCommand request = RemotingCommand
        .createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);
    RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        ...
        case ResponseCode.SUCCESS: {
            byte[] body = response.getBody();
            if (body != null) {
                return TopicRouteData.decode(body, TopicRouteData.class);
            }
        }
        ...
    }
    ...
}

The request code sent to the nameServer here is GET_ROUTEINFO_BY_TOPIC. We covered this code in the earlier article on how the nameServer processes messages, including how the nameServer returns topic data once the request arrives. If you’ve forgotten the details, refer back to that article.

Due to space constraints, this article stops here. It mainly covered the startup process of the producer; the next article will analyze the message sending process.


Given the limits of the author’s own knowledge, mistakes in this article are inevitable, and corrections are welcome! Original writing takes effort: for commercial reprints, please contact the author for authorization; for non-commercial reprints, please credit the source.

This article was first published on the WeChat official account "Java technology exploration". If you like this article, you are welcome to follow the account, and let’s explore the world of technology together!