Idempotence: take it or leave it

Guaranteeing idempotence-related semantics on a distributed platform is our eternal pursuit when building secure and reliable systems. Since message queues are the go-to tool for asynchronous, decoupled communication, I have often wondered what kind of trade-offs the RocketMQ designers had to wrestle with.

As we all know, message queues are basically implemented in three ways:

  • At most once
  • At least once
  • Exactly once


Clearly, at-most-once risks losing messages; at-least-once puts extra demands on our business systems; and exactly-once, attractive as it sounds, imposes a heavy cost on the MQ system itself. RocketMQ does not hesitate: it chooses at-least-once and boldly hands the idempotence guarantee over to developers. This reflects both the author's struggle with the tension between MQ performance and functionality, and his trust in developers.
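Since the idempotence guarantee lands on our shoulders, here is a minimal sketch of defending against redelivery on the consumer side. The group name, NameServer address, and the in-memory dedup set are placeholders, not anything prescribed by RocketMQ; a real system would back the dedup check with Redis or a database unique constraint, and call its actual business logic where indicated.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class IdempotentConsumerSketch {

    /* Stand-in dedup store; replace with Redis, a DB unique key, etc. in production */
    private static final Set<String> PROCESSED = ConcurrentHashMap.newKeySet();

    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group"); // hypothetical group
        consumer.setNamesrvAddr("127.0.0.1:9876");                                         // hypothetical address
        consumer.subscribe("SMS_prod", "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (MessageExt msg : msgs) {
                // prefer a business key if the producer set one; fall back to the message id
                String dedupKey = msg.getKeys() != null ? msg.getKeys() : msg.getMsgId();
                if (!PROCESSED.add(dedupKey)) {
                    continue; // at-least-once delivered it again; we already handled it, just skip
                }
                try {
                    // doBusiness(msg);  // hypothetical business logic
                } catch (Exception e) {
                    PROCESSED.remove(dedupKey);                       // let the retry run the logic again
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER; // ask RocketMQ to redeliver later
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;         // ack: progress may move forward
        });
        consumer.start();
    }
}

The dedup key is what makes this work: message ids can differ across retransmissions, so a stable business key set by the producer is generally the safer choice.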

Overview of Consumption Status

Although the argument above is objective and true, it carries a hint of pessimism. By this reading, our business systems have to rely on the MQ while constantly guarding against it, because one careless slip can cause mistakes; it becomes something we love and fear at the same time.

Reading this, it may sound as if I am portraying it as a naughty child, which is actually a bit harsh: from my reading of the source code, as long as the business system behaves normally and the MQ's physical environment is healthy, duplicate consumption is quite hard to trigger.

RocketMQ's idempotence is usually broken by abnormal logic in the business system, by the network, or by an unstable operating environment. Most of the time, it is still a good boy.

According to our simple understanding of message system, the process of message consumption satisfies the following rules:

  • Consumption is not strictly in delivery order, but the overall trend remains first-in, first-out
  • Messages should accurately record current consumption status
  • There is always a role responsible for counting and persisting the consumption offset

With this intuition in mind, let's see what effort the author has made for smooth consumption and progress management.

⚠️ Note: the first point applies to concurrent consumption; RocketMQ's ordered consumption mode is strictly sequential within a single queue.

OffsetStore

Once a message has been consumed, it no longer belongs in the ProcessQueue; the ProcessQueue removes it and reports the current minimum offset as the consumption progress. It is easy to imagine that if this progress were not persisted, consumption would start over on every restart, which is obviously unacceptable. But how, and where, is it persisted?

RocketMQ supports two subscription modes:

  • Clustered consumption: the default mode, in which each message needs to be consumed only once by any one consumer in the group; the group shares the consumption offset of the subscribed Topic.

  • Broadcast consumption: each consumer's consumption behavior is completely independent, and every message under the subscribed Topic must be consumed by every consumer in the group. (A minimal configuration sketch follows this list.)
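Switching between the two modes is a one-line choice on the consumer. A minimal sketch, assuming a hypothetical group name (MessageModel lives in org.apache.rocketmq.common.protocol.heartbeat):

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
consumer.setMessageModel(MessageModel.CLUSTERING);     // default: the group shares one consumption offset
// consumer.setMessageModel(MessageModel.BROADCASTING); // each consumer keeps its own progress locally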

Considering the characteristics of the two consumption modes, it is easy to see that they cannot share one strategy. The natural implementation splits into two: one centralized, managed by the Broker, and one decentralized, managed by the consumer. The OffsetStore interface takes care of this, and the source code confirms our suspicion. Let's start with the OffsetStore interface definition:

public interface OffsetStore {

    /** Load message progress from the progress store into memory */
    void load() throws MQClientException;

    /**
     * Get the offset from local storage
     * @return the fetched offset
     */
    long readOffset(MessageQueue mq, ReadOffsetType type);

    /** Remove the offset of a message queue */
    void removeOffset(MessageQueue mq);

    /** Clone a snapshot of the offset table for a topic */
    Map<MessageQueue, Long> cloneOffsetTable(String topic);

    /** Update the offset, storing it in memory */
    void updateOffset(MessageQueue mq, long offset, boolean increaseOnly);

    /** Persist all offsets, either to local storage or to the remote Broker side */
    void persistAll(Set<MessageQueue> mqs);

    /** Persist the offset of one queue, either to local storage or to the remote Broker side */
    void persist(MessageQueue mq);

    /** Update the consumption progress stored on the Broker side; used in clustering mode */
    void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway)
        throws RemotingException, MQBrokerException,
               InterruptedException, MQClientException;

}

Compared with the source code, I have reordered the methods; the ones toward the end are the ones we need to focus on.
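Before diving into the two implementations, here is a simplified sketch of how these methods cooperate for one queue. It is not actual client internals, just an illustration of the contract: read the progress when the queue is assigned, update it in memory after each batch, and persist it periodically.

import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.common.message.MessageQueue;

public class OffsetStoreUsageSketch {

    static void consumeOneBatch(OffsetStore offsetStore, MessageQueue mq, int batchSize) {
        // where to resume for this queue: check memory first, then the persisted store
        long offset = offsetStore.readOffset(mq, ReadOffsetType.MEMORY_FIRST_THEN_STORE);

        // ... pull and process messages [offset, offset + batchSize) here ...

        // record the new progress in memory; increaseOnly=true keeps it from moving backwards
        offsetStore.updateOffset(mq, offset + batchSize, true);

        // persistence is normally driven by the scheduled task described later, not called per batch
        offsetStore.persist(mq);
    }
}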

⚠️ Note: if you have not read through the RocketMQ source code around ProcessQueue, you can think of it as the Consumer-side carrier of messages, a slice of the physical queue. The author defines it as a "queue consumption snapshot".

LocalFileOffsetStore

In broadcast mode, the consumption progress is kept on the Consumer side, and the files are placed, by convention, in a configurable fixed directory. The path is built as follows:

public class LocalFileOffsetStore implements OffsetStore {

    /** The storage folder path, which can be customized via a system property */
    public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
        "rocketmq.client.localOffsetStoreDir",
        System.getProperty("user.home") + File.separator + ".rocketmq_offsets"
    );

    /** The constructor concatenates the full path of the offsets file */
    public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) {
        this.mQClientFactory = mQClientFactory;
        this.groupName = groupName;
        this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator +
            this.mQClientFactory.getClientId() + File.separator +
            this.groupName + File.separator +
            "offsets.json";
    }
}

By default, a ".rocketmq_offsets" folder is created directly under the user's home directory. Note the detail here: because the folder name starts with a ".", it is hidden on Linux and only shows up with ls -a. Putting the pieces together, the persisted offset file ends up at a path of the form {user.home}/.rocketmq_offsets/{clientId}/{groupName}/offsets.json.

In broadcast mode, Consumer#start() calls offsetStore.load() to load the consumption progress. It works by concatenating the full file path according to the convention above, reading the file, and deserializing it into an OffsetSerializeWrapper object:

public class OffsetSerializeWrapper extends RemotingSerializable {

    /* Details the current consumption progress of each queue */
    private ConcurrentMap<MessageQueue, AtomicLong> offsetTable
        = new ConcurrentHashMap<>();
    
}

Suppose we have an SMS-sending service subscribed to the "SMS_prod" Topic; the resulting JSON looks like the block below. Note that the offsetTable property is itself JSON, where each key is a MessageQueue object and each value is a number representing the offset.

{
    "offsetTable": {
        {
            "topic": "SMS_prod",
            "brokerName": "broker0",
            "queueId": 0
        }: 100,
        {
            "topic": "SMS_prod",
            "brokerName": "broker0",
            "queueId": 1
        }: 100
    }
}

Since this key information can be loaded from the file, something must be responsible for writing it. Remember the persistAll method mentioned above?

public void persistAll(Set<MessageQueue> mqs) {
    /* Construct the OffsetSerializeWrapper object */
    OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
    for (Map.Entry<MessageQueue, AtomicLong> entry : offsetTable.entrySet()) {
        if (mqs.contains(entry.getKey())) {
            AtomicLong offset = entry.getValue();
            offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
        }
    }
    /* Serialize the offsetSerializeWrapper object */
    String jsonString = offsetSerializeWrapper.toJson(true);
    /* Write the serialized offsetSerializeWrapper to the file */
    MixAll.string2File(jsonString, this.storePath);
}

The operations on offsets.json are encapsulated in the MixAll utility class:

  • MixAll.file2String: reads the file contents into a string
  • MixAll.string2File: writes the serialized object to a file

RemoteBrokerOffsetStore

Because offsets are maintained on the Broker side, the load method of this implementation is effectively empty. The constructor does not need to compute a file path, and the offsetTable property is the same as before. Let's focus on how consumption progress is saved in clustering mode.

public void persistAll(Set<MessageQueue> mqs) {
    HashSet<MessageQueue> unusedMQ = new HashSet<>();

    for (Map.Entry<MessageQueue, AtomicLong> entry : offsetTable.entrySet()) {
        MessageQueue mq = entry.getKey();
        AtomicLong offset = entry.getValue();
        if (offset != null) {
            if (mqs.contains(mq)) {
                this.updateConsumeOffsetToBroker(mq, offset.get());
            } else {
                unusedMQ.add(mq);
            }
        }
    }
    if (!unusedMQ.isEmpty()) {
        for (MessageQueue mq : unusedMQ) {
            this.offsetTable.remove(mq);
        }
    }
}

Without digging deeper, we should be able to spot at least two differences:

  • Different granularity: Broadcast mode persists the entire offsetTable at once, while cluster mode is refined to the entry level.
  • Calls are made differently: in broadcast mode, direct JVM internal calls are written to files, while in clustered mode, RPC calls are required.

It is worth noting that the offsets.json files produced by the two stores differ slightly; I will analyze that below while walking you through the RPC process.

RPC call stack:

  • RemoteBrokerOffsetStore#persistAll() -> RemoteBrokerOffsetStore#updateConsumeOffsetToBroker(): assembles the RPC request header, an UpdateConsumerOffsetRequestHeader object
  • -> MQClientAPIImpl#updateConsumerOffsetOneway(): assembles the RemotingCommand RPC request object
  • -> NettyRemotingClient#invokeSync(): issues the RPC call; the request code is RequestCode.UPDATE_CONSUMER_OFFSET
  • On the Broker side: ConsumerManageProcessor#updateConsumerOffset() -> ConsumerOffsetManager#commitOffset()
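For a feel of what travels over the wire, here is a rough sketch of how that request is assembled along the call stack above (simplified; mq, groupName and offset are assumed to come from the surrounding consumer code):

UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
requestHeader.setConsumerGroup(groupName);      // which consumer group is reporting
requestHeader.setTopic(mq.getTopic());          // topic of the queue
requestHeader.setQueueId(mq.getQueueId());      // which queue within that topic
requestHeader.setCommitOffset(offset);          // the new consumption progress
RemotingCommand request =
    RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
// the Netty client then sends this command to the Broker that owns the queue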

Tracking the source code shows that every time a Consumer reports its consumption progress over RPC, the Broker does not persist it immediately; it only updates it in memory.

private void commitOffset(String clientHost, String group, String topic, int queueId, long offset) {
    String key = topic + TOPIC_GROUP_SEPARATOR + group;
    ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
    if (Objects.isNull(map)) {
        map = new ConcurrentHashMap<>(32);
        map.put(queueId, offset);
        this.offsetTable.put(key, map);
    } else {
        Long storeOffset = map.put(queueId, offset);
    }
}

TOPIC_GROUP_SEPARATOR is a constant defined as "@". As mentioned earlier, this offsetTable differs slightly from the broadcast-mode JSON: the key becomes a concatenated string with the topic name on the left, the consumer group name on the right, and an @ in between. To make this easier to picture, here is that JSON as well:

/** Note the key for the retry topic: %RETRY%ConsumeGroup */
{
    "offsetTable": {
        "Topic@ConsumeGroup": {
            0: 38,
            1: 37,
            2: 50,
            3: 10
        },
        "%RETRY%ConsumeGroup": {
            0: 0
        }
    }
}

Persistence

The two persistence mechanisms are triggered in much the same way: by a scheduled task, or manually when the consumer is shut down gracefully via shutdown().

The Consumer-side scheduled task used in broadcast mode is defined in MQClientInstance and is started when start() is called after the instance is created. The default interval is 5000 ms, and the first run kicks in after a 10,000 ms delay.

public void start() throws MQClientException {
    this.scheduledExecutorService.scheduleAtFixedRate(
        () -> {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        },
        1000 * 10,
        this.clientConfig.getPersistConsumerOffsetInterval(),
        TimeUnit.MILLISECONDS
    );
}

On the Broker side, the BrokerController object runs a sequence of initialization actions after it is instantiated, and initialize() starts the scheduled task. The default interval is 5000 ms, and the first run kicks in after a 10,000 ms delay.

public boolean initialize() throws CloneNotSupportedException {
    this.scheduledExecutorService.scheduleAtFixedRate(
        () -> {
            try {
                BrokerController.this.consumerOffsetManager.persist();
            } catch (Throwable e) {
                log.error("schedule persist consumerOffset error.", e);
            }
        },
        1000 * 10,
        this.brokerConfig.getFlushConsumerOffsetInterval(),
        TimeUnit.MILLISECONDS
    );
}

Duplicate consumption

After all this analysis of the mechanism, the point I want to convey is this: under normal usage, duplicate consumption is always tied to offset reporting and persistence.

  • During clustering consumption, the Consumer crashes unexpectedly and the offset is not reported, resulting in duplicate consumption
  • During clustering consumption, the Broker crashes unexpectedly and the latest offset is not persisted, resulting in duplicate consumption
  • During broadcast consumption, the Consumer crashes unexpectedly and the offset is not persisted to the local file, resulting in duplicate consumption
  • The offsets.json file is accidentally corrupted or deleted and the progress is lost, resulting in duplicate consumption
  • The offsets.json file is tampered with and the progress is inaccurate, resulting in duplicate consumption

There is one more case: the developer returns a wrong ACK flag, causing RocketMQ to misjudge the consumption as a failure and trigger its retry logic, which also leads to duplicate consumption.

If this article was useful to you, please give it a thumbs up 👍 (that last sentence applies only to readers as good-looking as Dilraba and Daniel Wu).