First, throw a map
Description:
This article covers many aspects of RabbitMQ, such as:
-
Message sending acknowledgement mechanism
-
Consumption confirmation mechanism
-
Redelivery of messages
-
Idempotency of consumption, and so on
These are all based on the overall flow chart above, so it is necessary to post them first, see the picture for the meaning
Second, implementation ideas
-
This paper briefly introduces how to obtain the authorization code of 163 mailbox
-
Write the sending mail utility class
-
Write the RabbitMQ configuration file
-
Producer calls
-
Consumers send mail
-
Scheduled task Pulls the delivery failure message periodically and redelivers the message
-
Test verification of various abnormal conditions
Extension: Consumer-side idempotent Validation and Message acknowledgement (ACK) using dynamic proxies
Iii. Project introduction
-
Springboot version 2.1.5.RELEASE. Some configuration properties may not be available in older versions and need to be configured in code
-
The RabbitMQ version 3.7.15
-
MailUtil: tool class for sending mails
-
RabbitConfig: Specifies rabbitMQ configurations
-
TestServiceImpl: indicates the producer that sends messages
-
-Leonard: Well, I’m a MailConsumer
-
ResendMsg: scheduled task to resend failed messages
Above is the core code, MsgLogService mapper XML are not posted, complete code can refer to GitHub source, address at the end of the article.
Four, code implementation
1.163 Obtaining email authorization code, as shown in the figure:
This authorization code is the password required for the configuration file spring.mail.password
2.pom
-
<! --mq-->
-
<dependency>
-
<groupId>org.springframework.boot</groupId>
-
<artifactId>spring-boot-starter-amqp</artifactId>
-
</dependency>
-
<! --mail-->
-
<dependency>
-
<groupId>org.springframework.boot</groupId>
-
<artifactId>spring-boot-starter-mail</artifactId>
-
</dependency>
3. Configure RabbitMQ and email
-
# rabbitmq
-
spring.rabbitmq.host=localhost
-
spring.rabbitmq.port=5672
-
spring.rabbitmq.username=guest
-
spring.rabbitmq.password=guest
-
# Open Confirms callback P -> Exchange
-
spring.rabbitmq.publisher-confirms=true
-
ReturnedMessage callback Exchange -> Queue
-
spring.rabbitmq.publisher-returns=true
-
Queue -> C
-
spring.rabbitmq.listener.simple.acknowledge-mode=manual
-
spring.rabbitmq.listener.simple.prefetch=100
-
# mail
-
spring.mail.host=smtp.163.com
-
spring.mail.password=123456wangzai
-
spring.mail.properties.mail.smtp.auth=true
-
spring.mail.properties.mail.smtp.starttls.enable=true
-
spring.mail.properties.mail.smtp.starttls.required=true
Note: Password indicates the authorization code, and username and FROM must be the same
4. The table structure
-
CREATE TABLE
msg_log(
-
Msg_id varchar(255) NOT NULL DEFAULT COMMENT ”,
-
‘ ‘MSG text COMMENT ‘,
-
” exchange varchar(255) NOT NULL DEFAULT COMMENT ‘,
-
Routing_key varchar(255) NOT NULL DEFAULT COMMENT ”,
-
‘ ‘status int(11) NOT NULL DEFAULT ‘0’ COMMENT’ status: 0
-
‘ ‘try_count int(11) NOT NULL DEFAULT ‘0’ COMMENT’ retry ‘,
-
‘ ‘next_try_time datetime DEFAULT NULL COMMENT’ ‘,
-
‘ ‘create_time datetime DEFAULT NULL COMMENT’ create_time ‘,
-
‘ ‘update_time datetime DEFAULT NULL COMMENT’ update_time ‘,
-
PRIMARY KEY (
msg_id),
-
UNIQUE KEY
unq_msg_id(
msg_id) USING BTREE
-
) ENGINE=InnoDB DEFAULT CHARSET= utf8MB4 COMMENT=' ';
Note: The Exchange ROUTing_key field is needed when a scheduled task reposts a message
5.MailUtil
-
@Component
-
@Slf4j
-
public class MailUtil {
-
@Value("${spring.mail.from}")
-
private String from;
-
@Autowired
-
private JavaMailSender mailSender;
-
/ * *
-
* Send a simple email
-
*
-
* @param mail
-
* /
-
public boolean send(Mail mail) {
-
String to = mail.getTo(); // Target mailbox
-
String title = mail.getTitle(); // The subject of the email
-
String content = mail.getContent(); // The body of the message
-
SimpleMailMessage message = new SimpleMailMessage();
-
message.setFrom(from);
-
message.setTo(to);
-
message.setSubject(title);
-
message.setText(content);
-
try {
-
mailSender.send(message);
-
Log.info (" Email sent successfully ");
-
return true;
-
} catch (MailException e) {
-
The error (" mail delivery failure, to: {}, title: {} ", to the title, e);
-
return false;
-
}
-
}
-
}
6.RabbitConfig
-
@Configuration
-
@Slf4j
-
public class RabbitConfig {
-
@Autowired
-
private CachingConnectionFactory connectionFactory;
-
@Autowired
-
private MsgLogService msgLogService;
-
@Bean
-
public RabbitTemplate rabbitTemplate() {
-
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
-
rabbitTemplate.setMessageConverter(converter());
-
// Whether the message was successfully sent to Exchange
-
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
-
if (ack) {
-
Log.info (" Message successfully sent to Exchange");
-
String msgId = correlationData.getId();
-
msgLogService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_SUCCESS);
-
} else {
-
Log.info (" Failed to send message to Exchange, {}, cause: {}", correlationData, cause);
-
}
-
});
-
// Triggering the setReturnCallback callback must be set to mandatory=true, otherwise Exchange does not find the Queue and the message is discarded without triggering the callback
-
rabbitTemplate.setMandatory(true);
-
// Whether the message is routed from Exchange to Queue. Note that this is a failure callback and will only be called if the message fails to route from Exchange to Queue
-
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
-
Log.info (" Message failed to route from Exchange to Queue: Exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
-
});
-
return rabbitTemplate;
-
}
-
@Bean
-
public Jackson2JsonMessageConverter converter() {
-
return new Jackson2JsonMessageConverter();
-
}
-
// Send an email
-
public static final String MAIL_QUEUE_NAME = "mail.queue";
-
public static final String MAIL_EXCHANGE_NAME = "mail.exchange";
-
public static final String MAIL_ROUTING_KEY_NAME = "mail.routing.key";
-
@Bean
-
public Queue mailQueue() {
-
return new Queue(MAIL_QUEUE_NAME, true);
-
}
-
@Bean
-
public DirectExchange mailExchange() {
-
return new DirectExchange(MAIL_EXCHANGE_NAME, true, false);
-
}
-
@Bean
-
public Binding mailBinding() {
-
return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MAIL_ROUTING_KEY_NAME);
-
}
-
}
TestServiceImpl Production message
-
@Service
-
public class TestServiceImpl implements TestService {
-
@Autowired
-
private MsgLogMapper msgLogMapper;
-
@Autowired
-
private RabbitTemplate rabbitTemplate;
-
@Override
-
public ServerResponse send(Mail mail) {
-
String msgId = RandomUtil.UUID32();
-
mail.setMsgId(msgId);
-
MsgLog msgLog = new MsgLog(msgId, mail, RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME);
-
msgLogMapper.insert(msgLog); // The message is stored
-
CorrelationData correlationData = new CorrelationData(msgId);
-
rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME, MessageHelper.objToMsg(mail), correlationData); // Send a message
-
return ServerResponse.success(ResponseCode.MAIL_SEND_SUCCESS.getMsg());
-
}
-
}
8. A MailConsumer sends a message
-
@Component
-
@Slf4j
-
public class MailConsumer {
-
@Autowired
-
private MsgLogService msgLogService;
-
@Autowired
-
private MailUtil mailUtil;
-
@RabbitListener(queues = RabbitConfig.MAIL_QUEUE_NAME)
-
public void consume(Message message, Channel channel) throws IOException {
-
Mail mail = MessageHelper.msgToObj(message, Mail.class);
-
Log.info (" Received message: {}", mail.tostring ());
-
String msgId = mail.getMsgId();
-
MsgLog msgLog = msgLogService.selectByMsgId(msgId);
-
If (null = = msgLog | | msgLog. GetStatus () equals (Constant. MsgLogStatus. CONSUMED_SUCCESS)) {/ / consumer idempotence
-
Log.info (" repeat consumption, msgId: {}", msgId);
-
return;
-
}
-
MessageProperties properties = message.getMessageProperties();
-
long tag = properties.getDeliveryTag();
-
boolean success = mailUtil.send(mail);
-
if (success) {
-
msgLogService.updateStatus(msgId, Constant.MsgLogStatus.CONSUMED_SUCCESS);
-
channel.basicAck(tag, false); // Confirmation of consumption
-
} else {
-
channel.basicNack(tag, false, true);
-
}
-
}
-
}
Explanation: 3 things are accomplished: 1. Ensure idempotency of consumption; 2. Send mail; 3. Update message status, manual ACK
9.ResendMsg The message that the scheduled task fails to resend
-
@Component
-
@Slf4j
-
public class ResendMsg {
-
@Autowired
-
private MsgLogService msgLogService;
-
@Autowired
-
private RabbitTemplate rabbitTemplate;
-
// Maximum number of deliveries
-
private static final int MAX_TRY_COUNT = 3;
-
/ * *
-
* Every 30 seconds pull delivery failure message, re-delivery
-
* /
-
@Scheduled(cron = "0/30 * * * * ?" )
-
public void resend() {
-
Log.info (" Start scheduled task (repost message)");
-
List<MsgLog> msgLogs = msgLogService.selectTimeoutMsg();
-
msgLogs.forEach(msgLog -> {
-
String msgId = msgLog.getMsgId();
-
if (msgLog.getTryCount() >= MAX_TRY_COUNT) {
-
msgLogService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_FAIL);
-
Log.info (" message delivery failed, msgId: {}", msgId);
-
} else {
-
msgLogService.updateTryCount(msgId, msgLog.getNextTryTime()); // Number of deliveries +1
-
CorrelationData correlationData = new CorrelationData(msgId);
-
rabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(), MessageHelper.objToMsg(msgLog.getMsg()), correlationData); // repost
-
Log.info (" no "+ (msglog.gettryCount () + 1) +" repost message ");
-
}
-
});
-
Log.info (" Scheduled task execution ended (repost message)");
-
}
-
}
Note: Each message is bound to the Exchange routingKey, and all messages are re-cast using the same timed task
5. Basic tests
OK, so far, the code is ready, and now it’s time to test the normal process
1. Send a request:
2. Background log:
3. Database message record:
A status of 3 indicates consumption, and a message retry count of 0 indicates a successful delivery
4. Check your email
Send a success
Six, all kinds of abnormal conditions test
Step 1 lists a number of important and core RabbitMQ facts, and this article covers implementation of these facts, followed by exception testing (the verification is based on the flow chart thrown at the beginning of this article, so it is important to paste again).
1. Verify the callback in case the message fails to be sent to Exchange, as shown in P -> X above
How to verify? You can specify any switch name that doesn’t exist, request the interface, and see if a callback is triggered
Failed to send, cause: Reply-code =404, reply-text=NOT_FOUND – no exchange ‘mail.exchangeabcd’ in vhost ‘/’, this callback can ensure that the message is correctly sent to exchange, the test is complete
2. Verify the callback in case the message fails to route from Exchange to Queue, as shown in figure X -> Q
If the routing key does not exist, the route fails and the callback is triggered
Route: mail.routing.keyabcd, replyCode: 312, replyText: NO_ROUTE
3. Verification In manual ACK mode, the consumer must perform manual ACK. Otherwise, the message will be stored in the queue until it is consumed, as shown in Q -> C in the figure above
BasicAck (tag, false); // Comment out the consumption confirmation and view the console and rabbitMQ console
As you can see, although the message was consumed, it is still stored by RabbitMQ because it was manually acknowledged and not eventually acknowledged, so a manual ack can guarantee that the message will be consumed, but remember basicAck
4. Verify idempotency on the consumption side
After the previous step, remove the comment and restart the server. Since there is an unack message, the message is monitored and consumed after the restart. However, before consumption, it will judge whether the status of the message is not consumed and find that status=3, that is, consumed. In this way, the idempotency of the consumption side is ensured. Even if the callback is not triggered due to network reasons and the delivery is successful, repeated consumption will not occur and service exceptions will occur
5. Verify that messages are not lost when exceptions occur on the consumer
Obviously, an exception may occur in the code of the consumer side. If no processing is done, the business is not executed correctly and the message is missing, which gives us the feeling that the message is lost. Because our code of the consumer side has done the exception capture, when the business is abnormal, it will trigger: channel.basicNack(tag, false, true); This will tell RabbitMQ that the message has failed to be consumed and that it needs to be re-queued and re-sent to another normal consumer to ensure that the message is not lost
Test: send returns false
As you can see, channel.basicNack(tag, false, true) means that unack messages are requeued and consumed, ensuring that they don’t get lost
6. Verify message retransmission of scheduled tasks
In a real application scenario, MQ may be down due to network reasons or because the message has not been persisted, so that the ConfirmCallback method for delivery confirmation is not executed, resulting in the database that the message remains in the delivery state, and the message needs to be recast even though the message may have been consumed
The timed task only guarantees 100% delivery success, while the consumption idempotency of multiple delivery needs to be guaranteed by the consumer itself
We can comment out the code that updates the message status after the callback and consumption is successful, enable the scheduled task, and check whether to retry
It can be seen that the message will be redelivered for three times and abandoned for more than three times, and the state of the message will be set as delivery failure. In this abnormal situation, manual intervention is needed to investigate the cause
Vii. Extension: Use dynamic proxy to implement idempotent authentication and ack on the consumer side
I don’t know if you noticed, but in MailConsumer, the real business logic is to send mailUtil.send(mail), but we have to check the idempotency of the consumption before we call send, and then update the status of the message to “consumed”. There are probably many producer-consumer applications that require RabbitMQ, such as logging, sending SMS messages, etc. It would be unnecessary and difficult to maintain if we wrote these common code repetitions every time, so we could pull out the common code. Letting the core business logic only care about its implementation and do nothing else is essentially AOP
There are many ways to do this. You can use Spring AOP, you can use interceptors, you can use static proxies, you can use dynamic proxies, and in this case, I’m using dynamic proxies
The directory structure is as follows:
The core code is the implementation of the agent, so I’m not going to post all the code here, just to give you an idea of how to make the code as simple and elegant as possible
Eight, summary
Email actually very simple, but underneath it actually has a lot to pay attention to the point and improve, a seemingly small knowledge, can also lead to a lot of problems, even involves many aspects, all of these need to hit the pit themselves, of course, I need the code there must be many imperfect and optimization of the points, hope a lot of friend advice and Suggestions.
My code has been tested and verified by myself, and the graph is drawn or carefully cut bit by bit. I hope my friends can learn something. Please like or pay attention to me if you pass by.