1. Duplicate Messages
Why do duplicate messages occur? There are two causes: duplication during production and duplication during consumption.
1.1 Message Duplication during Production
After the producer sends a message to MQ, a network fluctuation can occur while MQ is sending its confirmation. The producer never receives the acknowledgement even though MQ has actually received the message, so the producer resends it and MQ ends up with two copies.
If the producer receives no acknowledgement, or receives a negative one, we can retry delivery with a scheduled task backed by Redis or a database:
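    import lombok.extern.slf4j.Slf4j;
    // CorrelationData lives in this package in Spring AMQP 2.x
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    import java.util.List;

    // MessageService, MsgLog, Constant and MessageHelper are project classes.
    @Component
    @Slf4j
    public class SendMessage {
        @Autowired
        private MessageService messageService;
        @Autowired
        private RabbitTemplate rabbitTemplate;

        // Maximum number of delivery attempts
        private static final int MAX_TRY_COUNT = 3;

        /** Every 30 seconds, fetch messages whose delivery timed out and resend them. */
        @Scheduled(cron = "0/30 * * * * ?")
        public void resend() {
            log.info("Scheduled task started (resend messages)");
            List<MsgLog> msgLogs = messageService.selectTimeoutMsg();
            msgLogs.forEach(msgLog -> {
                String msgId = msgLog.getMsgId();
                if (msgLog.getTryCount() >= MAX_TRY_COUNT) {
                    // Attempts exhausted: mark the message as failed
                    messageService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_FAIL);
                    log.info("Message delivery failed, msgId: {}", msgId);
                } else {
                    // Delivery count + 1
                    messageService.updateTryCount(msgId, msgLog.getNextTryTime());
                    CorrelationData correlationData = new CorrelationData(msgId);
                    // Resend
                    rabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(),
                            MessageHelper.objToMsg(msgLog.getMsg()), correlationData);
                    log.info("Resend attempt {} for msgId: {}", msgLog.getTryCount() + 1, msgId);
                }
            });
            log.info("Scheduled task finished (resend messages)");
        }
    }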
1.2 Message Duplication during Consumption
After the consumer has successfully consumed a message, a network fluctuation can occur while it sends its acknowledgement to MQ. MQ does not receive the acknowledgement and, to guarantee that the message is consumed, delivers the same message to the consumer again. The consumer therefore receives two identical messages.
Modify the consumer to simulate an exception:
    @RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true"))
    public void receive(String message, @Headers Map<String, Object> headers) throws Exception {
        System.out.println("retry " + System.currentTimeMillis());
        System.out.println(message);
        int i = 1 / 0; // deliberately throws ArithmeticException to trigger a retry
    }
Configure the retry policy in YAML:
    spring:
      rabbitmq:
        listener:
          simple:
            retry:
              enabled: true           # enable consumer retry
              max-attempts: 5         # maximum number of delivery attempts
              initial-interval: 3000  # interval between retries, in milliseconds
Because duplicates are caused by network failures, they cannot be avoided entirely. What we can do is make message consumption idempotent, so that processing the same message twice has the same effect as processing it once.
2. How to Ensure Message Idempotency
Have each message carry a globally unique ID, and use that ID to make consumption idempotent. The consumption process is as follows:
- After receiving a message, the consumer first checks in Redis/DB whether the message ID already exists.
- If it does not exist, the message is consumed normally, and the ID is written to Redis/DB afterwards.
- If it does exist, the message is discarded.
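The three steps above can be sketched without a broker. This is a minimal illustration, not the implementation used in this article: the class name `IdempotentConsumer` and the method `onMessage` are hypothetical, and an in-memory set stands in for Redis/DB.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Sketch of the check-then-consume flow; an in-memory set stands in for Redis/DB. */
class IdempotentConsumer {

    // Stand-in for Redis/DB: IDs of messages that have already been consumed.
    private final Set<String> consumedIds = ConcurrentHashMap.newKeySet();

    // Hypothetical business logic, supplied by the caller.
    private final Consumer<String> businessLogic;

    IdempotentConsumer(Consumer<String> businessLogic) {
        this.businessLogic = businessLogic;
    }

    /** Returns true if the message was consumed, false if it was discarded as a duplicate. */
    boolean onMessage(String messageId, String body) {
        // Step 1: check whether this ID has been seen before.
        if (consumedIds.contains(messageId)) {
            return false; // Step 3: duplicate, discard it.
        }
        // Step 2: consume normally, then record the ID.
        businessLogic.accept(body);
        consumedIds.add(messageId);
        return true;
    }

    public static void main(String[] args) {
        IdempotentConsumer consumer =
                new IdempotentConsumer(body -> System.out.println("consumed: " + body));
        System.out.println(consumer.onMessage("id-1", "hello")); // first delivery: prints true
        System.out.println(consumer.onMessage("id-1", "hello")); // redelivery: prints false
    }
}
```

Note that the check and the write are two separate steps, so two concurrent deliveries of the same message could both pass the check. With a real store, an atomic "set if absent" operation (for example Redis `SETNX`) or a unique constraint in the database closes that gap.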
Producer
    @PostMapping("/send")
    public void sendMessage() {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("message", "Java journey");
        String json = jsonObject.toJSONString();
        // Attach a globally unique message ID; encode the body as UTF-8 to match the
        // declared content encoding (StandardCharsets is in java.nio.charset)
        Message message = MessageBuilder.withBody(json.getBytes(StandardCharsets.UTF_8))
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .setContentEncoding("UTF-8")
                .setMessageId(UUID.randomUUID().toString())
                .build();
        amqpTemplate.convertAndSend("javatrip", message);
    }
Consumer
    @Component
    @RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true"))
    public class Consumer {

        @RabbitHandler
        public void receiveMessage(Message message) throws Exception {
            String messageId = message.getMessageProperties().getMessageId();
            String msg = new String(message.getBody(), "UTF-8");
            System.out.println("received message: " + msg + " == messageId: " + messageId);

            // try-with-resources closes the Redis connection after each message
            try (Jedis jedis = new Jedis("localhost", 6379)) {
                // Use the message ID itself as the key, so every consumed ID is remembered
                if (jedis.exists(messageId)) {
                    return; // already consumed: discard the duplicate
                }
                JSONObject jsonObject = JSONObject.parseObject(msg);
                String email = jsonObject.getString("message");
                // ... business logic using the payload goes here ...
                jedis.set(messageId, "1"); // mark the message as consumed
            }
        }
    }
If you save the records to a database instead, you can use this ID as the primary key of the message table. When a duplicate message arrives, the insert violates the uniqueness of the primary key and throws an exception, which identifies the message as a duplicate.
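The primary-key approach might look like the following with plain JDBC. This is only a sketch under stated assumptions: the table `consumed_message`, its column `message_id`, and the class `ConsumedMessageStore` are hypothetical names, and the caller is assumed to supply an open `Connection`.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;

/** Sketch: let a primary-key insert act as the duplicate check. */
class ConsumedMessageStore {

    // Hypothetical table:
    //   CREATE TABLE consumed_message (message_id VARCHAR(64) PRIMARY KEY)
    private static final String INSERT_SQL =
            "INSERT INTO consumed_message (message_id) VALUES (?)";

    /** Returns true if the ID was new, false if it already existed (a duplicate). */
    static boolean tryRecord(Connection conn, String messageId) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(INSERT_SQL)) {
            ps.setString(1, messageId);
            ps.executeUpdate();
            return true;
        } catch (SQLException e) {
            if (isConstraintViolation(e)) {
                return false; // primary key already present: duplicate message
            }
            throw e; // any other database error is a real failure
        }
    }

    // Drivers report a duplicate key either with this exception subclass or with
    // an SQLState in class "23" (integrity constraint violation). On this
    // single-column table, the primary key is the only such constraint.
    static boolean isConstraintViolation(SQLException e) {
        String state = e.getSQLState();
        return e instanceof SQLIntegrityConstraintViolationException
                || (state != null && state.startsWith("23"));
    }
}
```

A consumer would call `tryRecord` and skip its business logic when the result is `false`. Running the insert and the business update in the same database transaction keeps the two consistent if processing fails halfway.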