This is a summary of some common RocketMQ issues that will be covered later.

Duplicate message consumption problem:

Duplicate consumption in RocketMQ usually has one of two causes. Either the producer sends a message but never receives the broker's acknowledgement, so it sends the message again, or several consumer groups subscribe to the same topic, in which case each group consumes the message once in clustering mode. Solution: make consumption idempotent by recording a message-specific ID in an external store such as Redis and skipping any message whose ID has already been recorded.
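The idempotency check described above can be sketched roughly as follows. This is a minimal illustration, not RocketMQ or Redis code: the class `IdempotentConsumer`, the method `consumeOnce`, and the key `order-1001` are hypothetical names, and an in-memory set stands in for the external store. In a real deployment the "add if absent" step would be an atomic operation in Redis or a similar store, probably with an expiry.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/**
 * Sketch of idempotent consumption. The in-memory set is an assumption that
 * stands in for an external store such as Redis.
 */
final class IdempotentConsumer {
    // Business keys that have already been processed (hypothetical store).
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();

    /**
     * Runs the handler only the first time a given business key is seen.
     * Returns true if the handler ran, false if the message was a duplicate.
     */
    boolean consumeOnce(String businessKey, String body, Consumer<String> handler) {
        // add() is atomic and returns false when the key is already present.
        if (!processedKeys.add(businessKey)) {
            return false;
        }
        try {
            handler.accept(body);
            return true;
        } catch (RuntimeException e) {
            // Forget the key so that a redelivered message can be retried.
            processedKeys.remove(businessKey);
            throw e;
        }
    }

    public static void main(String[] args) {
        IdempotentConsumer consumer = new IdempotentConsumer();
        Consumer<String> handler = body -> System.out.println("processing " + body);
        // First delivery: the handler runs and true is printed.
        System.out.println(consumer.consumeOnce("order-1001", "pay order 1001", handler));
        // Duplicate delivery: the handler is skipped and false is printed.
        System.out.println(consumer.consumeOnce("order-1001", "pay order 1001", handler));
    }
}
```

The key should come from the business data (for example an order number) rather than from a broker-generated message ID, because a message that the producer sends twice may be given a different message ID each time.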

Ensuring the order of sending messages:

By default, RocketMQ creates four queues under each topic, and message order is only preserved within a single queue. To send messages in order, three conditions must therefore be met:
1. Same topic
2. Same queue
3. A single thread sends the messages
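One common way to satisfy the "same queue" condition is to derive the queue index from a business key, so that all messages for one key land in one queue while different keys spread across the queues. The sketch below only illustrates that mapping: `QueueRouter` and `selectIndex` are hypothetical names, and plain strings stand in for RocketMQ's `MessageQueue` objects.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Sketch of routing by business key. The names here are hypothetical and
 * strings stand in for the real queue objects.
 */
final class QueueRouter {
    /** Maps a business key to a stable index in [0, queueCount). */
    static int selectIndex(Object key, int queueCount) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("queue-0", "queue-1", "queue-2", "queue-3");
        for (long orderId : new long[] {1001L, 1002L, 1001L}) {
            // The same order ID always maps to the same queue.
            System.out.println(orderId + " -> " + queues.get(selectIndex(orderId, queues.size())));
        }
    }
}
```

With four queues, order 1001 maps to queue-1 both times and order 1002 maps to queue-2. The mapping stays stable only while the number of queues does not change.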

Case study:

Message sender:

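    import java.util.List;
    import org.apache.rocketmq.client.producer.MessageQueueSelector;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageQueue;

    // Select the queue to send the message to.
    // Arguments of send():
    //   1. the message
    //   2. the queue selector
    //   3. the argument passed to the queue selector (here, the queue index)
    //   4. the send timeout in milliseconds
    producer.send(message, new MessageQueueSelector() {
        @Override
        public MessageQueue select(List<MessageQueue> list, Message message, Object arg) {
            // Always pick the queue at the index given by arg.
            return list.get(Integer.parseInt(arg.toString()));
        }
    }, 1, 3000); // Example values: queue index 1 (must be smaller than the number of queues), 3000 ms timeout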

Message consumer:

    // Register one of the two listeners below; a consumer uses a single listener.

    // Option 1: MessageListenerConcurrently, concurrent consumption.
    // Several threads may consume from the same queue, so order is not preserved.
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            for (MessageExt msg : msgs) {
                byte[] body = msg.getBody();
                System.out.println(new String(body));
            }
            // Placeholder condition (never true): when processing fails, return
            // RECONSUME_LATER so that the message is delivered again later.
            if ("1".equals("2")) {
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            // By default, each message is consumed by only one consumer in the group (point-to-point).
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

    // Option 2: MessageListenerOrderly, ordered consumption.
    // Each queue is consumed by one thread at a time.
    // Set the maximum number of consumer threads.
    consumer.setConsumeThreadMax(1);
    // Set the minimum number of consumer threads.
    consumer.setConsumeThreadMin(1);
    consumer.registerMessageListener(new MessageListenerOrderly() {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeOrderlyContext consumeOrderlyContext) {
            for (MessageExt msg : msgs) {
                byte[] body = msg.getBody();
                System.out.println(new String(body));
            }
            // By default, each message is consumed by only one consumer in the group (point-to-point).
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });