Message loss can occur at three points during delivery:
- Producer -> MQ. Enable the publisher confirm callback so that the producer learns the delivery result of every message.
- Inside MQ. Declare queues and messages as persistent and set up mirrored queues in a cluster.
- MQ -> producer (the callback). The callback itself can be lost. If a message receives no callback within a given period, treat the delivery as failed and have the producer deliver it to MQ again. (This can deliver the same message more than once, so the consumer must process messages idempotently.)
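The idempotent consumption mentioned in the last point can be sketched as follows. This is a minimal illustration, not the article's implementation: the class name `IdempotentConsumer` and the in-memory ID set are assumptions made for the example, and a real consumer would more likely record processed IDs in a database table with a unique key or in a shared cache.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical helper: runs the handler at most once per message ID. */
final class IdempotentConsumer {

    // IDs that have already been handled (in-memory only, for illustration)
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();

    /** Returns true if the handler ran, false if the message was a duplicate. */
    boolean consume(String messageId, String payload, Consumer<String> handler) {
        // add() is atomic and returns false when the ID is already present
        if (!processedIds.add(messageId)) {
            return false;
        }
        try {
            handler.accept(payload);
            return true;
        } catch (RuntimeException e) {
            // Forget the ID so that a later redelivery can retry the failed handling
            processedIds.remove(messageId);
            throw e;
        }
    }

    public static void main(String[] args) {
        IdempotentConsumer consumer = new IdempotentConsumer();
        List<String> handled = new ArrayList<>();
        // The same message ID is delivered twice; the handler runs only once
        System.out.println(consumer.consume("msg-1", "payload", handled::add)); // true
        System.out.println(consumer.consume("msg-1", "payload", handled::add)); // false
        System.out.println(handled.size()); // 1
    }
}
```

The message ID here plays the role of the `CorrelationData` ID used on the producer side; for the consumer to see it, it would have to travel with the message (for example in a header or in the payload).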
1. Implementation approach
Technology used:
- SpringBoot
- RabbitMQ
- MySQL
- MyBatis-Plus
- XXL-JOB
2. Preparation and project setup
- Database entities:
- Message
/**
 * Message sending history
 *
 * @author futao
 * @date 2020/3/31.
 */
@Getter
@Setter
@Builder
@TableName("message")
public class Message extends IdTimeEntity {

    @Tolerate
    public Message() {
    }

    /**
     * The business data carried by the message
     */
    @TableField("msg_data")
    private String msgData;

    /**
     * Exchange name
     */
    @TableField("exchange_name")
    private String exchangeName;

    /**
     * Routing key
     */
    @TableField("routing_key")
    private String routingKey;

    /**
     * Message status
     *
     * @see com.futao.springboot.learn.rabbitmq.doc.reliabledelivery.model.enums.MessageStatusEnum
     */
    @TableField("status")
    private int status;

    /**
     * Number of retries
     */
    @TableField("retry_times")
    private int retryTimes;

    /**
     * Next retry time
     */
    @TableField("next_retry_date_time")
    private LocalDateTime nextRetryDateTime;
}
- Message state enumeration
/**
 * Message status enumeration
 *
 * @author futao
 * @date 2020/3/31.
 */
@Getter
@AllArgsConstructor
public enum MessageStatusEnum {
    /**
     * 1 = Sending
     */
    SENDING(1, "Sending"),
    /**
     * 2 = Sent successfully
     */
    SUCCESS(2, "Sent successfully"),
    /**
     * 3 = Send failed
     */
    FAIL(3, "Send failed");

    private final int status;
    private final String description;
}
- The RabbitMQ configuration
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: futao
    password: 123456789
    virtual-host: reliable-delivery
    connection-timeout: 15000
    # Publisher confirms
    publisher-confirms: true
    # Callback when routing fails
    publisher-returns: true
    template:
      # Must be true so that unroutable messages are returned to the listener rather than discarded
      mandatory: true
app:
  rabbitmq:
    retry:
      # Maximum number of message retries
      max-retry-times: 5
      # Retry interval
      retry-interval: 5s
    # Queue definition
    queue:
      user: user-queue
    # Exchange definition
    exchange:
      user: user-exchange
3. Coding
- Queue and exchange definition and binding
/**
 * RabbitMQ queue and exchange definition and binding
 *
 * @author futao
 * @date 2020/3/31.
 */
@Configuration
public class Declare {

    @Bean
    public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName) {
        return QueueBuilder
                .durable(userQueueName)
                //.withArgument("x-max-length", 2)
                .build();
    }

    @Bean
    public Exchange userExchange(@Value("${app.rabbitmq.exchange.user}") String userExchangeName) {
        return ExchangeBuilder
                .topicExchange(userExchangeName)
                .durable(true)
                .build();
    }

    @Bean
    public Binding userBinding(Queue userQueue, Exchange userExchange) {
        return BindingBuilder
                .bind(userQueue)
                .to(userExchange)
                // Topic pattern: "user." followed by exactly one word
                .with("user.*")
                .noargs();
    }
}
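The binding above uses the topic pattern `user.*`. As a rough illustration of how topic patterns relate to routing keys, the sketch below reimplements the documented matching rules (`*` stands for exactly one dot-separated word, `#` for zero or more words) in plain Java. The class `TopicMatcher` is hypothetical and exists only for this example; the broker performs the real matching, and this sketch is not its implementation.

```java
/** Hypothetical illustration of AMQP topic-pattern matching; not broker code. */
final class TopicMatcher {

    /** Returns true if the routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        // limit -1 keeps trailing empty words instead of silently dropping them
        return match(pattern.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: it matches only if the key is exhausted too
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may consume zero or more words; try every possibility
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Pattern still expects a word but the key has none left
            return false;
        }
        // '*' matches any single word; otherwise the words must be equal
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("user.*", "user.abc"));     // true
        System.out.println(matches("user.*", "user.abc.def")); // false
        System.out.println(matches("user.#", "user.abc.def")); // true
    }
}
```

Under these rules the routing key `user.abc` used by the producer later in this article is routed to `user-queue`, while a key such as `user.abc.def` would not match and, with `mandatory: true`, would trigger the return callback.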
- Enhance RabbitTemplate by setting confirmCallback(), the message delivery callback, and returnCallback(), the message routing failure callback
/**
 * Bean enhancement.
 * [Serious warning]: Do not inject beans into this class. Beans injected here will not be enhanced by BeanPostProcessor.
 * Any bean needed here must be retrieved from the container instead.
 *
 * @author futao
 * @date 2020/3/20.
 */
@Slf4j
@Component
public class BeanEnhance implements BeanPostProcessor {

    // @Resource
    // private MessageMapper messageMapper;

    /**
     * Maximum number of retries for a message
     */
    @Value("${app.rabbitmq.retry.max-retry-times}")
    private int maxRetryTimes;

    /**
     * Retry interval
     */
    @Value("${app.rabbitmq.retry.retry-interval}")
    private Duration retryInterval;

    // @Autowired
    // private RabbitTemplate rabbitTemplate;
    //
    // @Autowired
    // private BeanEnhance enhance;

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // Enhance RabbitTemplate
        if (RabbitTemplate.class.equals(bean.getClass())) {
            // Confirm callback: used to make sure messages reach RabbitMQ.
            // (If a message, identified by ID, has not received the callback within a certain period of time, resend it.)
            // Requires publisher-confirms: true
            ((RabbitTemplate) bean).setConfirmCallback((correlationData, ack, cause) -> {
                String correlationDataId = correlationData.getId();
                if (ack) {
                    // ACK
                    log.debug("Message [{}] delivered successfully, set message status in DB to delivered successfully", correlationDataId);
                    ApplicationContextHolder.getBean(MessageMapper.class).update(null,
                            Wrappers.<Message>lambdaUpdate()
                                    .set(Message::getStatus, MessageStatusEnum.SUCCESS.getStatus())
                                    .eq(Message::getId, correlationDataId)
                    );
                } else {
                    log.debug("Message [{}] delivery failed, cause: {}", correlationDataId, cause);
                    // NACK: resend the message
                    ApplicationContextHolder.getBean(BeanEnhance.class).reSend(correlationDataId);
                }
            });
            // Message routing failure callback. Requires publisher-returns: true and template.mandatory: true,
            // otherwise RabbitMQ discards the message
            ((RabbitTemplate) bean).setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                log.warn("Message routing failed callback... Do some compensation or record...");
                log.warn("message: {}", message);
                log.warn("replyCode: {}", replyCode);
                log.warn("replyText: {}", replyText);
                log.warn("exchange: {}", exchange);
                log.warn("routingKey: {}", routingKey);
            });
        }
        return bean;
    }

    /**
     * Resend the message after a NACK
     *
     * @param correlationDataId ID of the message to resend
     */
    @Transactional(rollbackFor = Exception.class)
    public void reSend(String correlationDataId) {
        Message message = ApplicationContextHolder.getBean(MessageMapper.class).selectById(correlationDataId);
        if (message.getRetryTimes() < maxRetryTimes) {
            // Retry
            ApplicationContextHolder.getBean(RabbitTemplate.class).convertAndSend(message.getExchangeName(), message.getRoutingKey(), message.getMsgData(), new CorrelationData(correlationDataId));
            // Update the message status in the DB
            ApplicationContextHolder.getBean(MessageMapper.class).update(null,
                    Wrappers.<Message>lambdaUpdate()
                            .set(Message::getStatus, MessageStatusEnum.SENDING.getStatus())
                            .set(Message::getNextRetryDateTime, LocalDateTime.now(ZoneOffset.ofHours(8)).plus(retryInterval))
                            .set(Message::getRetryTimes, message.getRetryTimes() + 1)
                            .eq(Message::getId, correlationDataId)
            );
        }
    }
}
- Producer:
/**
 * @author futao
 * @date 2020/3/31.
 */
@Component
public class Sender {

    @Value("${app.rabbitmq.retry.retry-interval}")
    private Duration retryInterval;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Resource
    private MessageMapper messageMapper;

    @Value("${app.rabbitmq.exchange.user}")
    private String userExchangeName;

    public void send(User user) {
        Message message = Message.builder()
                .msgData(JSON.toJSONString(user))
                .exchangeName(userExchangeName)
                .routingKey("user.abc")
                .status(MessageStatusEnum.SENDING.getStatus())
                // Next retry time
                .nextRetryDateTime(LocalDateTime.now(ZoneOffset.ofHours(8)).plus(retryInterval))
                .retryTimes(0)
                .build();
        // Persist the message to the DB before sending
        messageMapper.insert(
                message
        );
        // The DB record ID doubles as the correlation ID for the confirm callback
        CorrelationData correlationData = new CorrelationData(message.getId());
        // Deliver the message to MQ
        rabbitTemplate.convertAndSend(userExchangeName, "user.abc", JSON.toJSONString(user), correlationData);
    }
}
- A scheduled task scans the message status in the DB. If a message is still in the sending state, the current time >= its next delivery time, and its retry count < the maximum number of retries, the task delivers it again. Once the retry count has reached the maximum, the task marks the message as failed instead.
- XXL-JOB configuration
xxl:
  job:
    switch: ON
    admin:
      ### Dispatch center addresses [optional]: if the dispatch center is deployed as a cluster with multiple addresses, separate them with commas (,). The executor uses these addresses for "executor heartbeat registration" and "task result callback". If empty, auto registration is disabled.
      addresses: http://127.0.0.1:9090/xxl-job-admin
    executor:
      ### Executor AppName [optional]: the grouping key for executor heartbeat registration. If empty, auto registration is disabled.
      appname: xxl-job-executor-rabbitmq
      ### Executor IP [optional]: empty by default, meaning the IP address is obtained automatically. On hosts with multiple network cards, the IP can be set manually. The IP is not bound to the host and is only used for communication: "executor registration" and "dispatch center requests and triggers tasks".
      # ip:
      ### Executor port [optional]: if less than or equal to 0, it is obtained automatically. The default port is 9999. If multiple executors are deployed on a single machine, configure a different port for each.
      port: 9999
      ### Executor log path [optional]: read and write permission is required on the disk where the run log files are stored. If empty, the default path is used.
      logpath: data/applogs/xxl-job/jobhandler
      ### Executor log retention days [optional]: expired logs are cleared automatically. Takes effect when the value is greater than or equal to 3; otherwise (for example, -1) automatic clearing is disabled.
      logretentiondays: 30
    ### Executor communication TOKEN [optional]: enabled when non-empty.
    accessToken:
- Configuration class
/**
 * XXL-JOB configuration
 *
 * @author futao
 * @date 2020/4/1.
 */
@Setter
@Getter
@Slf4j
@Configuration
@ConfigurationProperties(prefix = "xxl.job")
public class XxlJobConfig {

    private final Admin admin = new Admin();
    private final Executor executor = new Executor();

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor(XxlJobConfig xxlJobConfig) {
        log.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(xxlJobConfig.getAdmin().getAddresses());
        xxlJobSpringExecutor.setAppName(xxlJobConfig.getExecutor().getAppName());
        xxlJobSpringExecutor.setIp(xxlJobConfig.getExecutor().getIp());
        xxlJobSpringExecutor.setPort(xxlJobConfig.getExecutor().getPort());
        xxlJobSpringExecutor.setLogPath(xxlJobConfig.getExecutor().getLogPath());
        xxlJobSpringExecutor.setLogRetentionDays(xxlJobConfig.getExecutor().getLogRetentionDays());
        return xxlJobSpringExecutor;
    }

    @Getter
    @Setter
    public static class Admin {
        private String addresses;
    }

    @Getter
    @Setter
    public static class Executor {
        private String appName;
        private String ip;
        private int port;
        private String logPath;
        private int logRetentionDays;
    }
}
- Write the periodic scan task
/**
 * Scans the database for messages that need to be redelivered and redelivers them
 *
 * @author futao
 * @date 2020/4/1.
 */
@Slf4j
@Component
public class MessageReSendJob extends IJobHandler {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Resource
    private MessageMapper messageMapper;

    @Autowired
    private MessageReSendJob messageReSendJob;

    /**
     * Maximum number of retries
     */
    @Value("${app.rabbitmq.retry.max-retry-times}")
    private int retryTimes;

    /**
     * Retry interval
     */
    @Value("${app.rabbitmq.retry.retry-interval}")
    private Duration retryInterval;

    /**
     * Number of rows read from the database per batch
     */
    private static final int PAGE_SIZE = 100;

    @XxlJob(value = "MessageReSendJob", init = "jobHandlerInit", destroy = "jobHandlerDestroy")
    @Override
    public ReturnT<String> execute(String s) throws Exception {
        long startTime = System.currentTimeMillis();
        log.info("Start scanning for messages to be retried.");
        XxlJobLogger.log("Start scanning for messages to be retried.");
        service(1);
        log.info("Scanning for messages to be retried is complete, [{}]ms", System.currentTimeMillis() - startTime);
        XxlJobLogger.log("Scanning for messages to be retried is complete, [{}]ms", System.currentTimeMillis() - startTime);
        return ReturnT.SUCCESS;
    }

    public void service(int pageNum) {
        IPage<Message> messageIPage = messageMapper.selectPage(new Page<>(pageNum, PAGE_SIZE),
                Wrappers.<Message>lambdaQuery()
                        // Messages still in the sending state
                        .eq(Message::getStatus, MessageStatusEnum.SENDING.getStatus())
                        // whose next delivery time has been reached
                        .le(Message::getNextRetryDateTime, LocalDateTime.now(ZoneOffset.ofHours(8)))
        );
        List<Message> messages = messageIPage.getRecords();
        messages.forEach(message -> {
            if (retryTimes <= message.getRetryTimes()) {
                // The maximum number of deliveries has been reached: mark the message as failed
                messageMapper.update(null, Wrappers.<Message>lambdaUpdate().set(Message::getStatus, MessageStatusEnum.FAIL.getStatus()).eq(Message::getId, message.getId()));
            } else {
                messageReSendJob.reSend(message);
            }
        });
        // Note: rows handled above no longer match the query, so advancing the page number
        // can skip some pending rows; they are picked up by a later run of the job.
        if (PAGE_SIZE == messages.size()) {
            service(++pageNum);
        }
    }

    /**
     * Redeliver the message
     *
     * @param message the message to redeliver
     */
    public void reSend(Message message) {
        // Record the attempt first: retry count + 1 and a new next delivery time
        messageMapper.update(null,
                Wrappers.<Message>lambdaUpdate()
                        .set(Message::getRetryTimes, message.getRetryTimes() + 1)
                        .set(Message::getNextRetryDateTime, LocalDateTime.now(ZoneOffset.ofHours(8)).plus(retryInterval))
                        .eq(Message::getId, message.getId())
        );
        try {
            // Redeliver
            rabbitTemplate.convertAndSend(message.getExchangeName(), message.getRoutingKey(), message.getMsgData(), new CorrelationData(message.getId()));
        } catch (Exception e) {
            log.error("Message [{}] delivery failed", JSON.toJSONString(message), e);
        }
    }

    public void jobHandlerInit() {
        log.info("before job execute...");
        XxlJobLogger.log("before job handler init...");
    }

    public void jobHandlerDestroy() {
        log.info("after job execute...");
        XxlJobLogger.log("after job execute...");
    }
}
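The rule the scan job applies to each message can be isolated as a pure function, which makes it easier to reason about and to test without a database or broker. The sketch below is only an illustration under assumptions: the names `RetryDecision`, `Action`, and `decide` are invented for this example, and the constant `SENDING = 1` mirrors `MessageStatusEnum.SENDING` from earlier in the article.

```java
import java.time.LocalDateTime;

/** Hypothetical pure-function version of the scan job's per-message decision. */
final class RetryDecision {

    enum Action { SKIP, RESEND, MARK_FAILED }

    // Mirrors MessageStatusEnum.SENDING (1 = Sending)
    static final int SENDING = 1;

    static Action decide(int status, int retryTimes, LocalDateTime nextRetryDateTime,
                         int maxRetryTimes, LocalDateTime now) {
        // Only messages still in the sending state whose next delivery time has arrived are considered
        if (status != SENDING || nextRetryDateTime.isAfter(now)) {
            return Action.SKIP;
        }
        // Retry budget used up: mark as failed; otherwise deliver again
        return retryTimes >= maxRetryTimes ? Action.MARK_FAILED : Action.RESEND;
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.of(2020, 4, 1, 12, 0, 0);
        // Due and below the limit of 5 retries
        System.out.println(decide(SENDING, 2, now.minusSeconds(5), 5, now)); // RESEND
        // Due, but the retry limit has been reached
        System.out.println(decide(SENDING, 5, now.minusSeconds(5), 5, now)); // MARK_FAILED
        // Next delivery time is still in the future
        System.out.println(decide(SENDING, 0, now.plusSeconds(5), 5, now));  // SKIP
    }
}
```

With `max-retry-times: 5`, a message that never receives an ACK is redelivered on five scans and marked as failed on the scan after that.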
- Add the scheduled task in XXL-JOB
4. Testing
- The test endpoint
/**
 * @author futao
 * @date 2020/4/1.
 */
@RequestMapping("/user")
@RestController
public class UserController {

    @Autowired
    private Sender sender;

    @RequestMapping("/send")
    public void send() {
        sender.send(User
                .builder()
                .userName("Astronomical")
                .birthday(LocalDate.of(1995, 1, 31))
                .address("Hangzhou, Zhejiang")
                .build());
    }
}
- Normal scenario:
The message is persisted to the DB with status 1 = sending
The confirm callback is received
The message status is set to sent successfully
- Abnormal scenario:
After starting the producer service, stop MQ and then send a message
No ACK is received for this message, so it stays in the sending state. The scheduled task delivers it again (incrementing the delivery count and updating the next delivery time)
Once the maximum number of deliveries has been reached, the next scan sets the message status to send failed
Run log
# Next
- Reliable message consumption
- Rate limiting on the consumer side
- Dead-letter queues
- Delay queues