We know that to guarantee sequential consumption, Kafka must keep the messages on the same partition and allow only one consumer to consume that partition. In this case, Kafka degenerates into a single queue with no concurrency at all, which greatly degrades system performance. So what about RocketMQ, which is known for being business-friendly? Let's take a step-by-step look at how it implements sequential messages.

Sequential message service usage scenario

1. Order status transitions in an e-commerce mall.

2. Synchronizing the MySQL binlog.

3. Other messages with sequential dependencies, where a later message depends on the processing result of an earlier one.

And so on.

Sequential messages in messaging middleware

Sequential messages (FIFO messages) are a type of message provided by MQ that is published and consumed strictly sequentially. Sequential messages consist of two parts: sequential publication and sequential consumption.

Sequential messages contain two types:

Partition order: all messages in a partition (queue) are published and consumed in first-in, first-out order.

Global order: all messages within a Topic are published and consumed in first-in, first-out order. However, global ordering greatly reduces system throughput, which is not what MQ was designed for.

So the middle way is to choose the partition order.

[Local sequential consumption]

How to ensure order

In the MQ model, the order needs to be guaranteed by three phases:

  1. Messages are sent in order

  2. Messages are stored in the same order as they were sent

  3. Messages are consumed in the same order as they are stored

Sending in order means that, for messages requiring ordering, users should send them synchronously from the same thread. Storing in the order of sending means that if messages A and B are sent from the same thread, A must be stored before B. Consuming in the order of storage means that once A and B arrive at the consumer, they must be processed in the order A, then B.

First, messages are sent sequentially. Messages sent from multiple threads cannot guarantee ordering. Therefore, when sending, the business side needs to ensure that messages with the same business number (such as an order ID) are sent sequentially from one thread. In MQ, the send must be synchronous; asynchronous sending does not guarantee ordering.
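One way to satisfy the "same business number, same thread" rule is to pin each business key to a single-threaded executor. The following is a minimal sketch using only the JDK. The class KeyAffinitySender, its methods, and the in-memory list that stands in for the broker are hypothetical names invented for illustration, and sendSync is a placeholder for whatever blocking send call the real producer offers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: every business key is pinned to one single-threaded executor,
// so messages with the same key are sent one after another by the same thread.
class KeyAffinitySender {
    private final ExecutorService[] workers;
    // Stand-in for the broker: records messages in the order the sends complete.
    private final List<String> stored = Collections.synchronizedList(new ArrayList<>());

    KeyAffinitySender(int threads) {
        workers = new ExecutorService[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    // The same key always maps to the same worker thread.
    void send(String businessKey, String body) {
        int index = Math.floorMod(businessKey.hashCode(), workers.length);
        workers[index].execute(() -> sendSync(businessKey + ":" + body));
    }

    // Placeholder for a blocking send: it returns only after the message is "stored".
    private void sendSync(String message) {
        stored.add(message);
    }

    // Stops accepting work, waits for queued sends to finish, and returns what was stored.
    List<String> shutdownAndGetStored() {
        for (ExecutorService worker : workers) {
            worker.shutdown();
        }
        try {
            for (ExecutorService worker : workers) {
                worker.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(stored);
    }

    static List<String> demo() {
        KeyAffinitySender sender = new KeyAffinitySender(4);
        for (String status : new String[] {"CREATED", "PAID", "SHIPPED"}) {
            sender.send("order-1", status);
        }
        return sender.shutdownAndGetStored();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Messages for different keys may still interleave with each other, which is acceptable because ordering is only required within one business number.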

Second, sequential message storage. An MQ topic has multiple queues. To ensure sequential storage, messages with the same business number must be sent to the same queue. In MQ, MessageQueueSelector is used to choose the target queue: hash the business number, then take the hash value modulo the number of queues to pick the queue the message is sent to.
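The hash-then-modulo routing described above can be sketched without any MQ dependency. This is an illustrative example only: QueueRouting and selectQueueIndex are hypothetical names, and the queue names are made up. It uses Math.floorMod rather than the % operator so that a negative hash code never produces a negative index.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper illustrating hash-based queue routing.
class QueueRouting {
    // Maps a business key to a queue index. floorMod keeps the index in
    // [0, queueCount) even when hashCode() is negative.
    static int selectQueueIndex(Object businessKey, int queueCount) {
        return Math.floorMod(businessKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("queue-0", "queue-1", "queue-2", "queue-3");
        for (long orderId : new long[] {1001L, 1002L, 1001L}) {
            int index = selectQueueIndex(orderId, queues.size());
            System.out.println(orderId + " -> " + queues.get(index));
        }
    }
}
```

The mapping is stable only while the number of queues stays the same. If the queue count changes, the same business number may be routed to a different queue, so ordering across that change is not guaranteed by this scheme alone.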

Third, sequential message consumption. To ensure it, a queue may be consumed by only one consumer, so locking the consume queue on the broker is unavoidable. In addition, inside that consumer only one thread may consume the queue. That is, at any moment a consume queue is consumed by exactly one thread in one consumer.

Sequential implementation in RocketMQ

[The Producer side]

The only thing the Producer side has to do to ensure message order is to route messages to a specific partition, which in RocketMQ is achieved through MessageQueueSelector.

public interface MessageQueueSelector {
    /**
     * Select message queue
     *
     * @param mqs message queues
     * @param msg message
     * @param arg user-supplied argument
     * @return the selected message queue
     */
    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}

  • List<MessageQueue> mqs: all partitions (queues) under the Topic to be sent to

  • Message msg: the message object

  • Object arg: an additional parameter; users can pass their own value

For example, the following implementation ensures that messages for the same order are routed to the same partition:

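// arg is the user-supplied argument passed to select(); here it carries the order
long orderId = ((Order) arg).getOrderId();
// List.get takes an int index, so the modulo result is cast explicitly
return mqs.get((int) (orderId % mqs.size()));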

[The Consumer side]

Try locking MessageQueue.

How do we ensure that a queue is consumed by only one consumer in the first place?

Consume queues live on the broker side. To ensure that a queue is consumed by only one consumer, the consumer must apply to the MQ server for the queue lock before consuming messages. The queue-locking code is found in the message queue load balancing implementation driven by RebalanceService.

After load balancing has assigned consume queues to a consumer, the consumer needs to send message pull requests to the MQ server. The code is in RebalanceImpl#updateProcessQueueTableInRebalance. For pulling ordered messages, MQ does the following:

// Add message queues that are in mqSet but not yet in processQueueTable.
List<PullRequest> pullRequestList = new ArrayList<>();
for (MessageQueue mq : mqSet) {
    if (!this.processQueueTable.containsKey(mq)) {
        // For ordered consumption, the queue must be locked on the broker first
        if (isOrder && !this.lock(mq)) {
            log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
            continue;
        }

        this.removeDirtyOffset(mq);
        ProcessQueue pq = new ProcessQueue();
        long nextOffset = this.computePullFromWhere(mq);
        if (nextOffset >= 0) {
            ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
            if (pre != null) {
                log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
            } else {
                log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                PullRequest pullRequest = new PullRequest();
                pullRequest.setConsumerGroup(consumerGroup);
                pullRequest.setNextOffset(nextOffset);
                pullRequest.setMessageQueue(mq);
                pullRequest.setProcessQueue(pq);
                pullRequestList.add(pullRequest);
                changed = true;
            }
        } else {
            log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
        }
    }
}

this.dispatchPullRequest(pullRequestList);

The core idea: for ordered consumption, a PullRequest is created for a MessageQueue only after the client has successfully locked that queue. The locking code is as follows:

public boolean lock(final MessageQueue mq) {
    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
    if (findBrokerResult != null) {
        LockBatchRequestBody requestBody = new LockBatchRequestBody();
        requestBody.setConsumerGroup(this.consumerGroup);
        requestBody.setClientId(this.mQClientFactory.getClientId());
        requestBody.getMqSet().add(mq);

        try {
            // Ask the broker for the distributed lock on the specified MessageQueue
            Set<MessageQueue> lockedMq =
                this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
            // Mark the local ProcessQueue as locked. A message queue may be locked
            // successfully while no local ProcessQueue exists yet; in that case the
            // locked flag is set later in lockAll().
            for (MessageQueue mmqq : lockedMq) {
                ProcessQueue processQueue = this.processQueueTable.get(mmqq);
                if (processQueue != null) {
                    processQueue.setLocked(true);
                    processQueue.setLastLockTimestamp(System.currentTimeMillis());
                }
            }

            boolean lockOK = lockedMq.contains(mq);
            log.info("the message queue lock {}, {} {}", lockOK ? "OK" : "Failed", this.consumerGroup, mq);
            return lockOK;
        } catch (Exception e) {
            log.error("lockBatchMQ exception, " + mq, e);
        }
    }

    return false;
}

In this implementation, the lockBatchMQ method is used to send the lock request to the broker.

[Broker end implementation]

The broker handles the lock request in the RebalanceLockManager#tryLockBatch method. The key attributes of RebalanceLockManager are as follows:

/**
 * Message queue lock expiration time, default 60s
 */
private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty(
    "rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));
/**
 * Lock
 */
private final Lock lock = new ReentrantLock();
/**
 * Message queue lock table: consumer group -> (message queue -> lock entry)
 */
private final ConcurrentHashMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
    new ConcurrentHashMap<>(1024);

The key attributes in the LockEntry object are as follows:

static class LockEntry {
    /**
     * Client ID
     */
    private String clientId;
    /**
     * Last update timestamp
     */
    private volatile long lastUpdateTimestamp = System.currentTimeMillis();

    public String getClientId() {
        return clientId;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    public long getLastUpdateTimestamp() {
        return lastUpdateTimestamp;
    }

    public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
        this.lastUpdateTimestamp = lastUpdateTimestamp;
    }

    /**
     * Whether the queue is locked by the given client
     *
     * @param clientId client ID
     * @return true if this client holds a lock that has not expired
     */
    public boolean isLocked(final String clientId) {
        boolean eq = this.clientId.equals(clientId);
        return eq && !this.isExpired();
    }

    /**
     * Whether the lock has expired
     *
     * @return true if the lock has expired
     */
    public boolean isExpired() {
        boolean expired =
            (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
        return expired;
    }
}

The broker maintains mqLockTable, a ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>>, to lock each MessageQueue, so that within a consumer group a MessageQueue can be consumed by only one consumer at a time.
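To make the idea of a lock table with expiring entries concrete, here is a simplified sketch. It is not the RebalanceLockManager code: QueueLockTable, tryLock, and the injectable clock are hypothetical names chosen for illustration, queues are identified by plain strings, and a synchronized method stands in for the ReentrantLock. It only shows the acquire, renew, and take-over-after-expiry rules that a table like mqLockTable makes possible.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Simplified, hypothetical model of a broker-side queue lock table with lease expiry.
class QueueLockTable {
    private static final class Entry {
        final String clientId;
        final long lastUpdateMillis;

        Entry(String clientId, long lastUpdateMillis) {
            this.clientId = clientId;
            this.lastUpdateMillis = lastUpdateMillis;
        }
    }

    // consumer group -> (queue name -> lock entry)
    private final ConcurrentHashMap<String, ConcurrentHashMap<String, Entry>> table =
        new ConcurrentHashMap<>();
    private final long maxLiveMillis;
    private final LongSupplier clock; // injectable so expiry can be tested without sleeping

    QueueLockTable(long maxLiveMillis, LongSupplier clock) {
        this.maxLiveMillis = maxLiveMillis;
        this.clock = clock;
    }

    // Returns true if clientId holds the lock on the queue after this call.
    synchronized boolean tryLock(String group, String queue, String clientId) {
        long now = clock.getAsLong();
        ConcurrentHashMap<String, Entry> groupLocks =
            table.computeIfAbsent(group, g -> new ConcurrentHashMap<>());
        Entry current = groupLocks.get(queue);
        boolean free = current == null || now - current.lastUpdateMillis > maxLiveMillis;
        if (free || current.clientId.equals(clientId)) {
            // Acquire a free or expired lock, or renew a lock this client already holds.
            groupLocks.put(queue, new Entry(clientId, now));
            return true;
        }
        return false; // another client holds a lock that has not expired
    }

    public static void main(String[] args) {
        AtomicLong now = new AtomicLong(0);
        QueueLockTable locks = new QueueLockTable(60_000, now::get);
        System.out.println("A locks: " + locks.tryLock("group", "queue-0", "consumer-A"));
        System.out.println("B locks: " + locks.tryLock("group", "queue-0", "consumer-B"));
        now.set(60_001); // consumer-A never renewed, so its lease has run out
        System.out.println("B locks after expiry: " + locks.tryLock("group", "queue-0", "consumer-B"));
    }
}
```

The expiry rule is what lets another consumer take over a queue when the previous holder stops renewing, for example after a crash.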

[Go back to the Consumer and get the lock]

Once the consumer has successfully locked the MessageQueue, the next step is to create the PullRequest and pull messages. Message pulling is implemented in PullMessageService. After a pull completes, the messages are submitted to a ConsumeMessageService for consumption. The implementation for ordered consumption is ConsumeMessageOrderlyService, and the method that submits messages for consumption is ConsumeMessageOrderlyService#submitConsumeRequest, implemented as follows:

@Override
public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispathToConsume) {
    if (dispathToConsume) {
        ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
        this.consumeExecutor.submit(consumeRequest);
    }
}

A ConsumeRequest object is constructed and submitted to a ThreadPoolExecutor, which runs requests in parallel. Let's see how the run method of ConsumeRequest implements sequential consumption:

 public void run() {
            if (this.processQueue.isDropped()) {
                log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                return;
            }

            // Obtain the consumer-side lock object for this message queue
            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
            synchronized (objLock) {
                // (Broadcasting mode) or (clustering mode and the broker-side queue lock is still valid)
                if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                    final long beginTime = System.currentTimeMillis();
                    // Consume in a loop
                    for (boolean continueConsume = true; continueConsume; ) {
                        if (this.processQueue.isDropped()) {
                            log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                            break;
                        }

                        // The distributed queue lock is not held: try to lock later, then resubmit the consume request
                        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                            && !this.processQueue.isLocked()) {
                            log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                            break;
                        }
                        // The distributed queue lock has expired: try to lock later, then resubmit the consume request
                        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                            && this.processQueue.isLockExpired()) {
                            log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                            break;
                        }

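                        // This round has exceeded the maximum continuous consume time (default 60s): submit a delayed consume request.
                        // By default, consumption pauses for 10ms after every minute of consuming.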
                        long interval = System.currentTimeMillis() - beginTime;
                        if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                            ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                            break;
                        }

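                        // Take the messages to consume. Unlike a concurrent consume request, which already carries
                        // the messages to consume, an orderly request takes them from the ProcessQueue here.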
                        final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
                        List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
                        if (!msgs.isEmpty()) {
                            final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

                            ConsumeOrderlyStatus status = null;

                            // Hook:before
                            ConsumeMessageContext consumeMessageContext = null;
                            if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                                consumeMessageContext = new ConsumeMessageContext();
                                consumeMessageContext
                                    .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
                                consumeMessageContext.setMq(messageQueue);
                                consumeMessageContext.setMsgList(msgs);
                                consumeMessageContext.setSuccess(false);
                                // init the consume context type
                                consumeMessageContext.setProps(new HashMap<String, String>());
                                ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
                            }

                            // Execute consumption
                            long beginTimestamp = System.currentTimeMillis();
                            ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                            boolean hasException = false;
                            try {
                                this.processQueue.getLockConsume().lock(); // Acquire the queue consume lock

                                if (this.processQueue.isDropped()) {
                                    log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                        this.messageQueue);
                                    break;
                                }

                                status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                            } catch (Throwable e) {
                                log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", //
                                    RemotingHelper.exceptionSimpleDesc(e), //
                                    ConsumeMessageOrderlyService.this.consumerGroup, //
                                    msgs, //
                                    messageQueue);
                                hasException = true;
                            } finally {
                                this.processQueue.getLockConsume().unlock(); // Release the queue consume lock
                            }

                            if (null == status //
                                || ConsumeOrderlyStatus.ROLLBACK == status//
                                || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                                log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", //
                                    ConsumeMessageOrderlyService.this.consumerGroup, //
                                    msgs, //
                                    messageQueue);
                            }

                            // Resolve the consume return type
                            long consumeRT = System.currentTimeMillis() - beginTimestamp;
                            if (null == status) {
                                if (hasException) {
                                    returnType = ConsumeReturnType.EXCEPTION;
                                } else {
                                    returnType = ConsumeReturnType.RETURNNULL;
                                }
                            } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
                                returnType = ConsumeReturnType.TIME_OUT;
                            } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                                returnType = ConsumeReturnType.FAILED;
                            } else if (ConsumeOrderlyStatus.SUCCESS == status) {
                                returnType = ConsumeReturnType.SUCCESS;
                            }

                            if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                                consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
                            }

                            if (null == status) {
                                status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                            }

                            // Hook:after
                            if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                                consumeMessageContext.setStatus(status.toString());
                                consumeMessageContext
                                    .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
                                ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
                            }

                            ConsumeMessageOrderlyService.this.getConsumerStatsManager()
                                .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

                            // Process the consume result
                            continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                        } else {
                            continueConsume = false;
                        }
                    }
                } else {
                    if (this.processQueue.isDropped()) {
                        log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                        return;
                    }

                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
                }
            }
        }

After obtaining the lock object, use synchronized to attempt to apply for a thread-level exclusive lock.

If the lock is successful, only one thread consumes messages at a time.

If the broker-side lock on the MessageQueue is not held or has expired, the consumer tries to lock the MessageQueue on the broker again after a 100 ms delay. Once the lock succeeds, the consume request is submitted again.
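The thread-level part of this scheme, one lock object per queue combined with synchronized, can be sketched with the JDK alone. The names PerQueueLocks, fetchLockObject, and consumeExclusively below are hypothetical and queues are identified by plain strings; this is an illustration of the technique, not the RocketMQ class.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of a per-queue lock registry.
class PerQueueLocks {
    private final ConcurrentMap<String, Object> lockTable = new ConcurrentHashMap<>();

    // Always returns the same lock object for the same queue name.
    Object fetchLockObject(String queueName) {
        return lockTable.computeIfAbsent(queueName, name -> new Object());
    }

    // Runs the task while holding the queue's monitor, so tasks for one queue never overlap.
    void consumeExclusively(String queueName, Runnable task) {
        synchronized (fetchLockObject(queueName)) {
            task.run();
        }
    }

    // Four threads "consume" from the same queue and update a counter that is not
    // thread-safe on its own. The per-queue lock makes the updates safe.
    static int demo() {
        PerQueueLocks locks = new PerQueueLocks();
        int[] consumed = {0};
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int n = 0; n < 1000; n++) {
                    locks.consumeExclusively("queue-0", () -> consumed[0]++);
                }
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return consumed[0];
    }

    public static void main(String[] args) {
        System.out.println("consumed = " + demo());
    }
}
```

Because the lock is per queue, threads working on different queues do not block each other. Only work on the same queue is serialized.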

At this point the solution to the third key point is clear. It comes down to two steps.

When the message pull task is created, the message client applies to the broker to lock the MessageQueue, so that a MessageQueue can only be consumed by one consuming client at a time.

In message consumption, multiple threads first try to apply for an exclusive lock with synchronized for the consumption of the same MessageQueue, and the consumption can only be carried out after the lock is successfully added, so that a MessageQueue can only be consumed by one thread in one consuming client at a time.

[Sequential consumption problem disassembly]

  1. The broker guarantees that only one process consumes a queue, i.e. a queue has only one consumer at a time

  2. The order of messages from the broker to the consumer should be consistent. This is transmitted through RPC, and the order of messages does not change after serialization, so it is easy to implement

  3. Messages on the consumer queue must be consumed by only one thread at a time

To solve this problem, the common practice is to lock a resource while it is being accessed. That is, a message queue on the broker must be locked while a consumer accesses it, and inside a single consumer the queue must be locked when multiple threads process messages concurrently. If a queue on the broker is locked by a consumer and that consumer crashes, the lock can never be released, so the lock on the broker needs an expiration time. RocketMQ does exactly that.

Sequential message considerations in RocketMQ

  1. Not all situations in a real project require sequential messages, but this is something that is often overlooked when designing a solution

  2. Sequential messages are the result of cooperation between producer and consumer. Guaranteeing sequential consumption on the consumer side alone does not guarantee that messages are in order

  3. If the number of messages consumed per batch (consumeBatchSize) is set to 1, can sequential consumption be achieved? In fact it cannot. With concurrent consumption, multiple threads on the consumer side consume at the same time, and consumeBatchSize is only the number of messages one thread takes at a time, so it has no bearing on ordering. If you are interested, see the ConsumeMessageConcurrentlyService code, which contains the logic for concurrent consumption.
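The third point can be demonstrated with a small JDK-only sketch. It is a contrived illustration, not RocketMQ code: the class ConcurrentBatchOfOne is a hypothetical name, and a CountDownLatch is used to force the handler of message 1 to be slower than the handler of message 2. Each worker receives a batch of exactly one message, yet the messages still complete out of order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demonstration: a batch size of one does not order concurrent consumption.
class ConcurrentBatchOfOne {
    // Returns the order in which the two messages finished processing.
    static List<Integer> completionOrder() {
        List<Integer> done = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch secondFinished = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // Worker for message 1: handed out first, but its processing is slow.
        pool.execute(() -> {
            try {
                secondFinished.await(); // simulates slow business logic
                done.add(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // Worker for message 2: handed out second, finishes immediately.
        pool.execute(() -> {
            done.add(2);
            secondFinished.countDown();
        });

        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(done);
    }

    public static void main(String[] args) {
        System.out.println(completionOrder());
    }
}
```

With a single consuming thread per queue, message 2 could not start until message 1 had finished, which is the guarantee ordered consumption relies on.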

After a consumer fails to consume a sequential message, RocketMQ automatically retries the message at 1-second intervals, and the maximum number of retries is Integer.MAX_VALUE. While the retries go on, message consumption in the application is blocked. Therefore, when using sequential messages, make sure the application can monitor and handle consumption failures promptly to avoid blocking.
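One possible way to keep a failing message from blocking a queue indefinitely is to bound the retries inside the business handler and report the message once the limit is reached. The sketch below is an assumption about how an application might do this, not a RocketMQ feature: BoundedRetryConsumer, Outcome, and consume are hypothetical names, and the handler is modeled as a plain Predicate that returns true on success.

```java
import java.util.function.Predicate;

// Hypothetical application-level helper: bounded retries for one message.
class BoundedRetryConsumer {
    enum Outcome { SUCCESS, GAVE_UP }

    // Tries the handler up to maxAttempts times. GAVE_UP tells the caller to record
    // or alert on the message and then move on, instead of retrying forever.
    static <T> Outcome consume(T message, Predicate<T> handler, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (handler.test(message)) {
                    return Outcome.SUCCESS;
                }
            } catch (RuntimeException e) {
                // An exception counts as a failed attempt; fall through and retry.
            }
        }
        return Outcome.GAVE_UP;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // This handler fails twice and succeeds on the third attempt.
        Outcome flaky = consume("order-1:PAID", m -> ++calls[0] >= 3, 5);
        // This handler always throws, so the retries are exhausted.
        Outcome broken = consume("order-2:PAID", m -> {
            throw new IllegalStateException("cannot process " + m);
        }, 5);
        System.out.println(flaky + " after " + calls[0] + " attempts, then " + broken);
    }
}
```

Giving up on a message trades strict ordering for liveness, so whether later messages of the same business number may proceed is a business decision. In a listener, the choice would map to returning ConsumeOrderlyStatus.SUCCESS after recording the failure, as opposed to SUSPEND_CURRENT_QUEUE_A_MOMENT to keep retrying.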

Again, it’s important to note that when using sequential messages, be aware of exceptions! Prevent resources from not being released!

Summary

From the above understanding, we know the necessary conditions for implementing sequential messages: sequential delivery, sequential storage, and sequential consumption. RocketMQ is designed with this in mind, and we simply use the API and no additional code to constrain the business, making it easier to implement sequential messages.