
This chapter continues the hands-on tour of RocketMQ operations.

Ordered messages

Validation: start multiple Consumer instances and observe how the messages of each order are allocated among them, and in what sequence the steps of each order are consumed.

Producer:

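import java.io.UnsupportedEncodingException;
import java.util.List;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

/**
 * @Package: RocketMQOrderExample
 * @ClassName: Producer
 * @Author: AZ
 * @CreateTime: 2021/8/10 20:07
 * @Description:
 */
public class Producer {
    public static void main(String[] args) throws UnsupportedEncodingException {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        try {
            producer.setNamesrvAddr(Constant.NAMESERVER);
            producer.start();

            // 10 orders, each with steps 0..5
            for (int i = 0; i < 10; i++) {
                int orderId = i;

                for (int j = 0; j <= 5; j++) {
                    Message msg = new Message("OrderTopicTest", "order_" + orderId, "KEY" + orderId,
                            ("order_" + orderId + " step " + j).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    // Decode the body for logging; byte[].toString() would only print the array reference
                    System.out.println("The content of the message sent is - {"
                            + new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET) + "}");
                    // All steps of one order go to the same queue: orderId modulo the queue count
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            Integer id = (Integer) arg;
                            int index = id % mqs.size();
                            return mqs.get(index);
                        }
                    }, orderId);
                    System.out.printf("%s%n", sendResult);
                }
            }
            producer.shutdown();
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}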

Consumer:


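import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * @Package: RocketMQOrderExample
 * @ClassName: Consumer
 * @Author: AZ
 * @CreateTime: 2021/8/10 20:08
 * @Description:
 */
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
        consumer.setNamesrvAddr(Constant.NAMESERVER);
        // Where the consumer starts consuming from
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.subscribe("OrderTopicTest", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("Received message content" + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}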

Verification result: the messages are not consumed in order. Look carefully at the consumed content:

Consumer Started.
Received message content order_1 step 0
Received message content order_1 step 4
Received message content order_9 step 2
Received message content order_5 step 5
Received message content order_9 step 1
Received message content order_9 step 5
Received message content order_9 step 4
Received message content order_9 step 3
Received message content order_1 step 1
Received message content order_1 step 5
Received message content order_1 step 2
Received message content order_1 step 3
*****

Modify the consumer code as follows, replacing the listener class the consumer registers:

consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        context.setAutoCommit(true);
        for(MessageExt msg:msgs){
            System.out.println("Received message content"+new String(msg.getBody()));
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});

The final output is as follows:

Consumer Started.
Received message content order_1 step 0
Received message content order_1 step 1
Received message content order_1 step 2
Received message content order_1 step 3
Received message content order_1 step 4
Received message content order_1 step 5
Received message content order_5 step 0
Received message content order_5 step 1
Received message content order_5 step 2
Received message content order_5 step 3
Received message content order_5 step 4
Received message content order_5 step 5
Received message content order_9 step 0
Received message content order_9 step 1
Received message content order_9 step 2
Received message content order_9 step 3
Received message content order_9 step 4
Received message content order_9 step 5
Received message content order_0 step 0
Received message content order_0 step 1
Received message content order_0 step 2
Received message content order_0 step 3
Received message content order_0 step 4
Received message content order_0 step 5
*****

The verification shows that the steps of each order are always consumed in sequence from 0 to 5, regardless of how the orders are distributed across multiple Consumer instances. A closer look, however, reveals the following:

RocketMQ guarantees local ordering rather than global ordering. That is, all messages with the same orderId are guaranteed to be ordered (they are sent to the same queue), but ordering across all messages is not guaranteed.

Then I started two more consumers: Consumer2 and Consumer3

The final output is as follows:

Consumer:

Received message content order_2 step 0
Received message content order_2 step 1
Received message content order_2 step 2
Received message content order_2 step 3
Received message content order_2 step 4
Received message content order_2 step 5
Received message content order_6 step 0
Received message content order_6 step 1
Received message content order_6 step 2
Received message content order_6 step 3
Received message content order_6 step 4
Received message content order_6 step 5

Consumer2

Received message content order_0 step 0
Received message content order_0 step 1
Received message content order_0 step 2
Received message content order_0 step 3
Received message content order_0 step 4
Received message content order_0 step 5
Received message content order_1 step 0
Received message content order_1 step 1
Received message content order_1 step 2
Received message content order_1 step 3
Received message content order_1 step 4
Received message content order_1 step 5
Received message content order_4 step 0
Received message content order_4 step 1
Received message content order_4 step 2
Received message content order_4 step 3
Received message content order_4 step 4
Received message content order_4 step 5
Received message content order_5 step 0
Received message content order_5 step 1
Received message content order_5 step 2
Received message content order_5 step 3
Received message content order_5 step 4
Received message content order_5 step 5
Received message content order_8 step 0
Received message content order_8 step 1
Received message content order_8 step 2
Received message content order_8 step 3
Received message content order_8 step 4
Received message content order_8 step 5
Received message content order_9 step 0
Received message content order_9 step 1
Received message content order_9 step 2
Received message content order_9 step 3
Received message content order_9 step 4
Received message content order_9 step 5

Consumer3

Received message content order_3 step 0
Received message content order_3 step 1
Received message content order_3 step 2
Received message content order_3 step 3
Received message content order_3 step 4
Received message content order_3 step 5
Received message content order_7 step 0
Received message content order_7 step 1
Received message content order_7 step 2
Received message content order_7 step 3
Received message content order_7 step 4
Received message content order_7 step 5

After the modification, the steps of each orderId are consumed in order, and this still holds when three consumers share the work. Before the modification, the steps of an orderId were not guaranteed to be consumed in order even with a single consumer.

So what is the difference between the two?

It comes down to how RocketMQ orders messages. For the Consumer to receive messages in order, ordering has to be preserved at every stage: Producer, Broker, and Consumer.

The producer side

By default, the producer distributes messages across different MessageQueues (partitioned queues) in round-robin fashion, and when consumers pull from those MessageQueues there is no guarantee that messages are consumed in the order in which they were sent. The consumption order of a group of messages can be guaranteed only when the whole group is sent to the same MessageQueue.
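
As a rough illustration of why the selector matters, the sketch below compares a simplified round-robin choice with the `id % mqs.size()` rule used in the Producer above. It is plain Java with no RocketMQ dependency; the class `QueueSelectionSketch`, its method names, and the queue count of 4 are assumptions made for this example, not part of the RocketMQ API.

```java
import java.util.Set;
import java.util.TreeSet;

final class QueueSelectionSketch {

    // Same rule as the MessageQueueSelector in the Producer: orderId modulo queue count.
    static int selectByOrderId(int orderId, int queueCount) {
        return orderId % queueCount;
    }

    // Simplified round-robin: the n-th message sent goes to queue n % queueCount.
    static int selectRoundRobin(int sendCounter, int queueCount) {
        return sendCounter % queueCount;
    }

    // Returns the distinct queue indexes that the steps of one order land on.
    static Set<Integer> queuesUsed(int orderId, int steps, int queueCount, boolean byOrderId) {
        Set<Integer> queues = new TreeSet<>();
        for (int step = 0; step < steps; step++) {
            // Position of this message in the Producer's nested send loop
            int sendCounter = orderId * steps + step;
            queues.add(byOrderId
                    ? selectByOrderId(orderId, queueCount)
                    : selectRoundRobin(sendCounter, queueCount));
        }
        return queues;
    }

    public static void main(String[] args) {
        int queueCount = 4; // assumed number of queues for the topic
        int steps = 6;      // steps 0..5, as in the Producer
        for (int orderId = 0; orderId < 10; orderId++) {
            System.out.printf("order_%d  round-robin=%s  by-orderId=%s%n",
                    orderId,
                    queuesUsed(orderId, steps, queueCount, false),
                    queuesUsed(orderId, steps, queueCount, true));
        }
    }
}
```

Under the 4-queue assumption, orders 2 and 6 map to the same queue, as do orders 3 and 7, which is consistent with how the orders were grouped in the three-consumer output above.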

The consumer side

The consumer listens for messages sent by the Producer and fetches them from the MessageQueues. Faced with a "pile" of queues, however, the consumer has no inherent rule for which queue to consume from first. To keep messages in order, you have to make sure the consumer consumes queue by queue, that is, it consumes the messages of one queue and then moves on to the messages of another.

The modified Consumer code uses MessageListenerOrderly, which relies on a lock inside RocketMQ to ensure that messages are fetched and consumed one queue at a time. MessageListenerConcurrently does not lock the queue; it fetches a batch of messages each time (by default no more than 32), so ordering cannot be guaranteed.
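
The locking idea can be sketched in plain Java. This is only a toy model and not RocketMQ's actual implementation: `OrderlyConsumeSketch`, `QueueState`, and the thread counts are hypothetical names and values chosen for illustration. Each queue carries its own lock, and a worker drains a queue only while holding that lock, so messages within one queue are handled in their original order even though several threads compete for the work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

final class OrderlyConsumeSketch {

    // Hypothetical stand-in for one message queue on the consumer side.
    static final class QueueState {
        final Queue<String> pending = new ConcurrentLinkedQueue<>();
        final ReentrantLock lock = new ReentrantLock();
        final List<String> consumed = new ArrayList<>(); // only modified while holding the lock
    }

    // Drain one queue while holding its lock, so only one thread consumes it at a time.
    static void consumeOrderly(QueueState queue) {
        queue.lock.lock();
        try {
            String msg;
            while ((msg = queue.pending.poll()) != null) {
                queue.consumed.add(msg);
            }
        } finally {
            queue.lock.unlock();
        }
    }

    // Fills the queues using orderId % queueCount, then lets several threads compete for them.
    static List<QueueState> run(int queueCount, int orders, int steps, int threads) {
        List<QueueState> queues = new ArrayList<>();
        for (int i = 0; i < queueCount; i++) {
            queues.add(new QueueState());
        }
        for (int orderId = 0; orderId < orders; orderId++) {
            for (int step = 0; step < steps; step++) {
                queues.get(orderId % queueCount).pending.add("order_" + orderId + " step " + step);
            }
        }

        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            for (QueueState queue : queues) {
                pool.submit(() -> consumeOrderly(queue));
            }
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for consumers", e);
        }
        return queues;
    }

    public static void main(String[] args) {
        List<QueueState> queues = run(4, 10, 6, 8);
        for (int i = 0; i < queues.size(); i++) {
            System.out.println("queue " + i + ": " + queues.get(i).consumed);
        }
    }
}
```

Within each queue the steps of an order stay in sequence, while nothing is promised about the relative order of different queues, which mirrors the local-ordering guarantee described earlier.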

PS: Sequential consumption sacrifices some system performance, which is worth keeping in mind; everything comes at a price. Note that only local ordering is guaranteed here, and in general local ordering is all we need.

Broadcast messages

As the name implies, a broadcast message is not addressed to one particular consumer instance. Once it has been sent, the decision lies with the consumers: any consumer interested in the topic can subscribe to it, and every subscriber receives the subsequent messages on that topic, regardless of whether the consumers are in the same group. This differs from clustering mode (MessageModel.CLUSTERING), where each message is consumed by only one instance within the same consumer group.
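
The difference between the two message models can be illustrated with a small plain-Java model. This is a simplified sketch rather than RocketMQ code: `MessageModelSketch`, its `Model` enum, and `deliver` are hypothetical names, and the per-message distribution used for clustering is a simplification (RocketMQ allocates whole queues to instances, not individual messages).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class MessageModelSketch {

    enum Model { CLUSTERING, BROADCASTING }

    // Returns, for each consumer instance of ONE group, the messages it receives.
    static Map<String, List<String>> deliver(List<String> messages, List<String> consumers, Model model) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String consumer : consumers) {
            received.put(consumer, new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            String msg = messages.get(i);
            if (model == Model.BROADCASTING) {
                // Broadcasting: every instance gets its own copy of the message
                for (String consumer : consumers) {
                    received.get(consumer).add(msg);
                }
            } else {
                // Clustering (simplified): exactly one instance of the group gets the message
                received.get(consumers.get(i % consumers.size())).add(msg);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m0", "m1", "m2", "m3", "m4", "m5");
        List<String> consumers = Arrays.asList("Consumer", "Consumer2", "Consumer3");
        System.out.println("clustering:   " + deliver(messages, consumers, Model.CLUSTERING));
        System.out.println("broadcasting: " + deliver(messages, consumers, Model.BROADCASTING));
    }
}
```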

Example:

Consumer

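import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; // package path in RocketMQ 4.x

/**
 * @Package: rocketmqbroadcast
 * @ClassName: PushConsumer
 * @Author: AZ
 * @CreateTime: 2021/8/14 18:56
 * @Description:
 */
public class PushConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // Set the consumer's message model to broadcasting
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.subscribe("OrderTopicTest", "*");

        consumer.setNamesrvAddr(Constant.NAMESERVER);

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (int i = 0; i < msgs.size(); i++) {
                    System.out.printf("Recive New Message: %s %n", new String(msgs.get(i).getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Broadcast Consumer Started. %n");
    }
}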

Then start the three consumers from the ordered-message example one after another. Consumer, Consumer2, and Consumer3 subscribe to the same topic in the same group, so a message consumed by one of them is not consumed again by the others. PushConsumer sets its message model to broadcast consumption, so it consumes those messages again; in broadcast mode this happens even among instances of the same group.

The displayed information is as follows:

ConsumeMessageThread_1 Recive New Message: order_0 step 0 
ConsumeMessageThread_2 Recive New Message: order_0 step 1 
ConsumeMessageThread_3 Recive New Message: order_0 step 2 
ConsumeMessageThread_4 Recive New Message: order_0 step 3 
ConsumeMessageThread_5 Recive New Message: order_0 step 4 
ConsumeMessageThread_6 Recive New Message: order_0 step 5 
ConsumeMessageThread_7 Recive New Message: order_1 step 0 
ConsumeMessageThread_8 Recive New Message: order_1 step 1 
ConsumeMessageThread_9 Recive New Message: order_1 step 2 
ConsumeMessageThread_10 Recive New Message: order_1 step 3 
ConsumeMessageThread_11 Recive New Message: order_1 step 4 
ConsumeMessageThread_12 Recive New Message: order_1 step 5 
ConsumeMessageThread_13 Recive New Message: order_2 step 0 
ConsumeMessageThread_14 Recive New Message: order_2 step 1 
ConsumeMessageThread_15 Recive New Message: order_2 step 2 
ConsumeMessageThread_16 Recive New Message: order_2 step 3 
ConsumeMessageThread_17 Recive New Message: order_2 step 4 
ConsumeMessageThread_18 Recive New Message: order_2 step 5 
ConsumeMessageThread_19 Recive New Message: order_3 step 0 
ConsumeMessageThread_20 Recive New Message: order_3 step 1 
ConsumeMessageThread_1 Recive New Message: order_3 step 2 
ConsumeMessageThread_2 Recive New Message: order_3 step 3 
ConsumeMessageThread_3 Recive New Message: order_3 step 4 
ConsumeMessageThread_4 Recive New Message: order_3 step 5 
ConsumeMessageThread_5 Recive New Message: order_4 step 0 
ConsumeMessageThread_6 Recive New Message: order_4 step 1 
ConsumeMessageThread_7 Recive New Message: order_4 step 2 
ConsumeMessageThread_8 Recive New Message: order_4 step 3 
ConsumeMessageThread_9 Recive New Message: order_4 step 4 
ConsumeMessageThread_10 Recive New Message: order_4 step 5 
ConsumeMessageThread_11 Recive New Message: order_5 step 0 
ConsumeMessageThread_12 Recive New Message: order_5 step 1 
ConsumeMessageThread_13 Recive New Message: order_5 step 2 
ConsumeMessageThread_14 Recive New Message: order_5 step 3 
ConsumeMessageThread_15 Recive New Message: order_5 step 4 
ConsumeMessageThread_16 Recive New Message: order_5 step 5 

The messages are consumed a second time. This kind of repeated consumption is usually unnecessary for us, so broadcast consumption is generally not used.

Delayed messages

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class ScheduledMessageProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer("ExampleProducerGroup");
        defaultMQProducer.setNamesrvAddr(Constant.NAMESERVER);

        defaultMQProducer.start();
        int totalMessageTosend = 100;

        for (int i = 0; i < totalMessageTosend; i++) {
            Message message = new Message("TestTopic2", ("Hello scheduled message" + i).getBytes());

            // Set the delay level
            message.setDelayTimeLevel(3);
            defaultMQProducer.send(message);
        }

        defaultMQProducer.shutdown();
    }
}

Delayed messages are messages that are not delivered to consumers immediately after the producer.send method is called; they become available for consumption only after a period of time. RocketMQ supports 18 fixed delay levels for the delay time. The role of these 18 levels in delayed messages is described later.
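
To make the levels concrete, the sketch below maps a delay level to a duration. It assumes the broker still uses its default `messageDelayLevel` setting (`1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h`); if that setting has been changed on your broker, the mapping differs. `DelayLevelSketch` and `delayMillis` are hypothetical helper names, not RocketMQ APIs.

```java
import java.util.concurrent.TimeUnit;

final class DelayLevelSketch {

    // Default value of the broker setting messageDelayLevel (assumed unchanged here).
    static final String DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // Converts a 1-based delay level into milliseconds.
    static long delayMillis(int level) {
        String[] levels = DEFAULT_LEVELS.split(" ");
        if (level < 1 || level > levels.length) {
            throw new IllegalArgumentException("level must be between 1 and " + levels.length);
        }
        String spec = levels[level - 1];
        long amount = Long.parseLong(spec.substring(0, spec.length() - 1));
        switch (spec.charAt(spec.length() - 1)) {
            case 's': return TimeUnit.SECONDS.toMillis(amount);
            case 'm': return TimeUnit.MINUTES.toMillis(amount);
            case 'h': return TimeUnit.HOURS.toMillis(amount);
            default: throw new IllegalArgumentException("unknown unit in " + spec);
        }
    }

    public static void main(String[] args) {
        for (int level = 1; level <= 18; level++) {
            System.out.println("level " + level + " -> " + delayMillis(level) + " ms");
        }
    }
}
```

Under that default, `setDelayTimeLevel(3)` in the example above corresponds to a delay of 10 seconds.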

Batch messages

A batch message combines multiple messages into one batch that is sent in a single call. This reduces network I/O and improves throughput.

Example code:

public class SimpleBatchProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
        // The producer must be pointed at a name server and started before sending
        producer.setNamesrvAddr(Constant.NAMESERVER);
        producer.start();

        String topic = "BatchTest";
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));

        producer.send(messages);
        producer.shutdown();
    }
}

If a batch is larger than 1MB, do not send it as a single batch; split it and send it in several batches. This is the official recommendation.

In actual use, however, the 1MB limit has some slack: the actual maximum is 4194304 bytes, about 4MB. Batch messages also come with restrictions. All messages in a batch must have the same Topic and the same waitStoreMsgOK, and they cannot be delayed messages, transactional messages, and so on.

The following example demonstrates the size limit:

import java.util.ArrayList;
import java.util.List;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class SplitBatchProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
        producer.setNamesrvAddr(Constant.NAMESERVER);
        producer.start();

        // large batch
        String topic = "BatchTest";

        Integer capacity = 100 * 1000;
        List<Message> messages = new ArrayList<>(capacity);
        for (int i = 0; i < capacity; i++) {
            messages.add(new Message(topic, "Tag", "OrderId_" + i, ("Hello MQ" + i).getBytes()));
        }

        // Sending the whole list at once exceeds the batch size limit
        producer.send(messages);
    }
}

The following error occurs:

Exception in thread "main" org.apache.rocketmq.client.exception.MQClientException: CODE: 13  DESC: the message body size over max value, MAX: 4194304
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
	at org.apache.rocketmq.client.Validators.checkMessage(Validators.java:101)
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:552)
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1343)
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1289)
	at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:899)
	at rocketmqapibatchexample.SplitBatchProducer.main(SplitBatchProducer.java:32)

You can see that the message body exceeds the maximum size of 4194304 bytes.

Therefore, the batch needs to be split into smaller batches, each within the maximum size, and sent one at a time.

Problem encountered: sending the split batches fails with the following error:

CODE: 13  DESC: the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.

The code is as follows:

import java.util.ArrayList;
import java.util.List;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class SplitBatchProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
        producer.setNamesrvAddr(Constant.NAMESERVER);
        producer.start();

        // large batch
        String topic = "BatchTest";

        Integer capacity = 100 * 1000;
        List<Message> messages = new ArrayList<>(capacity);
        for (int i = 0; i < capacity; i++) {
            messages.add(new Message(topic, "Tag", "OrderId_" + i, ("Hello MQ" + i).getBytes()));
        }

        // producer.send(messages);
        // throws MQClientException, CODE: 13 DESC: the message body size over max value, MAX: 4194304

        // split the large batch into small ones:
        ListSplitter splitter = new ListSplitter(messages);
        while (splitter.hasNext()) {
            List<Message> listItem = splitter.next();
            producer.send(listItem);
        }
    }
}

Splitter implementation:

This code is excerpted from "Distributed messaging middleware in Action". It does not solve the problem, and I do not know why.

import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.rocketmq.common.message.Message;

public class ListSplitter implements Iterator<List<Message>> {

    // Size limit of one sub-batch, in bytes
    private int sizeLimit = 1000 * 1000;
    // The full list of messages to split
    private List<Message> messages;
    // Index of the first message of the next sub-batch
    private int currIndex;

    public ListSplitter() {
    }

    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;

        // Walk forward, adding messages until the size limit would be exceeded
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            // Estimated size of one message: topic + body + all properties
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; // for log overhead
            // A single message exceeds the size limit
            if (tmpSize > sizeLimit) {
                // it is unexpected that single message exceeds the sizeLimit
                // here just let it go, otherwise it will block the splitting process
                if (nextIndex - currIndex == 0) {
                    // if the next sublist has no element, add this one and then break, otherwise just break
                    nextIndex++;
                }
                break;
            }

            // Stop before the accumulated size would exceed the limit
            if (tmpSize + totalSize > sizeLimit) {
                break;
            } else {
                totalSize += tmpSize;
            }
        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}

Then I found this on the Internet:

This is reportedly an issue in 4.7.1, and 4.8.0 is said not to have it.

I will try it later. The code itself is fine.