Message consumption pattern

RocketMQ has two message consumption modes, cluster mode and broadcast mode.

The consumption mode is determined by consumers. The default mode is cluster mode. The consumption mode can be changed by setting:

// Set to broadcast consumption mode
consumer.setMessageModel(MessageModel.BROADCASTING);
// Set to cluster consumption mode
consumer.setMessageModel(MessageModel.CLUSTERING);
Copy the code

Ensure that the consumption patterns of the same consumer group are the same; otherwise, problems such as message loss may occur.

Cluster pattern

In cluster mode, where consumers are clustered, RocketMQ assumes that the same message can only be consumed by any consumer in the same consumer group in the cluster.

Features:

  • Each message needs to be processed by each consumer group only once, and the broker will send the message to only one consumer in the same consumer group.
  • When a message is rerouted, it cannot be guaranteed to be routed to the same machine.
  • The consumption state is maintained by the broker.

Broadcasting mode

When using broadcast mode, the broker pushes each message to all consumers within the cluster, ensuring that the message is consumed at least once by each machine.

Features:

  • The consumption schedule is maintained by consumer.
  • Ensure that messages are consumed at least once by each machine.
  • The news of consumer failure will not recur.

Message sending mode

The synchronous

Producer sends a message and waits until the broker returns the message, ensuring that the message is delivered.

public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
    DefaultMQProducer producer = new DefaultMQProducer("group001");

    / / set namesrvAddr
    producer.setNamesrvAddr("localhost:9876");

    // Start the producer
    producer.start();

    // Send a message
    Message message = new Message("topic001"."sync message".getBytes());

    SendResult sendResult = producer.send(message);
    System.out.println("sendResult: " + sendResult);

    / / close
    producer.shutdown();
    System.out.println("Shut down the producer");
}
Copy the code

Asynchronous send

The producer sends messages without waiting and directly returns them. The results of the message delivery are returned asynchronously to registered listeners.

If you want to send messages quickly without losing them, you can use asynchronous messaging.

public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
    DefaultMQProducer producer = new DefaultMQProducer("group001");

    / / specified namesrv
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();

    Message message = new Message("topic001"."async message".getBytes());
    producer.send(message, new SendCallback() {
        public void onSuccess(SendResult sendResult) {
            System.out.println("ok");
            System.out.println("sendResult: " + sendResult);
        }

        public void onException(Throwable throwable) {
            System.out.println("error"); System.out.println(throwable.getMessage()); }});// Sleep 3 seconds
    TimeUnit.SECONDS.sleep(3);

    producer.shutdown();
    System.out.println("Shut down");
}
Copy the code

One way to send

When the producer sends messages, he does not wait, but returns directly and cannot receive the message delivery result.

public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
    DefaultMQProducer producer = new DefaultMQProducer("group001");

    / / specified namesrv
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();

    Message message = new Message("topic001"."oneway message".getBytes());
    producer.sendOneway(message);

    producer.shutdown();
    System.out.println("Shut down");
}
Copy the code

Batch sending messages

RocketMQ allows multiple messages to be sent together to reduce network overhead and increase efficiency.

  • Sending messages in batches requires that all messages must be the same topic and have the same message configuration
  • Batch sending does not support delayed messages
  • Sending messages in batches does not support retry
  • The official recommendation is that a batch message should not exceed 1MB in size
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
    DefaultMQProducer producer = new DefaultMQProducer("group001");

    / / specified namesrv
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();

    List<Message> msgs = new ArrayList<Message>();
    for (int i = 0; i < 5; i++) {
        Message message = new Message("topic001", ("collection message " + i).getBytes());
        msgs.add(message);
    }

    SendResult sendResult = producer.send(msgs);
    System.out.println("sendResult: " + sendResult);

    producer.shutdown();
    System.out.println("Shut down the producer");
}
Copy the code

The message filter

The TAG to filter

RocketMQ supports message filtering using tags. Within the same topic, subscribes to desired tags to filter unwanted messages.

  1. When the producer sends a message, it sets the tag to message:

    Message message = new Message("topic001"."TAG-A"."tag message".getBytes());
    Copy the code
  2. Subscribe to tag in Consumer:

    // Subscribe topic, specify tag
    consumer.subscribe("topic001"."TAG-A||TAG-B"); // "*" indicates all messages
    Copy the code

SQL filter

In addition to tag filtering, RocketMQ also supports SQL92 filtering, which you need to enable in the configuration file broker.

enablePropertyFilter=true
Copy the code

The specified configuration file is then loaded when the Broker is started

../bin/mqbroker -n 192.168.1.6:9876 -c broker.conf
Copy the code
  • Set the userProperties parameter in Producer

    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("group001");
    
        / / set namesrvAddr
        producer.setNamesrvAddr("localhost:9876");
    
        // Start the producer
        producer.start();
    
        // Send a message
        for (int i = 0; i < 10; i++) {
            Message message = new Message("topic001"."TAG-A"."KEY-01", ("sql message " + i).getBytes());
            message.putUserProperty("age", String.valueOf(i));
            SendResult sendResult = producer.send(message);
            System.out.println("sendResult: " + sendResult);
        }
    
        / / close
        producer.shutdown();
        System.out.println("Shut down the producer");
    }
    Copy the code
  • Use SQL to filter messages in consumer

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_sql");
    
        / / specified namesrv
        consumer.setNamesrvAddr("localhost:9876");
    
        / / subscribe to the topic
      	// Create MessageSelector to filter messages based on SQL
        consumer.subscribe("topic001", MessageSelector.bySql("age >= 3 and age <= 7"));
    
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt messageExt : list) {
                    System.out.println("message: " + new String(messageExt.getBody()));
                    System.out.println("messageExt: " + messageExt);
                }
                returnConsumeConcurrentlyStatus.CONSUME_SUCCESS; }}); consumer.start(); System.out.println("consumer by sql started ...");
    }
    Copy the code

grammar

RocketMQ defines only some basic syntax to support this functionality. You can also easily extend it.

  1. Numbers compare like>.> =.<.< =.BETWEEN.=;
  2. Character comparison, like=.<>.IN;
  3. IS NULLorIS NOT NULL;
  4. Logical operationsAND.OR.NOT;

Constant type

  1. Numbers, like 123, 3.1415;
  2. Strings, like ‘ABC’, must be in single quotes;
  3. NULL, special constant;
  4. Boolean constant,TRUEFALSE;

Delay message

RocketMQ supports the use of messageDelayLevel to set delayed delivery messages.

The default configuration is:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
Copy the code

This configuration item configures the delay time for each level starting from level 1.

It is possible to modify its configuration in broker.conf, but this is not recommended, as latency levels are also used in many places in the source code.

The supported time units are:

Unit of time instructions
s seconds
m points
h when
d day

use

public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
    DefaultMQProducer producer = new DefaultMQProducer("group001");

    / / set namesrvAddr
    producer.setNamesrvAddr("localhost:9876");

    // Start the producer
    producer.start();

    // Send a delayed message
    Message message = new Message("topic001"."TAG-A"."KEY-01"."delay message".getBytes());

    // Default delay Settings
    // 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    message.setDelayTimeLevel(3);   // Delay 10 seconds
    SendResult sendResult = producer.send(message);
    System.out.println("sendResult: " + sendResult);

    / / close
    producer.shutdown();
    System.out.println("Shut down the producer");
}
Copy the code

Order consumption

In RocketMQ, a topic should have multiple MessageQueue. Each MessageQueue is a queue, which supports the FIRO model, so it is only necessary to deliver messages to the same MessageQueue and consume them by a thread to ensure sequential consumption.

  • Concrete implementation – message delivery

    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
            DefaultMQProducer producer = new DefaultMQProducer("group_orderly");
            / / set namesrvAddr
            producer.setNamesrvAddr("localhost:9876");
            // Start the producer
            producer.start();
            for (int i = 0; i < 20; i++) {
                // Send a message
                Message message = new Message("topic_orderly"."TAG-A"."KEY-01", ("orderly message " + i).getBytes());
                SendResult sendResult = producer.send(
                        / / message
                        message,
                        // Message queue selector
                        new MessageQueueSelector() {
                            public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                                // All messages are sent to the first MessageQueue
                                return list.get(0); }},// Customize parameters
                        null);
                System.out.println("sendResult: " + sendResult);
            }
            / / close
            producer.shutdown();
            System.out.println("Shut down the producer");
        }
    Copy the code
  • Concrete implementation – message consumption

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_orderly");
        / / specified namesrv
        consumer.setNamesrvAddr("localhost:9876");
        / / subscribe to the topic
        consumer.subscribe("topic_orderly"."*");
        // Register the sequential consumption listener
        consumer.registerMessageListener(new MessageListenerOrderly() {
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                for (MessageExt messageExt : list) {
                    System.out.println("message: " + new String(messageExt.getBody()) + " thread: " + Thread.currentThread().getName());
                }
                returnConsumeOrderlyStatus.SUCCESS; }}); consumer.start(); System.out.println("consumer by orderly started ...");
    }
    Copy the code

Retry mechanism

Producer Sends retry

Default configuration:

public DefaultMQProducer(String namespace, String producerGroup, RPCHook rpcHook) {
    this.log = ClientLogger.getLog();
    this.createTopicKey = "TBW102";
    this.defaultTopicQueueNums = 4;
  	// timeout for sending message
    this.sendMsgTimeout = 3000;					
    this.compressMsgBodyOverHowmuch = 4096;
  	// Retry times of sending synchronization failures
    this.retryTimesWhenSendFailed = 2;
  	// Retry times of asynchronous sending failures
    this.retryTimesWhenSendAsyncFailed = 2;
  	// Whether to send requests to other brokers
    this.retryAnotherBrokerWhenNotStoreOK = false;
    this.maxMessageSize = 4194304;
    this.traceDispatcher = null;
    this.namespace = namespace;
    this.producerGroup = producerGroup;
    this.defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
Copy the code

Use:

public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
    DefaultMQProducer producer = new DefaultMQProducer("group_retry");
    / / set namesrvAddr
    producer.setNamesrvAddr("localhost:9876");

    // Send timeout. Default: 3000 ms
    // this.sendMsgTimeout = 3000;
    producer.setSendMsgTimeout(1000);

    // Retry times of sending synchronization failures. The default value is 2
    // this.retryTimesWhenSendFailed = 2;
    producer.setRetryTimesWhenSendFailed(1);

    // Retry times of asynchronous sending failures. The default value is 2
    // this.retryTimesWhenSendAsyncFailed = 2;
    producer.setRetryTimesWhenSendAsyncFailed(1);

    // Whether to send to other brokers on failure. Default is false
    // this.retryAnotherBrokerWhenNotStoreOK = false;
    producer.setRetryAnotherBrokerWhenNotStoreOK(true);

    // Start the producer
    producer.start();

    // Send a message
    Message message = new Message("topic_retry"."TAG-RETRY"."retry message".getBytes());

    SendResult sendResult = producer.send(message);
    System.out.println("timestamp: " + new Date() + " sendResult: " + sendResult);
    / / close
    producer.shutdown();
    System.out.println("Shut down the producer");
}
Copy the code

Consumer retry

Set the consumption timeout period

// Consumption timeout period, unit: minute, 15 minutes by default
// this.consumeTimeout = 15L;
consumer.setConsumeTimeout(10);
Copy the code

To retry consumption, return RECONSUME_LATER in the listener.

consumer.registerMessageListener(new MessageListenerConcurrently() {
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        for (MessageExt messageExt : list) {
            System.out.println("timestamp: " + new Date() + " messageId: " + messageExt.getMsgId() + " body: " + new String(messageExt.getBody()));
        }

        // CONSUME_SUCCESS
        // RECONSUME_LATER; Try again later
        returnConsumeConcurrentlyStatus.RECONSUME_LATER; }});Copy the code

Full consumer case:

public static void main(String[] args) throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_retry");
    / / specified namesrv
    consumer.setNamesrvAddr("localhost:9876");

    // Consumption timeout period, unit: minute, 15 minutes by default
    // this.consumeTimeout = 15L;
    consumer.setConsumeTimeout(10);

    / / subscribe to the topic
    consumer.subscribe("topic_retry"."TAG-RETRY");

    consumer.registerMessageListener(new MessageListenerConcurrently() {
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            for (MessageExt messageExt : list) {
                System.out.println("timestamp: " + new Date() + " messageId: " + messageExt.getMsgId() + " body: " + new String(messageExt.getBody()));
            }

            // CONSUME_SUCCESS
            // RECONSUME_LATER; Try again later
            returnConsumeConcurrentlyStatus.RECONSUME_LATER; }}); consumer.start(); System.out.println("consumer by retry started ...");
}
Copy the code

Broker message retransmission

Only in MessageModel. CLUSTERING cluster mode, the broker for message back, MessageModel. The broker will not back – national BROADCASTING mode.

The recast interval uses the messageDelayLevel described above.

Transaction message

RocketMQ 4.3+ provides distributed transaction functionality, which enables the ultimate consistency of distributed transactions through RocketMQ transaction messages.

Transaction message logic:

Half Message: The producer sends a Half Message. Once the broker receives the Half Message, it stores the Message in the RMQ_SYS_TRANS_HALF_TOPIC Message consuming queue. After the Half Message is successfully sent, the local transaction begins.

** Check transaction status: ** The Broker starts a scheduled task to consume messages in the RMQ_SYS_TRANS_HALF_TOPIC queue, and each execution of the task confirms the transaction execution status (committed, rollback, unknown) to the message sender, and if unknown, waits for the next callback.

Timeout: The message is rolled back by default if the number of times the message is checked exceeds.

TransactionListener

  • executeLocalTransaction

    After the half-message is successfully sent, this method is triggered to execute a local transaction.

  • checkLocalTransaction

    Check the status of local transactions

Transaction message usage

public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
    TransactionMQProducer producer = new TransactionMQProducer("group_transaction");
    / / set namesrvAddr
    producer.setNamesrvAddr("localhost:9876");
    final CountDownLatch latch = new CountDownLatch(1);

    // Set the transaction listener
    producer.setTransactionListener(new TransactionListener() {
        // Perform a local transaction
        public LocalTransactionState executeLocalTransaction(Message message, Object o) {

            System.out.println("executeLocalTransaction...");
            System.out.println("msg: " + new String(message.getBody()));
            System.out.println("transactionId: " + message.getTransactionId());


            try {
                TimeUnit.SECONDS.sleep(60);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            latch.countDown();

            // COMMIT_MESSAGE
            // ROLLBACK_MESSAGE, rollback message
            // UNKNOW; I'm not sure. Come back later
            return LocalTransactionState.COMMIT_MESSAGE;
        }

        // Check whether the local transaction is complete
        public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {

            System.out.println("msg: " + new String(messageExt.getBody()));
            System.out.println("transactionId: " + messageExt.getTransactionId());

            if (latch.getCount() > 0) {
                System.out.println("checkLocalTransaction: UNKNOW");
                return LocalTransactionState.UNKNOW;
            }

            System.out.println("checkLocalTransaction: COMMIT_MESSAGE");
            returnLocalTransactionState.COMMIT_MESSAGE; }});// Start the producer
    producer.start();
    Message message = new Message("topic_transaction"."TAG-A"."KEY-01"."transaction message".getBytes());
    SendResult sendResult = producer.sendMessageInTransaction(message, null);
    System.out.println("sendResult: " + sendResult);
    / / close
    producer.shutdown();
    System.out.println("Shut down the producer");
}
Copy the code

conclusion

Links to the case source code used for all of the above:

Gitee.com/lzx946/codi…