Welcome to the public number [sharedCode] committed to mainstream middleware source code analysis, personal website: www.shared-code.com/

Order news

Message ordering means that messages can be consumed in the order in which they are sent. RocketMQ ensures strict message ordering. But this order, it’s not a global order, it’s a queue order.

Sequential message producer

public static void main(String[] args) throws UnsupportedEncodingException {
        try {
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            producer.setNamesrvAddr("localhost:9876");
            producer.start();

            String[] tags = new String[] {"TagA"."TagB"."TagC"."TagD"."TagE"};
            for (int i = 0; i < 100; i++) {
                int orderId = i % 10;
                Message msg =
                    new Message("TopicTest2", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // When sending messages, we need to implement MessageQueueSelector to select the appropriate queue
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                      	// 
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);

                System.out.printf("%s%n", sendResult);
            }

            producer.shutdown();
        } catch(MQClientException | RemotingException | MQBrokerException | InterruptedException e) { e.printStackTrace(); }}Copy the code

In order messages implemented above, order messages are sent to the same Queue with the same orderId

Sequential message consumer

public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");

        // Set the NameServer address
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTest1"."TagA || TagC || TagD");

        consumer.registerMessageListener(new MessageListenerOrderly() {
            AtomicLong consumeTimes = new AtomicLong(0);

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                returnConsumeOrderlyStatus.SUCCESS; }}); consumer.start(); System.out.printf("Consumer Started.%n");
    }
Copy the code

Note that messagelistener registered message listener needs to use MessagelistenerContext, ConsumeOrderlyContext, which cannot be used

MessageListenerConcurrently, ConsumeConcurrentlyContext, otherwise the order of the consumer cannot be guaranteed.

Source code analysis

/ * * *@paramMSG message *@paramSelector Message queue selector *@paramArg shard value (similar to the shard key in the sub-table) */
@Override
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
  throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  msg.setTopic(withNamespace(msg.getTopic()));
  return this.defaultMQProducerImpl.send(msg, selector, arg);
}
Copy the code

Actually sent

public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout);
    }

    private SendResult sendSelectImpl(
            Message msg,
            MessageQueueSelector selector,
            Object arg,
            final CommunicationMode communicationMode,
            final SendCallback sendCallback, final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);
				// 1. Get topic information,
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if(topicPublishInfo ! =null && topicPublishInfo.ok()) {
        		
            MessageQueue mq = null;
            try {
            		// 2. Obtain the internal queue information of the current topic
                List<MessageQueue> messageQueueList = mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(
                        topicPublishInfo.getMessageQueueList());
                // Copy a message
                Message userMessage = MessageAccessor.cloneMessage(msg);
                / / the topic information
                String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), 
                                    mQClientFactory.getClientConfig().getNamespace());
                userMessage.setTopic(userTopic);
								//3. Get the message queue
                mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
            } catch (Throwable e) {
                throw new MQClientException("select message queue throwed exception.", e);
            }

            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeout < costTime) {
                throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
            }
            if(mq ! =null) {
            		// Get the queue, execute send message, like normal message send
                return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
            } else {
                throw new MQClientException("select message queue return null.".null); }}throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
    }
Copy the code

Step description:

  1. Gets information about the current topic, internally containing message queues
  2. Get queue information within a topic
  3. Get the message queue, which is the core of the sequential message implementationselector.select(messageQueueList, userMessage, arg), returns the corresponding queue through a custom message queue selector. Internal completely custom
  4. Once the message queue is retrieved, the send message is executed, just like a normal message, where there is no retry.

Summary: The core of sequential messages is to send the messages that you want to order to the corresponding queue according to certain conditions.

Disadvantages of sequential messages:

  1. Sequential messages cannot take advantage of the Failover feature of the cluster because the broker cannot be changed and MessageQueue is retried
  2. Queue hot spots exist. When there are too many messages in a scenario, individual queues are very busy
  3. Failure to skip consumption will cause consumption to stop
  4. The parallelism of messages depends on the number of columns, but can be dynamically adjusted by increasing the number of queues

Consider: With the sequential message pattern above, consumption can be out of order when the broker goes down and the number of queues changes

For example, in the case of multi-master clusters,

Topic: TP_TEST altogether8The MASTER - a queue1 : 1.2.3.4
MASTER-2 : 5.6.7.8

Copy the code

A topic has queues on multiple masters. If one of the masters goes down, the number of queues will become four, and the order messages sent to one queue will be sent to the other queue by orderId % queueSize. Cause consumption disorder.

So if you want strictly sequential messages, don’t use rocketMq, which can cause out-of-order consumption in extreme cases.