First, throw a map

Description:

This article covers many aspects of RabbitMQ, such as:

  • Message sending acknowledgement mechanism

  • Consumption confirmation mechanism

  • Redelivery of messages

  • Idempotency of consumption, and so on

These are all based on the overall flow chart above, so it is necessary to post them first, see the picture for the meaning

Second, implementation ideas

  • This paper briefly introduces how to obtain the authorization code of 163 mailbox

  • Write the sending mail utility class

  • Write the RabbitMQ configuration file

  • Producer calls

  • Consumers send mail

  • Scheduled task Pulls the delivery failure message periodically and redelivers the message

  • Test verification of various abnormal conditions

Extension: Consumer-side idempotent Validation and Message acknowledgement (ACK) using dynamic proxies

Iii. Project introduction

  • Springboot version 2.1.5.RELEASE. Some configuration properties may not be available in older versions and need to be configured in code

  • The RabbitMQ version 3.7.15

  • MailUtil: tool class for sending mails

  • RabbitConfig: Specifies rabbitMQ configurations

  • TestServiceImpl: indicates the producer that sends messages

  • -Leonard: Well, I’m a MailConsumer

  • ResendMsg: scheduled task to resend failed messages

Above is the core code, MsgLogService mapper XML are not posted, complete code can refer to GitHub source, address at the end of the article.

Four, code implementation

1.163 Obtaining email authorization code, as shown in the figure:

This authorization code is the password required for the configuration file spring.mail.password

2.pom

  1. <! --mq-->

  2. <dependency>

  3. <groupId>org.springframework.boot</groupId>

  4. <artifactId>spring-boot-starter-amqp</artifactId>

  5. </dependency>

  6. <! --mail-->

  7. <dependency>

  8. <groupId>org.springframework.boot</groupId>

  9. <artifactId>spring-boot-starter-mail</artifactId>

  10. </dependency>

3. Configure RabbitMQ and email

  1. # rabbitmq

  2. spring.rabbitmq.host=localhost

  3. spring.rabbitmq.port=5672

  4. spring.rabbitmq.username=guest

  5. spring.rabbitmq.password=guest

  6. # Open Confirms callback P -> Exchange

  7. spring.rabbitmq.publisher-confirms=true

  8. ReturnedMessage callback Exchange -> Queue

  9. spring.rabbitmq.publisher-returns=true

  10. Queue -> C

  11. spring.rabbitmq.listener.simple.acknowledge-mode=manual

  12. spring.rabbitmq.listener.simple.prefetch=100

  13. # mail

  14. spring.mail.host=smtp.163.com

  15. [email protected]

  16. spring.mail.password=123456wangzai

  17. [email protected]

  18. spring.mail.properties.mail.smtp.auth=true

  19. spring.mail.properties.mail.smtp.starttls.enable=true

  20. spring.mail.properties.mail.smtp.starttls.required=true

Note: Password indicates the authorization code, and username and FROM must be the same

4. The table structure

  1. CREATE TABLE msg_log (

  2. Msg_id varchar(255) NOT NULL DEFAULT COMMENT ”,

  3. ‘ ‘MSG text COMMENT ‘,

  4. ” exchange varchar(255) NOT NULL DEFAULT COMMENT ‘,

  5. Routing_key varchar(255) NOT NULL DEFAULT COMMENT ”,

  6. ‘ ‘status int(11) NOT NULL DEFAULT ‘0’ COMMENT’ status: 0

  7. ‘ ‘try_count int(11) NOT NULL DEFAULT ‘0’ COMMENT’ retry ‘,

  8. ‘ ‘next_try_time datetime DEFAULT NULL COMMENT’ ‘,

  9. ‘ ‘create_time datetime DEFAULT NULL COMMENT’ create_time ‘,

  10. ‘ ‘update_time datetime DEFAULT NULL COMMENT’ update_time ‘,

  11. PRIMARY KEY (msg_id),

  12. UNIQUE KEY unq_msg_id (msg_id) USING BTREE

  13. ) ENGINE=InnoDB DEFAULT CHARSET= utf8MB4 COMMENT=' ';

Note: The Exchange ROUTing_key field is needed when a scheduled task reposts a message

5.MailUtil

  1. @Component

  2. @Slf4j

  3. public class MailUtil {

  4. @Value("${spring.mail.from}")

  5. private String from;

  6. @Autowired

  7. private JavaMailSender mailSender;

  8. / * *

  9. * Send a simple email

  10. *

  11. * @param mail

  12. * /

  13. public boolean send(Mail mail) {

  14. String to = mail.getTo(); // Target mailbox

  15. String title = mail.getTitle(); // The subject of the email

  16. String content = mail.getContent(); // The body of the message

  17. SimpleMailMessage message = new SimpleMailMessage();

  18. message.setFrom(from);

  19. message.setTo(to);

  20. message.setSubject(title);

  21. message.setText(content);

  22. try {

  23. mailSender.send(message);

  24. Log.info (" Email sent successfully ");

  25. return true;

  26. } catch (MailException e) {

  27. The error (" mail delivery failure, to: {}, title: {} ", to the title, e);

  28. return false;

  29. }

  30. }

  31. }

6.RabbitConfig

  1. @Configuration

  2. @Slf4j

  3. public class RabbitConfig {

  4. @Autowired

  5. private CachingConnectionFactory connectionFactory;

  6. @Autowired

  7. private MsgLogService msgLogService;

  8. @Bean

  9. public RabbitTemplate rabbitTemplate() {

  10. RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

  11. rabbitTemplate.setMessageConverter(converter());

  12. // Whether the message was successfully sent to Exchange

  13. rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {

  14. if (ack) {

  15. Log.info (" Message successfully sent to Exchange");

  16. String msgId = correlationData.getId();

  17. msgLogService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_SUCCESS);

  18. } else {

  19. Log.info (" Failed to send message to Exchange, {}, cause: {}", correlationData, cause);

  20. }

  21. });

  22. // Triggering the setReturnCallback callback must be set to mandatory=true, otherwise Exchange does not find the Queue and the message is discarded without triggering the callback

  23. rabbitTemplate.setMandatory(true);

  24. // Whether the message is routed from Exchange to Queue. Note that this is a failure callback and will only be called if the message fails to route from Exchange to Queue

  25. rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {

  26. Log.info (" Message failed to route from Exchange to Queue: Exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);

  27. });

  28. return rabbitTemplate;

  29. }

  30. @Bean

  31. public Jackson2JsonMessageConverter converter() {

  32. return new Jackson2JsonMessageConverter();

  33. }

  34. // Send an email

  35. public static final String MAIL_QUEUE_NAME = "mail.queue";

  36. public static final String MAIL_EXCHANGE_NAME = "mail.exchange";

  37. public static final String MAIL_ROUTING_KEY_NAME = "mail.routing.key";

  38. @Bean

  39. public Queue mailQueue() {

  40. return new Queue(MAIL_QUEUE_NAME, true);

  41. }

  42. @Bean

  43. public DirectExchange mailExchange() {

  44. return new DirectExchange(MAIL_EXCHANGE_NAME, true, false);

  45. }

  46. @Bean

  47. public Binding mailBinding() {

  48. return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MAIL_ROUTING_KEY_NAME);

  49. }

  50. }

TestServiceImpl Production message

  1. @Service

  2. public class TestServiceImpl implements TestService {

  3. @Autowired

  4. private MsgLogMapper msgLogMapper;

  5. @Autowired

  6. private RabbitTemplate rabbitTemplate;

  7. @Override

  8. public ServerResponse send(Mail mail) {

  9. String msgId = RandomUtil.UUID32();

  10. mail.setMsgId(msgId);

  11. MsgLog msgLog = new MsgLog(msgId, mail, RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME);

  12. msgLogMapper.insert(msgLog); // The message is stored

  13. CorrelationData correlationData = new CorrelationData(msgId);

  14. rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME, MessageHelper.objToMsg(mail), correlationData); // Send a message

  15. return ServerResponse.success(ResponseCode.MAIL_SEND_SUCCESS.getMsg());

  16. }

  17. }

8. A MailConsumer sends a message

  1. @Component

  2. @Slf4j

  3. public class MailConsumer {

  4. @Autowired

  5. private MsgLogService msgLogService;

  6. @Autowired

  7. private MailUtil mailUtil;

  8. @RabbitListener(queues = RabbitConfig.MAIL_QUEUE_NAME)

  9. public void consume(Message message, Channel channel) throws IOException {

  10. Mail mail = MessageHelper.msgToObj(message, Mail.class);

  11. Log.info (" Received message: {}", mail.tostring ());

  12. String msgId = mail.getMsgId();

  13. MsgLog msgLog = msgLogService.selectByMsgId(msgId);

  14. If (null = = msgLog | | msgLog. GetStatus () equals (Constant. MsgLogStatus. CONSUMED_SUCCESS)) {/ / consumer idempotence

  15. Log.info (" repeat consumption, msgId: {}", msgId);

  16. return;

  17. }

  18. MessageProperties properties = message.getMessageProperties();

  19. long tag = properties.getDeliveryTag();

  20. boolean success = mailUtil.send(mail);

  21. if (success) {

  22. msgLogService.updateStatus(msgId, Constant.MsgLogStatus.CONSUMED_SUCCESS);

  23. channel.basicAck(tag, false); // Confirmation of consumption

  24. } else {

  25. channel.basicNack(tag, false, true);

  26. }

  27. }

  28. }

Explanation: 3 things are accomplished: 1. Ensure idempotency of consumption; 2. Send mail; 3. Update message status, manual ACK

9.ResendMsg The message that the scheduled task fails to resend

  1. @Component

  2. @Slf4j

  3. public class ResendMsg {

  4. @Autowired

  5. private MsgLogService msgLogService;

  6. @Autowired

  7. private RabbitTemplate rabbitTemplate;

  8. // Maximum number of deliveries

  9. private static final int MAX_TRY_COUNT = 3;

  10. / * *

  11. * Every 30 seconds pull delivery failure message, re-delivery

  12. * /

  13. @Scheduled(cron = "0/30 * * * * ?" )

  14. public void resend() {

  15. Log.info (" Start scheduled task (repost message)");

  16. List<MsgLog> msgLogs = msgLogService.selectTimeoutMsg();

  17. msgLogs.forEach(msgLog -> {

  18. String msgId = msgLog.getMsgId();

  19. if (msgLog.getTryCount() >= MAX_TRY_COUNT) {

  20. msgLogService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_FAIL);

  21. Log.info (" message delivery failed, msgId: {}", msgId);

  22. } else {

  23. msgLogService.updateTryCount(msgId, msgLog.getNextTryTime()); // Number of deliveries +1

  24. CorrelationData correlationData = new CorrelationData(msgId);

  25. rabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(), MessageHelper.objToMsg(msgLog.getMsg()), correlationData); // repost

  26. Log.info (" no "+ (msglog.gettryCount () + 1) +" repost message ");

  27. }

  28. });

  29. Log.info (" Scheduled task execution ended (repost message)");

  30. }

  31. }

Note: Each message is bound to the Exchange routingKey, and all messages are re-cast using the same timed task

5. Basic tests

OK, so far, the code is ready, and now it’s time to test the normal process

1. Send a request:

2. Background log:

3. Database message record:

A status of 3 indicates consumption, and a message retry count of 0 indicates a successful delivery

4. Check your email

Send a success

Six, all kinds of abnormal conditions test

Step 1 lists a number of important and core RabbitMQ facts, and this article covers implementation of these facts, followed by exception testing (the verification is based on the flow chart thrown at the beginning of this article, so it is important to paste again).

1. Verify the callback in case the message fails to be sent to Exchange, as shown in P -> X above

How to verify? You can specify any switch name that doesn’t exist, request the interface, and see if a callback is triggered

Failed to send, cause: Reply-code =404, reply-text=NOT_FOUND – no exchange ‘mail.exchangeabcd’ in vhost ‘/’, this callback can ensure that the message is correctly sent to exchange, the test is complete

2. Verify the callback in case the message fails to route from Exchange to Queue, as shown in figure X -> Q

If the routing key does not exist, the route fails and the callback is triggered

Route: mail.routing.keyabcd, replyCode: 312, replyText: NO_ROUTE

3. Verification In manual ACK mode, the consumer must perform manual ACK. Otherwise, the message will be stored in the queue until it is consumed, as shown in Q -> C in the figure above

BasicAck (tag, false); // Comment out the consumption confirmation and view the console and rabbitMQ console

As you can see, although the message was consumed, it is still stored by RabbitMQ because it was manually acknowledged and not eventually acknowledged, so a manual ack can guarantee that the message will be consumed, but remember basicAck

4. Verify idempotency on the consumption side

After the previous step, remove the comment and restart the server. Since there is an unack message, the message is monitored and consumed after the restart. However, before consumption, it will judge whether the status of the message is not consumed and find that status=3, that is, consumed. In this way, the idempotency of the consumption side is ensured. Even if the callback is not triggered due to network reasons and the delivery is successful, repeated consumption will not occur and service exceptions will occur

5. Verify that messages are not lost when exceptions occur on the consumer

Obviously, an exception may occur in the code of the consumer side. If no processing is done, the business is not executed correctly and the message is missing, which gives us the feeling that the message is lost. Because our code of the consumer side has done the exception capture, when the business is abnormal, it will trigger: channel.basicNack(tag, false, true); This will tell RabbitMQ that the message has failed to be consumed and that it needs to be re-queued and re-sent to another normal consumer to ensure that the message is not lost

Test: send returns false

As you can see, channel.basicNack(tag, false, true) means that unack messages are requeued and consumed, ensuring that they don’t get lost

6. Verify message retransmission of scheduled tasks

In a real application scenario, MQ may be down due to network reasons or because the message has not been persisted, so that the ConfirmCallback method for delivery confirmation is not executed, resulting in the database that the message remains in the delivery state, and the message needs to be recast even though the message may have been consumed

The timed task only guarantees 100% delivery success, while the consumption idempotency of multiple delivery needs to be guaranteed by the consumer itself

We can comment out the code that updates the message status after the callback and consumption is successful, enable the scheduled task, and check whether to retry

It can be seen that the message will be redelivered for three times and abandoned for more than three times, and the state of the message will be set as delivery failure. In this abnormal situation, manual intervention is needed to investigate the cause

Vii. Extension: Use dynamic proxy to implement idempotent authentication and ack on the consumer side

I don’t know if you noticed, but in MailConsumer, the real business logic is to send mailUtil.send(mail), but we have to check the idempotency of the consumption before we call send, and then update the status of the message to “consumed”. There are probably many producer-consumer applications that require RabbitMQ, such as logging, sending SMS messages, etc. It would be unnecessary and difficult to maintain if we wrote these common code repetitions every time, so we could pull out the common code. Letting the core business logic only care about its implementation and do nothing else is essentially AOP

There are many ways to do this. You can use Spring AOP, you can use interceptors, you can use static proxies, you can use dynamic proxies, and in this case, I’m using dynamic proxies

The directory structure is as follows:

The core code is the implementation of the agent, so I’m not going to post all the code here, just to give you an idea of how to make the code as simple and elegant as possible

Eight, summary

Email actually very simple, but underneath it actually has a lot to pay attention to the point and improve, a seemingly small knowledge, can also lead to a lot of problems, even involves many aspects, all of these need to hit the pit themselves, of course, I need the code there must be many imperfect and optimization of the points, hope a lot of friend advice and Suggestions.

My code has been tested and verified by myself, and the graph is drawn or carefully cut bit by bit. I hope my friends can learn something. Please like or pay attention to me if you pass by.