RocketMQ producer and message store

1. Introduction

This article analyzes how messages are sent and stored, based on the source code. RocketMQ has three ways of sending messages: reliable synchronous, reliable asynchronous, and one-way. In terms of storage model, current MQ middleware can be divided into persistent and non-persistent. This article also looks at RocketMQ's message storage mechanism.

2. RocketMQ messages

First, take a look at RocketMQ's message wrapper class, org.apache.rocketmq.common.message.Message.

Basic properties: Topic, message Flag, message body, extended properties

Hidden attributes:

  • Tag: Message tag, used for message filtering
  • Keys: message index key
  • WaitStoreMsgOK: whether the send call returns only after the message has been stored
  • DelayTimeLevel: message delay level, used for scheduled (delayed) messages and message retries

The extended properties are stored in Message’s properties.
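
As a rough illustration of how such attributes can live in a properties map, the sketch below uses a hypothetical SimpleMessage class (it is not RocketMQ's Message). The key names TAGS, KEYS, WAIT, and DELAY are assumptions made for this example; check MessageConst in your RocketMQ version for the real constants.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a message class; not RocketMQ's Message.
class SimpleMessage {
    final String topic;
    final byte[] body;
    // Extended properties; the "hidden" attributes are ordinary entries in this map.
    final Map<String, String> properties = new HashMap<>();

    SimpleMessage(String topic, byte[] body) {
        this.topic = topic;
        this.body = body;
    }

    // The key names below are assumed for illustration only.
    void setTags(String tags) { properties.put("TAGS", tags); }
    void setKeys(String keys) { properties.put("KEYS", keys); }
    void setWaitStoreMsgOK(boolean wait) { properties.put("WAIT", Boolean.toString(wait)); }
    void setDelayTimeLevel(int level) { properties.put("DELAY", Integer.toString(level)); }

    // A missing entry is treated as "wait for the store result" (an assumption of this sketch).
    boolean isWaitStoreMsgOK() {
        String v = properties.get("WAIT");
        return v == null || Boolean.parseBoolean(v);
    }
}
```

Storing the attributes as map entries means new attributes can be added without changing the message's basic fields.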

3. Producer startup process

We trace from the start method of DefaultMQProducerImpl.

Step 1: Check whether producerGroup meets the requirements, and change the producer's instanceName to the process ID.

//DefaultMQProducerImpl::start
public void start() throws MQClientException {
    this.start(true);// Defaults to true
}

public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;

            this.checkConfig();

            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }

            // Step 2: create the MQClientInstance
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }

            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

            if (startFactory) {
                mQClientFactory.start();
            }

            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                this.defaultMQProducer.isSendMessageWithVIPChannel());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }

    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}

Step 2: Create an MQClientInstance instance.

Step 3: Register the current producer with MQClientInstance so that MQClientInstance manages it, which makes later network requests and heartbeat checks easier.

Step 4: Start MQClientInstance. If MQClientInstance is already started, do not start it this time.
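
The state handling in start can be sketched on its own. This is a minimal, hypothetical model: the ProducerLifecycle class and its startup parameter are invented for this example, and only the state names come from the snippet above.

```java
// Hypothetical, simplified lifecycle guard; the state names follow the snippet above.
class ProducerLifecycle {
    enum ServiceState { CREATE_JUST, RUNNING, SHUTDOWN_ALREADY, START_FAILED }

    private ServiceState state = ServiceState.CREATE_JUST;

    // startup is a placeholder for the config check, registration and client start.
    synchronized void start(Runnable startup) {
        if (state != ServiceState.CREATE_JUST) {
            throw new IllegalStateException("producer service state not OK: " + state);
        }
        // Mark as failed first, so an exception thrown by startup leaves START_FAILED behind.
        state = ServiceState.START_FAILED;
        startup.run();
        state = ServiceState.RUNNING;
    }

    synchronized ServiceState state() {
        return state;
    }
}
```

Setting START_FAILED before doing any work means a half-started producer cannot be started a second time by accident.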

4. Basic message sending process

The process of sending messages includes verifying messages, searching routes, and sending messages (including exception handling mechanisms).

Message verification mainly checks the length of the message. This section focuses on route lookup and message sending.

4.1 Searching for routes

Before a message can be sent, the routing information of the topic must be obtained:

//DefaultMQProducerImpl::tryToFindTopicPublishInfo
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    // Look in the local cache first
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        // Query NameServer for this topic
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        // Fall back to querying with the default topic
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

If the producer has already cached routing information for the topic and that information contains message queues, it is returned directly. Otherwise the producer queries NameServer for the topic's routes. If the topic still cannot be found (for example, when the first message is sent to a new topic), the producer queries again using the default topic. If that also fails, an error is reported.
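
The lookup order can be sketched as follows. Everything here is hypothetical: the RouteCache class, the use of a plain function to stand in for NameServer, and the representation of a route as a list of queue names are assumptions made to keep the example small.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch of "cache first, then NameServer, then default topic".
class RouteCache {
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    // Stands in for NameServer; returns null or an empty list for unknown topics.
    private final Function<String, List<String>> nameServer;
    private final String defaultTopic;

    RouteCache(Function<String, List<String>> nameServer, String defaultTopic) {
        this.nameServer = nameServer;
        this.defaultTopic = defaultTopic;
    }

    List<String> find(String topic) {
        List<String> queues = cache.get(topic);
        if (queues != null && !queues.isEmpty()) {
            return queues; // cached route that contains queues
        }
        queues = nameServer.apply(topic);
        if (queues == null || queues.isEmpty()) {
            // Topic unknown: retry the query with the default topic.
            queues = nameServer.apply(defaultTopic);
        }
        if (queues == null || queues.isEmpty()) {
            throw new IllegalStateException("No route info for topic: " + topic);
        }
        cache.put(topic, queues);
        return queues;
    }
}
```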

4.2 Selecting a message queue

A message queue is selected from the routing information; the queues in the returned list are sorted by broker and queue sequence number. Message sending uses a retry mechanism: the producer selects a queue and sends the message, returns on success, and on failure selects a queue again and resends. There are two ways to select a message queue.

  • sendLatencyFaultEnable=false: the default
  • sendLatencyFaultEnable=true: enables the broker fault-delay mechanism

//MQFaultStrategy::selectOneMessageQueue
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        try {
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }

            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }

        return tpInfo.selectOneMessageQueue();
    }

    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
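
For the default path (fault-delay mechanism disabled), a simplified round-robin that avoids the broker used in the previous failed attempt might look like the sketch below. QueueSelector and its nested Queue type are hypothetical names; this is not the TopicPublishInfo implementation.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin selector that skips the broker that failed last time.
class QueueSelector {
    // Minimal stand-in for a message queue: broker name plus queue id.
    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    private final List<Queue> queues;
    private final AtomicInteger counter = new AtomicInteger();

    QueueSelector(List<Queue> queues) {
        this.queues = queues;
    }

    Queue select(String lastBrokerName) {
        int size = queues.size();
        for (int i = 0; i < size; i++) {
            // floorMod keeps the index non-negative even after the counter overflows.
            Queue q = queues.get(Math.floorMod(counter.getAndIncrement(), size));
            if (lastBrokerName == null || !q.brokerName.equals(lastBrokerName)) {
                return q;
            }
        }
        // Every queue belongs to the failed broker: fall back to plain round-robin.
        return queues.get(Math.floorMod(counter.getAndIncrement(), size));
    }
}
```

On the first attempt lastBrokerName is null, so the selector is a plain round-robin; on a retry it steers away from the broker that just failed.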

4.3 Message Sending

Core message-sending API entry point: DefaultMQProducerImpl::sendKernelImpl

private SendResult sendKernelImpl(final Message msg,
                                    final MessageQueue mq,
                                    final CommunicationMode communicationMode,
                                    final SendCallback sendCallback,
                                    final TopicPublishInfo topicPublishInfo,
                                    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // omitted
}

Parameter description:

  • Message msg: the message to be sent
  • MessageQueue mq: the message queue the message will be sent to
  • CommunicationMode communicationMode: the sending mode: SYNC, ASYNC, or ONEWAY
  • SendCallback sendCallback: the callback for asynchronous sends
  • TopicPublishInfo topicPublishInfo: the topic routing information
  • long timeout: the send timeout

Sending steps:

  1. Obtain the network address of the Broker according to MessageQueue
  2. Assign globally unique ids to messages
  3. If a message-sending hook is registered, run its before-send logic
  4. Build the message send request
  5. Send the request over the network in synchronous, asynchronous, or one-way mode, depending on the sending mode
  6. If a message-sending hook is registered, run its after-send logic
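
Steps 3 to 6 can be sketched as a wrapper around the network call. HookedSender and its SendHook interface are hypothetical names chosen for this example and do not reproduce RocketMQ's hook API; the transport function stands in for the actual network send.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of before/after send hooks around a transport call.
class HookedSender {
    interface SendHook {
        void before(String msg);
        void after(String msg, String result);
    }

    private final List<SendHook> hooks = new ArrayList<>();

    void register(SendHook hook) {
        hooks.add(hook);
    }

    // transport stands in for the network call made in SYNC, ASYNC or ONEWAY mode.
    String send(String msg, Function<String, String> transport) {
        for (SendHook h : hooks) {
            h.before(msg); // before-send logic
        }
        String result = transport.apply(msg);
        for (SendHook h : hooks) {
            h.after(msg, result); // after-send logic
        }
        return result;
    }
}
```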

4.3.1 Synchronous Sending

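The client-side entry point for sending a message is MQClientAPIImpl::sendMessage.

Synchronous sending steps (on the broker side):

  1. Check that the message may be written: broker write permission, topic name, topic configuration, and queue ID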
//AbstractSendMessageProcessor::msgCheck
protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
    final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
    if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
        && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
            + "] sending message is forbidden");
        return response;
    }
    if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {
        String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
        log.warn(errorMsg);
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(errorMsg);
        return response;
    }

    TopicConfig topicConfig =
        this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    if (null == topicConfig) {
        int topicSysFlag = 0;
        if (requestHeader.isUnitMode()) {
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
            } else {
                topicSysFlag = TopicSysFlag.buildSysFlag(true, false);
            }
        }

        log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());
        topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
            requestHeader.getTopic(),
            requestHeader.getDefaultTopic(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
            requestHeader.getDefaultTopicQueueNums(), topicSysFlag);

        if (null == topicConfig) {
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                topicConfig =
                    this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                        requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
            }
        }

        if (null == topicConfig) {
            response.setCode(ResponseCode.TOPIC_NOT_EXIST);
            response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
                + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
            return response;
        }
    }

    int queueIdInt = requestHeader.getQueueId();
    int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
    if (queueIdInt >= idValid) {
        String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",
            queueIdInt,
            topicConfig.toString(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

        log.warn(errorInfo);
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(errorInfo);

        return response;
    }
    return response;
}

  2. If the message has been retried more than the maximum allowed number of times, it is placed in the DLQ (dead-letter queue)
  3. Call DefaultMessageStore::putMessage to store the message

4.3.2 Asynchronous Sending

With asynchronous sending, the sender does not block waiting for the message server to return the send result. It only needs to provide a callback, which the client invokes when the response arrives. Compared with synchronous sending, asynchronous mode improves the sender's throughput.

4.3.3 Unidirectional Sending

With one-way sending, the sender neither waits for a result nor provides a callback, and it does not care whether the message was sent successfully. It works much like asynchronous sending, except that the sender does nothing with any result.
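
The difference between the three modes can be sketched with plain Java concurrency utilities. SendModes is a hypothetical class, and the broker function stands in for the remote call; the real client uses its own remoting layer rather than CompletableFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch contrasting SYNC, ASYNC and ONEWAY sending.
class SendModes {
    // Stands in for the remote broker call: takes a message, returns a send result.
    private final Function<String, String> broker;

    SendModes(Function<String, String> broker) {
        this.broker = broker;
    }

    // SYNC: the caller blocks until the result is available.
    String sendSync(String msg) {
        return broker.apply(msg);
    }

    // ASYNC: the caller returns immediately; the callback runs when the result arrives.
    CompletableFuture<Void> sendAsync(String msg, Consumer<String> callback) {
        return CompletableFuture.supplyAsync(() -> broker.apply(msg)).thenAccept(callback);
    }

    // ONEWAY: fire and forget; the result is never observed by the caller.
    void sendOneway(String msg) {
        CompletableFuture.runAsync(() -> broker.apply(msg));
    }
}
```

The returned future in sendAsync exists only so that a caller (or a test) can wait for the callback to finish; a real sender would normally ignore it.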

4.3.4 Batch Sending

In batch sending, multiple messages of the same topic are packaged together and sent to the message server in one request, which reduces the number of network calls and improves network throughput.

When a single message is sent, the message body is stored in body. When a batch is sent, the content of multiple messages has to be stored in body, so RocketMQ encodes the contents of the messages in a fixed format.
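
To show what packing several messages into one body in a fixed format can mean, here is a minimal length-prefixed encoding. This is an illustrative format only, not RocketMQ's actual batch layout, and BatchCodec is a hypothetical class.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical length-prefixed batch codec: [4-byte length][payload] repeated.
class BatchCodec {
    static byte[] encode(List<String> bodies) {
        List<byte[]> raw = new ArrayList<>();
        int total = 0;
        for (String body : bodies) {
            byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
            raw.add(bytes);
            total += 4 + bytes.length;
        }
        ByteBuffer buf = ByteBuffer.allocate(total);
        for (byte[] bytes : raw) {
            buf.putInt(bytes.length); // length prefix
            buf.put(bytes);           // payload
        }
        return buf.array();
    }

    static List<String> decode(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        List<String> out = new ArrayList<>();
        while (buf.hasRemaining()) {
            byte[] bytes = new byte[buf.getInt()];
            buf.get(bytes);
            out.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return out;
    }
}
```

Because each entry carries its own length, the receiver can split the single body back into the original messages without any delimiter.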

Batch delivery:

//DefaultMQProducer::send
public SendResult send(Collection<Message> msgs)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.defaultMQProducerImpl.send(batch(msgs));
}

Sending process: on the sender side, the batch method first wraps the batch of messages into a MessageBatch object. MessageBatch holds a List<Message> messages internally, so the batch sending process is exactly the same as sending a single message.

Follow the trail:

//DefaultMQProducer::batch
private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
    MessageBatch msgBatch;
    try {
        msgBatch = MessageBatch.generateFromList(msgs);
        for (Message message : msgBatch) {
            Validators.checkMessage(message, this);
            MessageClientIDSetter.setUniqID(message);
            message.setTopic(withNamespace(message.getTopic()));
        }
        msgBatch.setBody(msgBatch.encode());
    } catch (Exception e) {
        throw new MQClientException("Failed to initiate the MessageBatch", e);
    }
    msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
    return msgBatch;
}


//DefaultMQProducerImpl::send
public SendResult send( Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}

5. Message storage

Most business systems require MQ to provide persistent storage, which greatly improves the availability of the system.

Let’s take a look at the RocketMQ data flow:

  • CommitLog: Message storage file. Messages for all message topics are stored in the CommitLog file
  • ConsumeQueue: message consumption queue. After messages arrive at CommitLog files, they are asynchronously forwarded to the message consumption queue for message consumers to consume
  • IndexFile: message IndexFile that stores the mapping between message keys and offsets
  • Transaction state service: Stores the transaction state of each message
  • Scheduled message service: each delay level corresponds to one message consumption queue, which stores the pull progress of that delay queue
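
The relationship between CommitLog and ConsumeQueue can be sketched in memory. MiniStore is a hypothetical class: it writes the index entry inline, whereas the text above describes the forwarding as asynchronous, and the entry fields (offset, size, tag hash) are a simplification assumed for this example.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model: one shared log plus one index list per topic and queue.
class MiniStore {
    static final class Entry {
        final long offset;   // where the message starts in the shared log
        final int size;      // message length in bytes
        final long tagsCode; // hash of the tag, usable for filtering

        Entry(long offset, int size, long tagsCode) {
            this.offset = offset;
            this.size = size;
            this.tagsCode = tagsCode;
        }
    }

    private final ByteArrayOutputStream commitLog = new ByteArrayOutputStream();
    private final Map<String, List<Entry>> consumeQueues = new HashMap<>();

    Entry put(String topic, int queueId, String tag, String body) {
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        long offset = commitLog.size();
        commitLog.write(bytes, 0, bytes.length); // all topics share one log
        Entry entry = new Entry(offset, bytes.length, tag.hashCode());
        consumeQueues.computeIfAbsent(topic + "-" + queueId, k -> new ArrayList<>()).add(entry);
        return entry;
    }

    String read(String topic, int queueId, int index) {
        Entry e = consumeQueues.get(topic + "-" + queueId).get(index);
        return new String(commitLog.toByteArray(), (int) e.offset, e.size, StandardCharsets.UTF_8);
    }
}
```

Consumers read the small per-queue index sequentially and only then fetch the message bytes from the shared log.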

RocketMQ’s storage architecture:

Message storage implementation class: org.apache.rocketmq.store.DefaultMessageStore

Core attributes:

  • MessageStoreConfig messageStoreConfig: message store configuration properties
  • CommitLog commitLog: implementation class for CommitLog file storage
  • ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable: cache table of message consumption queues, grouped by topic
  • FlushConsumeQueueService flushConsumeQueueService: thread that flushes ConsumeQueue files to disk
  • CleanCommitLogService cleanCommitLogService: service that cleans up CommitLog files
  • CleanConsumeQueueService cleanConsumeQueueService: service that cleans up ConsumeQueue files
  • IndexService indexService: index file implementation class
  • AllocateMappedFileService allocateMappedFileService: service that allocates MappedFile instances
  • ReputMessageService reputMessageService: CommitLog message dispatch; builds ConsumeQueue and IndexFile files from the CommitLog file
  • HAService haService: storage HA mechanism
  • TransientStorePool transientStorePool: off-heap memory buffer pool for messages
  • MessageArrivingListener messageArrivingListener: listener notified when a message arrives, used by long-polling message pulls
  • BrokerConfig brokerConfig: broker configuration properties
  • StoreCheckpoint storeCheckpoint: checkpoint for file flushing
  • LinkedList<CommitLogDispatcher> dispatcherList: dispatchers that handle CommitLog forwarding requests

5.1 Storage Process for Sending Messages

Message store entry point: org.apache.rocketmq.store.DefaultMessageStore::putMessage

  1. The message is rejected if the current broker has stopped working, if the broker is a SLAVE, or if the broker currently does not allow writes. It is also rejected if the topic length exceeds 256 characters or the message property length exceeds 65536 characters
  2. Verify the message delay level
  3. Get the CommitLog file that is currently writable
  4. Acquire putMessageLock before writing to the CommitLog file; that is, storing messages to the CommitLog file is serialized
  5. Set the message store timestamp
  6. Append the message to the MappedFile
  7. Create a globally unique message ID
  8. Get the offset of the message in its message queue
  9. Calculate the total length of the message from the body length, topic length, property length, and the message storage format
  10. If the message length + END_FILE_MIN_BLANK_LENGTH is greater than the free space in the CommitLog file, END_OF_FILE is returned and a new CommitLog file is created to store the message
  11. Write the message content into the ByteBuffer and create the AppendMessageResult
  12. Update the logical offset of the message queue
  13. Release putMessageLock once the append logic has finished
  14. DefaultAppendMessageCallback::doAppend only appends the message to memory; depending on whether synchronous or asynchronous flushing is configured, the in-memory data must then be persisted to disk
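
The serialized append and the end-of-file check can be sketched as below. MiniCommitLogFile is a hypothetical class; the 4-byte length header and the value 8 for END_FILE_MIN_BLANK_LENGTH are assumptions made for this example, not the real storage format.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical single fixed-size log file with a serialized append.
class MiniCommitLogFile {
    enum Status { PUT_OK, END_OF_FILE }

    // Space reserved at the end of a file; the value is illustrative.
    static final int END_FILE_MIN_BLANK_LENGTH = 8;

    private final ByteBuffer buffer;
    private final ReentrantLock putMessageLock = new ReentrantLock();

    MiniCommitLogFile(int fileSize) {
        this.buffer = ByteBuffer.allocate(fileSize);
    }

    Status append(byte[] body) {
        putMessageLock.lock(); // appends are serialized
        try {
            int msgLen = 4 + body.length; // assumed layout: 4-byte total length + body
            if (msgLen + END_FILE_MIN_BLANK_LENGTH > buffer.remaining()) {
                return Status.END_OF_FILE; // caller should roll over to a new file
            }
            buffer.putInt(msgLen);
            buffer.put(body);
            return Status.PUT_OK;
        } finally {
            putMessageLock.unlock();
        }
    }

    int wrotePosition() {
        return buffer.position();
    }
}
```

Note that this only writes to memory; persisting the data is a separate flush step, as step 14 says.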

Simplified into the following sequence diagram:

5.2 Memory Mapping Process

RocketMQ uses memory-mapped files to improve I/O performance. Each individual file, whether CommitLog, ConsumeQueue, or IndexFile, has a fixed length. When a file is full, a new file is created, and its file name is the global physical offset of the first message it holds.
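
Naming files by their starting offset makes it cheap to find the file that holds a given physical offset. The sketch below assumes a 20-digit zero-padded file name; FileNames and both method names are hypothetical.

```java
import java.util.Locale;

// Hypothetical helpers for offset-based file naming.
class FileNames {
    // Assumes file names are the start offset, zero-padded to 20 digits.
    static String offsetToFileName(long offset) {
        return String.format(Locale.ROOT, "%020d", offset);
    }

    // Start offset of the fixed-size file that contains the given physical offset.
    static long fileStartOffset(long physicalOffset, long fileSize) {
        return physicalOffset - (physicalOffset % fileSize);
    }
}
```

For example, with 1 GiB files, the message at physical offset 1073741924 lives in the file whose start offset is 1073741824.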

Steps:

  1. The memory-mapped file, MappedFile, is created by AllocateMappedFileService
  2. Creating a MappedFile follows a typical producer-consumer model
  3. When getLastMappedFile is called to obtain a MappedFile, MappedFileQueue puts an allocation request into a queue
  4. The AllocateMappedFileService thread keeps listening on the queue, takes requests from it, and creates the MappedFile objects
  5. Finally, the MappedFile object is warmed up; under the hood this calls the force and mlock methods
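
The basic Java mechanism behind a memory-mapped file is FileChannel.map, which returns a MappedByteBuffer whose force method pushes changes to the storage device. The sketch below shows only that mechanism on a temporary file; MmapDemo is a hypothetical class, and mlock is omitted because plain Java has no direct API for it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical demo: map a fixed-size file, write through the mapping, force it, read back.
class MmapDemo {
    static String writeAndReadBack(String text, int fileSize) {
        try {
            Path path = Files.createTempFile("mapped", ".log");
            path.toFile().deleteOnExit();
            try (FileChannel channel = FileChannel.open(path,
                    StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                // Mapping READ_WRITE beyond the current size extends the file to fileSize.
                MappedByteBuffer map = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
                byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
                map.put(bytes); // write into memory only
                map.force();    // flush the mapped region to the storage device
                byte[] back = new byte[bytes.length];
                map.position(0);
                map.get(back);
                return new String(back, StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```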

5.3 Disk Flush Process

After appendMessage of MappedFile is called, the message has only been written to a ByteBuffer (in memory) and has not yet been flushed to disk. RocketMQ provides two ways to flush the CommitLog from memory to disk.

  • Messages sent from the producer to the broker are first stored in a MappedFile and then synchronized to disk by the flush mechanism
  • Flushing is either synchronous or asynchronous
  • With asynchronous flushing, a background thread flushes at a fixed interval
  • Synchronous flushing is also a producer-consumer model. After the broker saves the message to the MappedFile, it creates a GroupCommitRequest, adds it to a list, and blocks. A background thread takes requests from the list and flushes the disk, then notifies the waiting thread once the flush succeeds.

Synchronous flush (CommitLog.java):

// Encapsulates one flush request
public static class GroupCommitRequest {
    // The offset this request needs to be flushed up to
    private final long nextOffset;
    // Latch the sending thread waits on until the flush completes
    private final CountDownLatch countDownLatch = new CountDownLatch(1);
    private volatile boolean flushOK = false;


    public GroupCommitRequest(long nextOffset) {
        this.nextOffset = nextOffset;
    }


    public long getNextOffset() {
        return nextOffset;
    }

    // Wake up the waiting thread after the flush
    public void wakeupCustomer(final boolean flushOK) {
        this.flushOK = flushOK;
        this.countDownLatch.countDown();
    }


    public boolean waitForFlush(long timeout) {
        try {
            this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
            return this.flushOK;
        } catch (InterruptedException e) {
            e.printStackTrace();
            return false;
        }
    }
}

/**
 * GroupCommit Service
 */
class GroupCommitService extends FlushCommitLogService {
    // List that receives new flush requests (written by sender threads)
    private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
    // List the flush thread reads requests from when flushing memory to disk
    private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();

    // Add a flush request
    public void putRequest(final GroupCommitRequest request) {
        synchronized (this) {
            // Add to the write list
            this.requestsWrite.add(request);
            // Wake up the flush thread if it has not been notified yet
            if (!this.hasNotified) {
                this.hasNotified = true;
                this.notify();
            }
        }
    }

    // Swap the read and write lists so that senders and the flush thread do not contend on the same list
    private void swapRequests() {
        List<GroupCommitRequest> tmp = this.requestsWrite;
        this.requestsWrite = this.requestsRead;
        this.requestsRead = tmp;
    }


    private void doCommit() {
        // The read list is not empty
        if (!this.requestsRead.isEmpty()) {
            // Iterate over the pending requests
            for (GroupCommitRequest req : this.requestsRead) {
                // There may be a message in the next file, so a maximum of
                // two times the flush
                boolean flushOK = false;
                for (int i = 0; (i < 2) && !flushOK; i++) {
                    // Has the flushed position already passed the requested offset?
                    flushOK = (CommitLog.this.mapedFileQueue.getCommittedWhere() >= req.getNextOffset());
                    // If not, flush again
                    if (!flushOK) {
                        CommitLog.this.mapedFileQueue.commit(0);
                    }
                }

                // Wake up the waiting thread after the flush attempt
                req.wakeupCustomer(flushOK);
            }

            long storeTimestamp = CommitLog.this.mapedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }
            // Clear the read list
            this.requestsRead.clear();
        } else {
            // Because of individual messages is set to not sync flush, it
            // will come to this process
            CommitLog.this.mapedFileQueue.commit(0);
        }
    }

    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");

        while (!this.isStoped()) {
            try {
                this.waitForRunning(0);
                this.doCommit();
            } catch (Exception e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }

        // Under normal circumstances shutdown, wait for the arrival of the
        // request, and then flush any requests that are still pending
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            CommitLog.log.warn("GroupCommitService Exception, ", e);
        }

        synchronized (this) {
            this.swapRequests();
        }

        this.doCommit();

        CommitLog.log.info(this.getServiceName() + " service end");
    }
}
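
The code above shows the flush thread; the sender side, which submits a request and blocks until it is notified, can be sketched as below. SyncFlushDemo is a hypothetical class, and it swaps in a BlockingQueue for the two swapped lists to keep the example short. The "flush" here only advances a counter instead of touching a disk.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the sender/flusher handoff used by synchronous flushing.
class SyncFlushDemo {
    static final class FlushRequest {
        final long nextOffset;
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile boolean ok;

        FlushRequest(long nextOffset) {
            this.nextOffset = nextOffset;
        }

        void done(boolean ok) {
            this.ok = ok;
            latch.countDown(); // wake up the waiting sender
        }

        boolean await(long timeoutMs) throws InterruptedException {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS) && ok;
        }
    }

    private final BlockingQueue<FlushRequest> requests = new LinkedBlockingQueue<>();
    private volatile long flushedWhere = 0;

    void startFlusher() {
        Thread flusher = new Thread(() -> {
            try {
                while (true) {
                    FlushRequest r = requests.take();
                    // Stands in for the real disk flush up to r.nextOffset.
                    flushedWhere = Math.max(flushedWhere, r.nextOffset);
                    r.done(true);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        flusher.setDaemon(true);
        flusher.start();
    }

    // Called by the sender thread: submit the request, then block until flushed or timed out.
    boolean putAndWait(long nextOffset, long timeoutMs) {
        FlushRequest r = new FlushRequest(nextOffset);
        requests.add(r);
        try {
            return r.await(timeoutMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    long flushedWhere() {
        return flushedWhere;
    }
}
```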

Asynchronous flush (CommitLog.java):

public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");
    // Keep polling
    while (!this.isStoped()) {
        boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();

        int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
        // Minimum number of dirty pages required before a flush is performed
        int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();

        int flushPhysicQueueThoroughInterval =
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

        boolean printFlushProgress = false;

        // Once the thorough-flush interval has elapsed, flush everything regardless of
        // the dirty page count, and print the flush progress every tenth time
        long currentTimeMillis = System.currentTimeMillis();
        if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
            this.lastFlushTimestamp = currentTimeMillis;
            flushPhysicQueueLeastPages = 0;
            printFlushProgress = ((printTimes++ % 10) == 0);
        }
        }

        try {
            // Timed flushing sleeps for a fixed interval; otherwise wait until woken up or the interval elapses
            if (flushCommitLogTimed) {
                Thread.sleep(interval);
            } else {
                this.waitForRunning(interval);
            }

            if (printFlushProgress) {
                this.printFlushProgress();
            }
            // Flush to disk
            CommitLog.this.mapedFileQueue.commit(flushPhysicQueueLeastPages);
            long storeTimestamp = CommitLog.this.mapedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
            this.printFlushProgress();
        }
    }

    // On a normal shutdown, make sure everything is flushed before exiting
    boolean result = false;
    for (int i = 0; i < RetryTimesOver && !result; i++) {
        result = CommitLog.this.mapedFileQueue.commit(0);
        CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
    }

    this.printFlushProgress();

    CommitLog.log.info(this.getServiceName() + " service end");
}
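
The two thresholds used by the asynchronous flush loop can be isolated as pure functions. FlushPolicy, the 4096-byte page size, and the exact dirty-page arithmetic are assumptions of this sketch, not the MappedFile implementation.

```java
// Hypothetical sketch of the "enough dirty pages, or thorough interval elapsed" rule.
class FlushPolicy {
    static final int OS_PAGE_SIZE = 4096; // assumed page size

    // A leastPages value of 0 means "flush whatever is dirty".
    static boolean shouldFlush(long writePosition, long flushedPosition, int leastPages) {
        long dirtyBytes = writePosition - flushedPosition;
        if (dirtyBytes <= 0) {
            return false; // nothing new to flush
        }
        if (leastPages <= 0) {
            return true;
        }
        return dirtyBytes / OS_PAGE_SIZE >= leastPages;
    }

    // After the thorough interval has elapsed, drop the page threshold to 0 for this round.
    static int effectiveLeastPages(long now, long lastThoroughFlush, long thoroughIntervalMs, int leastPages) {
        return now >= lastThoroughFlush + thoroughIntervalMs ? 0 : leastPages;
    }
}
```

The page threshold batches small writes into fewer disk operations, while the thorough interval bounds how long a small amount of data can stay unflushed.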

6. Summary & References

Summary

Message sending flowchart:

Message storage flowchart:

References

  • Inside RocketMQ Technology

  • RocketMQ source Learning – Message Storage

  • Illustrates the RocketMQ message sending and storage process