Scheme 1: message storage, timing pull and re-delivery
Message flow chart
Advantages and disadvantages
The advantage of this approach is that it is simple to implement, with few moving parts, and message delivery is guaranteed to succeed eventually because every message is persisted. The disadvantage is equally clear: at the start, the message has to be stored in addition to the business data, and compensation requires extra database queries to find the messages whose delivery failed. Database operations are slow and will slow down business processing, so this scheme is worth considering only when the data volume is not very large.
The specific implementation
Step 1: Store business data and message data separately. When storing the message, note that the entity of the message should have a status field to indicate whether the message has been consumed. For example, 0 indicates that the message has not been consumed, and 1 indicates that the message has been consumed.
Step 2: Send the message to the Exchange. If the message does not reach the Exchange correctly, the ConfirmCallback reports the failure; otherwise the Exchange routes the message to the corresponding queue according to the RoutingKey. If the message cannot be routed to a queue, the ReturnCallback is triggered. Both callbacks can be implemented to do your own handling.
Step 3: The consumer listens to the corresponding queue and consumes the messages in it, requiring an ACK when the consumption is complete.
Step 4: After receiving an ACK, the producer needs to change the message status in the message database to 1, that is, consumed.
Step 5: Start a scheduled task that periodically queries messages with message status 0 in the message database and notifies the producer to redeliver the unconsumed messages.
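The five steps above can be sketched without RabbitMQ or a database. The following is a minimal in-memory sketch, not the demo's implementation: OutboxSketch and all of its methods are hypothetical names, a map stands in for the message table, and a list stands in for deliveries to the exchange.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory stand-in for the message table plus the scheduled compensation task.
// All names are hypothetical; no RabbitMQ or database is involved.
class OutboxSketch {
    // messageId -> consumed flag (false = not consumed, true = consumed)
    private final Map<Long, Boolean> messageTable = new LinkedHashMap<>();
    // Records every delivery attempt so the behaviour can be inspected.
    private final List<Long> deliveries = new ArrayList<>();

    // Step 1: store the message as "not consumed", then deliver it (step 2).
    void placeOrder(long messageId) {
        messageTable.put(messageId, false);
        deliver(messageId);
    }

    // Stand-in for sending the message to the exchange.
    void deliver(long messageId) {
        deliveries.add(messageId);
    }

    // Steps 3 and 4: the consumer finished and ACKed, so mark the message consumed.
    void onAck(long messageId) {
        messageTable.put(messageId, true);
    }

    // Step 5: one run of the scheduled task. Returns the IDs it redelivered.
    List<Long> redeliverUnconsumed() {
        List<Long> redelivered = new ArrayList<>();
        for (Map.Entry<Long, Boolean> entry : messageTable.entrySet()) {
            if (!entry.getValue()) {
                deliver(entry.getKey());
                redelivered.add(entry.getKey());
            }
        }
        return redelivered;
    }

    List<Long> deliveries() {
        return deliveries;
    }

    public static void main(String[] args) {
        OutboxSketch sketch = new OutboxSketch();
        sketch.placeOrder(1L);
        sketch.placeOrder(2L);
        sketch.onAck(1L); // only message 1 is consumed
        System.out.println(sketch.redeliverUnconsumed()); // prints [2]
    }
}
```

Because a message is redelivered until it is marked consumed, the same message can reach the consumer more than once, so the consumer's processing should tolerate duplicates.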
The Demo code
RabbitConfig: configures the RabbitMQ exchange and queue.
@Configuration
public class RabbitConfig {
/** Order exchange. @return the fanout exchange */
@Bean
public FanoutExchange ORDER_EXCHANGE() {
return ExchangeBuilder.fanoutExchange("ORDER_EXCHANGE").durable(true).build();
}
/** Order queue. @return the queue */
@Bean
public Queue ORDER_QUEUE() {
return QueueBuilder.durable("ORDER_QUEUE")
.maxLengthBytes(1024 * 1024 * 128)
.maxLength(50000)
.build();
}
/** Binding between the order exchange and the order queue. @return the binding */
@Bean
public Binding ORDER_BINDING() {
return BindingBuilder.bind(ORDER_QUEUE()).to(ORDER_EXCHANGE());
}
}
RabbitConfirmCallback: the Confirm callback, triggered when a message is not delivered correctly to the corresponding exchange. You can do extra processing here, such as logging.
@Component
public class RabbitConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
}
}
RabbitReturnCallback: the Return callback, triggered when a message has reached the exchange but could not be routed to the corresponding queue by its RoutingKey. Extra processing, such as logging, can also be done here.
@Component
public class RabbitReturnCallback implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
}
}
RabbitTemplateFactory: mainly configures the RabbitTemplate, the message converter, and so on. It also pools RabbitTemplate instances so that each exchange has its own RabbitTemplate, which improves delivery efficiency.
@Configuration
public class RabbitTemplateFactory {
@Resource
private ConnectionFactory connectionFactory;
@Resource
private RabbitConfirmCallback rabbitConfirmCallback;
@Resource
private RabbitReturnCallback rabbitReturnCallback;
Map<String, RabbitTemplate> rabbitTemplateMap = new ConcurrentHashMap<>();
/** Message converter for RabbitMQ. @return the message converter */
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* Returns a separate RabbitTemplate for each exchange.
* @param exchangeName exchange name
* @return the RabbitTemplate for that exchange
*/
public RabbitTemplate getRabbitTemplate(String exchangeName) {
RabbitTemplate rabbitTemplate = rabbitTemplateMap.get(exchangeName);
if (rabbitTemplate != null) {
return rabbitTemplate;
}
rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setExchange(exchangeName);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setMessageConverter(messageConverter());
rabbitTemplate.setRetryTemplate(new RetryTemplate());
rabbitTemplate.setConfirmCallback(rabbitConfirmCallback);
rabbitTemplate.setReturnCallback(rabbitReturnCallback);
// Cache the template so later calls for the same exchange reuse it.
rabbitTemplateMap.put(exchangeName, rabbitTemplate);
return rabbitTemplate;
}
}
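The idea of keeping one RabbitTemplate per exchange is a per-key cache. A minimal sketch of that idea using ConcurrentHashMap.computeIfAbsent follows; PerKeyCache is a hypothetical helper, and a plain Function stands in for constructing and configuring a RabbitTemplate, so this illustrates the caching pattern only and is not the demo's factory.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical per-key cache: builds a value once per key and reuses it afterwards.
class PerKeyCache<V> {
    private final Map<String, V> cache = new ConcurrentHashMap<>();
    private final Function<String, V> factory;

    PerKeyCache(Function<String, V> factory) {
        this.factory = factory;
    }

    // computeIfAbsent applies the factory at most once per key, even under concurrent calls.
    V get(String key) {
        return cache.computeIfAbsent(key, factory);
    }

    public static void main(String[] args) {
        AtomicInteger created = new AtomicInteger();
        // The factory stands in for "new RabbitTemplate(...)" plus its setters.
        PerKeyCache<String> templates = new PerKeyCache<>(name -> {
            created.incrementAndGet();
            return "template-for-" + name;
        });
        templates.get("ORDER_EXCHANGE");
        templates.get("ORDER_EXCHANGE");
        templates.get("ORDER_DELAY_EXCHANGE");
        System.out.println(created.get()); // prints 2
    }
}
```

Compared with a separate get followed by put, computeIfAbsent avoids building two instances for the same key when two threads ask for it at the same time.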
MessageConsumer: the message consumer. It processes the message and, once consumption is complete, changes the message status to consumed.
@Component
public class MessageConsumer {
private final MessageRepository messageRepository;
public MessageConsumer(MessageRepository messageRepository) {
this.messageRepository = messageRepository;
}
@RabbitListener(queues = "ORDER_QUEUE")
public void onMessage(@Headers Map<String, Object> headers, @Payload Order order, Channel channel) throws Exception {
// 3. Consume the message. ACK if the message is consumed successfully
System.out.println("====== Start consuming message ======");
System.out.println("The message reads:" + order);
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag, false);
System.out.println("====== consumption message success ======");
// 4. Change the message status in the message database to consumed
Message message = messageRepository.findMessageByMessageId(order.getId());
message.setStatus(true);
messageRepository.save(message);
}
}
RabbitController is mainly used to simulate message delivery for easy testing.
@RestController
public class RabbitController {
private final MessageProducer messageProducer;
private final MessageRepository messageRepository;
private final OrderRepository orderRepository;
public RabbitController(MessageProducer messageProducer, MessageRepository messageRepository, OrderRepository orderRepository) {
this.messageProducer = messageProducer;
this.messageRepository = messageRepository;
this.orderRepository = orderRepository;
}
@PostMapping("/place/order")
public String placeOrder(@RequestBody Order order) {
// 1. Persist the business data and the message data
Order result = orderRepository.save(order);
Message message = new Message();
message.setMessageId(result.getId());
message.setContent(order);
message.setStatus(false);
messageRepository.save(message);
// 2. Deliver the message
messageProducer.send(order);
return "Success";
}
}
Message: the entity for messages, mapped to the Message table in the database and used to store messages.
@Entity
@Data
@TypeDef(name = "json", typeClass = JsonStringType.class)
public class Message implements Serializable {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private Long messageId;
@Type(type = "json")
@Column(columnDefinition = "json")
private Order content;
/** Message status: false means not yet consumed or consumption failed, true means consumed */
private boolean status;
}
The Order business model, corresponding to the Order table in the database, is used to hold the Order data.
@Entity
@Data
@Table(name = "`order`")
public class Order implements Serializable {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private Integer productNumber;
private BigDecimal totalPrice;
}
MessageProducer: the message producer, used to deliver messages to RabbitMQ.
@Component
public class MessageProducer {
private final RabbitTemplateFactory rabbitTemplateFactory;
public MessageProducer(RabbitTemplateFactory rabbitTemplateFactory) {
this.rabbitTemplateFactory = rabbitTemplateFactory;
}
/** Sends a message. @param order message body */
public void send(Order order) {
RabbitTemplate rabbitTemplate = rabbitTemplateFactory.getRabbitTemplate("ORDER_EXCHANGE");
CorrelationData correlationData = new CorrelationData(String.valueOf(order.getId()));
System.out.println("====== Start sending message ======");
rabbitTemplate.convertAndSend("ORDER_EXCHANGE", null, order, correlationData);
System.out.println("====== End sending message ======");
}
}
MessageRepository: the ORM is JPA; MessageRepository performs database operations on the Message table.
public interface MessageRepository extends JpaRepository<Message, Long> {
Message findMessageByMessageId(Long messageId);
List<Message> findMessagesByStatus(boolean status);
}
OrderRepository: performs database operations on the order table.
public interface OrderRepository extends JpaRepository<Order, Long> {
}
MessageCheckTask A scheduled task that periodically queries the database for unconsumed messages in the message table and compensates for them.
@Component
public class MessageCheckTask {
private final MessageRepository messageRepository;
private final MessageProducer messageProducer;
public MessageCheckTask(MessageRepository messageRepository, MessageProducer messageProducer) {
this.messageRepository = messageRepository;
this.messageProducer = messageProducer;
}
@Scheduled(initialDelay = 60000, fixedDelay = 60000)
public void messageCheck() {
List<Message> messages = messageRepository.findMessagesByStatus(false);
if (messages == null || messages.size() == 0) {
System.out.println("====== No unconsumed messages found ======");
return;
}
for (Message message : messages) {
System.out.println("====== Found an unconsumed message ======");
Order order = message.getContent();
messageProducer.send(order);
System.out.println("====== Redelivered the unconsumed message ======");
}
}
}
Scheme 2: delayed message delivery
Message flow chart
Advantages and disadvantages
This approach is more complicated to implement than the previous one. In return, only the business data is stored when the message is first delivered; the message itself does not have to be stored, and no scheduled task is needed to query the database for messages whose delivery or consumption failed. Under high concurrency, this approach is definitely better than the previous one.
However, there is no guarantee of 100% success. If both the first message and the first delayed message fail, the message is permanently lost, because the message is no longer stored alongside the business data: message storage now lives in a separate service.
The specific implementation
Step 1: Store only the business data in the database.
Step 2: Send the message to the Exchange. If the message does not reach the Exchange correctly, the ConfirmCallback reports the failure; otherwise the Exchange routes the message to the corresponding queue according to the RoutingKey. If the message cannot be routed to a queue, the ReturnCallback is triggered. Both callbacks can be implemented to do your own handling.
Step 3: At the same time, deliver a delayed message. Its content is exactly the same as the message in the previous step, but it goes to a different queue: the delay queue. The delay can vary with the business. Before RabbitMQ 3.6, delayed delivery was more awkward and relied on either a dead letter queue or Java's DelayQueue; since version 3.6, the rabbitmq_delayed_message_exchange plugin can be used to delay message delivery.
Step 4: The consumer listens to the corresponding queue and consumes its messages. When consumption is complete, it ACKs, then assembles a ConfirmMessage and sends it to the message confirmation queue. This message confirms that the current message was consumed successfully; alternatively, Redis can be used to store the consumption confirmation result.
Step 5: Add a separate callback service that mainly listens to the delay queue and the message confirmation queue. When it receives a message from the confirmation queue (or finds a confirmation record in Redis), it stores the message in the message database to record the successful consumption. It also listens to the delay queue: on receiving a delayed message, it first checks whether the message has been consumed. If it has, it does nothing further and ACKs directly; otherwise it notifies the producer via RPC to redeliver the message.
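The decision the callback service makes in step 5 can be sketched in isolation. This is a minimal in-memory sketch under stated assumptions: DelayedCheckSketch and its members are hypothetical, a map stands in for Redis, a list stands in for the message database, and another list records the redelivery requests that would go to the producer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of how the callback service handles a delayed message.
class DelayedCheckSketch {
    // Stand-in for Redis: messageId -> confirmation payload written by the consumer.
    final Map<Long, String> confirmations = new HashMap<>();
    // Stand-in for the message database.
    final List<String> messageDb = new ArrayList<>();
    // Stand-in for the RPC/HTTP call to the producer: the attempt number requested.
    final List<Integer> redeliveryRequests = new ArrayList<>();

    // Called when a delayed message arrives; returns true if the message was already consumed.
    boolean onDelayedMessage(long messageId, int attempt) {
        String confirmation = confirmations.get(messageId);
        if (confirmation == null) {
            // Not consumed yet: ask the producer to deliver again with the next attempt number.
            redeliveryRequests.add(attempt + 1);
            return false;
        }
        // Already consumed: record the result; nothing else to do.
        messageDb.add(confirmation);
        return true;
    }

    public static void main(String[] args) {
        DelayedCheckSketch callback = new DelayedCheckSketch();
        System.out.println(callback.onDelayedMessage(7L, 1)); // prints false
        callback.confirmations.put(7L, "order-7");
        System.out.println(callback.onDelayedMessage(7L, 2)); // prints true
    }
}
```

In both branches the delayed message itself would be ACKed; only the follow-up action differs.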
The Demo code
Note: The following code uses HTTP instead of RPC for ease of writing
Preparation
- Download the rabbitmq_delayed_message_exchange plugin from the RabbitMQ website and move it into the plugins directory.
- Enable the rabbitmq_delayed_message_exchange plugin:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
The Message service
ClientConfig Configures the RestTemplate, which is used to initiate HTTP requests.
@Configuration
public class ClientConfig {
@Bean
public RestTemplate restTemplate() {
return new RestTemplate();
}
}
RabbitConfig: configures the RabbitMQ exchanges and queues; the exchanges used for delayed delivery are declared as delayed exchanges.
@Configuration
public class RabbitConfig {
/** Order exchange. @return the fanout exchange */
@Bean
public FanoutExchange ORDER_EXCHANGE() {
return ExchangeBuilder.fanoutExchange("ORDER_EXCHANGE").durable(true).delayed().build();
}
/** Order queue. @return the queue */
@Bean
public Queue ORDER_QUEUE() {
return QueueBuilder.durable("ORDER_QUEUE")
.maxLengthBytes(1024 * 1024 * 128)
.maxLength(50000)
.build();
}
/** Binding between the order exchange and the order queue. @return the binding */
@Bean
public Binding ORDER_BINDING() {
return BindingBuilder.bind(ORDER_QUEUE()).to(ORDER_EXCHANGE());
}
/** Exchange for delayed order messages. @return the fanout exchange */
@Bean
public FanoutExchange ORDER_DELAY_EXCHANGE() {
return ExchangeBuilder.fanoutExchange("ORDER_DELAY_EXCHANGE").durable(true).delayed().build();
}
/** Queue for delayed order messages. @return the queue */
@Bean
public Queue ORDER_DELAY_QUEUE() {
return QueueBuilder.durable("ORDER_DELAY_QUEUE")
.maxLengthBytes(1024 * 1024 * 128)
.maxLength(50000)
.build();
}
/** Binding between the delayed order exchange and the delayed order queue. @return the binding */
@Bean
public Binding ORDER_DELAY_BINDING() {
return BindingBuilder.bind(ORDER_DELAY_QUEUE()).to(ORDER_DELAY_EXCHANGE());
}
/** Exchange for order confirmation messages. @return the fanout exchange */
@Bean
public FanoutExchange CONFIRM_EXCHANGE() {
return ExchangeBuilder.fanoutExchange("CONFIRM_EXCHANGE").durable(true).build();
}
/** Queue for order confirmation messages. @return the queue */
@Bean
public Queue CONFIRM_QUEUE() {
return QueueBuilder.durable("CONFIRM_QUEUE")
.maxLengthBytes(1024 * 1024 * 128)
.maxLength(50000)
.build();
}
/** Binding between the confirmation exchange and the confirmation queue. @return the binding */
@Bean
public Binding CONFIRM_BINDING() {
return BindingBuilder.bind(CONFIRM_QUEUE()).to(CONFIRM_EXCHANGE());
}
}
RabbitConfirmCallback: the Confirm callback, triggered when a message is not delivered correctly to the corresponding exchange. You can do extra processing here, such as logging.
@Component
public class RabbitConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
}
}
RabbitReturnCallback: the Return callback, triggered when a message has reached the exchange but could not be routed to the corresponding queue by its RoutingKey. Extra processing, such as logging, can also be done here.
@Component
public class RabbitReturnCallback implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
}
}
RabbitTemplateFactory: mainly configures the RabbitTemplate, the message converter, and so on. It also pools RabbitTemplate instances so that each exchange has its own RabbitTemplate, which improves delivery efficiency.
@Slf4j
@Configuration
public class RabbitTemplateFactory {
@Resource
private ConnectionFactory connectionFactory;
@Resource
private RabbitConfirmCallback rabbitConfirmCallback;
@Resource
private RabbitReturnCallback rabbitReturnCallback;
Map<String, RabbitTemplate> rabbitTemplateMap = new ConcurrentHashMap<>();
/** Message converter for RabbitMQ. @return the message converter */
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* Returns a separate RabbitTemplate for each exchange.
* @param exchangeName exchange name
* @return the RabbitTemplate for that exchange
*/
public RabbitTemplate getRabbitTemplate(String exchangeName) {
RabbitTemplate rabbitTemplate = rabbitTemplateMap.get(exchangeName);
if (rabbitTemplate != null) {
return rabbitTemplate;
}
rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setExchange(exchangeName);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setMessageConverter(messageConverter());
rabbitTemplate.setRetryTemplate(new RetryTemplate());
rabbitTemplate.setConfirmCallback(rabbitConfirmCallback);
rabbitTemplate.setReturnCallback(rabbitReturnCallback);
// Cache the template so later calls for the same exchange reuse it.
rabbitTemplateMap.put(exchangeName, rabbitTemplate);
return rabbitTemplate;
}
}
RedisConfig: Redis configuration, such as the key and value serializers.
@Configuration
public class RedisConfig {
private final RedisConnectionFactory redisConnectionFactory;
public RedisConfig(RedisConnectionFactory redisConnectionFactory) {
this.redisConnectionFactory = redisConnectionFactory;
}
@Bean
public RedisTemplate<String, Object> redisTemplate() {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(valueSerializer());
return redisTemplate;
}
@Bean
public GenericJackson2JsonRedisSerializer valueSerializer() {
return new GenericJackson2JsonRedisSerializer();
}
}
MessageProducer: the message producer, used to deliver messages to RabbitMQ, both immediate and delayed.
@Component
public class MessageProducer {
private final RabbitTemplateFactory rabbitTemplateFactory;
public MessageProducer(RabbitTemplateFactory rabbitTemplateFactory) {
this.rabbitTemplateFactory = rabbitTemplateFactory;
}
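/**
* Entry point for sending a message, including the retry mechanism: sends the message immediately and also sends a delayed copy.
* @param exchangeName exchange name
* @param delayExchangeName delayed exchange name
* @param content message body
* @param eventId event ID, the unique identifier of the message
* @param count delivery attempt number
*/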
public void send(String exchangeName, String delayExchangeName, Object content, String eventId, Integer count) {
sendInTime(exchangeName, content, eventId, count);
delaySend(delayExchangeName, content, eventId, count);
}
/**
* Sends a message immediately.
* @param exchangeName exchange name
* @param content message body
* @param eventId event ID, the unique identifier of the message
* @param count delivery attempt number
*/
public void sendInTime(String exchangeName, Object content, String eventId, Integer count) {
RabbitTemplate rabbitTemplate = rabbitTemplateFactory.getRabbitTemplate(exchangeName);
CorrelationData correlationData = new CorrelationData(eventId);
rabbitTemplate.convertAndSend(exchangeName, null, content, message -> {
// Record the delivery count in a header. If the same message has been delivered three times without being consumed successfully, it is put into the database.
System.out.println("====== Start sending message, attempt " + count + " ======");
message.getMessageProperties().setHeader("messageCount", count);
System.out.println("====== End sending message, attempt " + count + " ======");
return message;
}, correlationData);
}
/**
* Sends a delayed message.
* @param exchangeName exchange name
* @param content message body
* @param eventId event ID, the unique identifier of the message
* @param count delivery attempt number
*/
public void delaySend(String exchangeName, Object content, String eventId, Integer count) {
RabbitTemplate rabbitTemplate = rabbitTemplateFactory.getRabbitTemplate(exchangeName);
CorrelationData correlationData = new CorrelationData(eventId);
rabbitTemplate.convertAndSend(exchangeName, null, content, message -> {
int delayTime = 30 * 1000;
System.out.println("====== Start sending delayed message, attempt " + count + " ======");
message.getMessageProperties().setHeader("messageCount", count);
message.getMessageProperties().setDelay(delayTime);
System.out.println("====== End sending delayed message, attempt " + count + " ======");
return message;
}, correlationData);
}
}
MessageConsumer: the message consumer. It performs the business processing and, after the message is consumed, saves the consumption result in Redis.
@Component
public class MessageConsumer {
private final RedisTemplate<String, Object> redisTemplate;
public MessageConsumer(RedisTemplate<String, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
}
@RabbitListener(queues = "ORDER_QUEUE")
public void onMessage(@Headers Map<String, Object> headers, @Payload Order order, Channel channel) throws Exception {
// 3. Consume the message. ACK if the message is consumed successfully
System.out.println("====== Start consuming message ======");
System.out.println("The message reads:" + order);
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag, false);
System.out.println("====== consumption message success ======");
Message message = new Message();
message.setMessageId(order.getId());
message.setContent(order);
// Redis saves message consumption results
redisTemplate.opsForValue().set(String.valueOf(order.getId()), message, 4 * 60, TimeUnit.SECONDS);
}
}
RabbitController is mainly used to simulate message delivery for easy testing.
@RestController
public class RabbitController {
private final MessageProducer messageProducer;
private final OrderRepository orderRepository;
public RabbitController(MessageProducer messageProducer, OrderRepository orderRepository) {
this.messageProducer = messageProducer;
this.orderRepository = orderRepository;
}
@PostMapping("/place/order")
public String placeOrder(@RequestBody Order order, @RequestParam(value = "count", defaultValue = "1") Integer count) {
// 1. Persist the business data
order.setTimestamp(System.currentTimeMillis());
orderRepository.save(order);
// 2. Deliver the message (immediate and delayed)
messageProducer.send("ORDER_EXCHANGE", "ORDER_DELAY_EXCHANGE", order, String.valueOf(order.getId()), count);
return "Success";
}
}
Message: the entity for messages, mapped to the Message table in the database and used to store messages.
@Entity
@Data
@TypeDef(name = "json", typeClass = JsonStringType.class)
public class Message implements Serializable {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private Long messageId;
@Type(type = "json")
@Column(columnDefinition = "json")
private Order content;
}
ConfirmMessage: the model for confirmation messages. After a message is consumed, a ConfirmMessage can be assembled and delivered to tell the callback service that the message has been consumed. (This demo does not use the model; it records the consumption result in Redis instead.)
@Data
public class ConfirmMessage {
private Long messageId;
private boolean status;
}
The Order business model, corresponding to the Order table in the database, is used to hold the Order data.
@Entity
@Data
@Table(name = "`order`")
public class Order implements Serializable {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private Integer productNumber;
private BigDecimal totalPrice;
private Long timestamp;
}
MessageRepository: the ORM is JPA; MessageRepository performs database operations on the Message table.
public interface MessageRepository extends JpaRepository<Message, Long> {
Message findMessageByMessageId(Long messageId);
}
OrderRepository: performs database operations on the order table.
public interface OrderRepository extends JpaRepository<Order, Long> {
}
The Callback service
ClientConfig Configures the RestTemplate, which is used to initiate HTTP requests.
@Configuration
public class ClientConfig {
@Bean
public RestTemplate restTemplate() {
return new RestTemplate();
}
}
RabbitConfig: RabbitMQ configuration. In the callback service it only declares the message converter.
@Configuration
public class RabbitConfig {
/** Message converter for RabbitMQ. @return the message converter */
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
RedisConfig: Redis configuration, such as the key and value serializers.
@Configuration
public class RedisConfig {
private final RedisConnectionFactory redisConnectionFactory;
public RedisConfig(RedisConnectionFactory redisConnectionFactory) {
this.redisConnectionFactory = redisConnectionFactory;
}
@Bean
public RedisTemplate<String, Object> redisTemplate() {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(valueSerializer());
return redisTemplate;
}
@Bean
public GenericJackson2JsonRedisSerializer valueSerializer() {
return new GenericJackson2JsonRedisSerializer();
}
}
MessageConsumer: consumes the delayed messages. When a delayed message arrives, it first checks whether the original message has been consumed. If it has (a consumption confirmation exists in Redis), the message is stored in the database and the delayed message is ACKed directly; otherwise the producer is asked to redeliver.
@Component
public class MessageConsumer {
private final RedisTemplate<String, Object> redisTemplate;
private final RestTemplate restTemplate;
private final MessageRepository messageRepository;
private final ObjectMapper objectMapper;
public MessageConsumer(RedisTemplate<String, Object> redisTemplate, RestTemplate restTemplate, MessageRepository messageRepository, ObjectMapper objectMapper) {
this.redisTemplate = redisTemplate;
this.restTemplate = restTemplate;
this.messageRepository = messageRepository;
this.objectMapper = objectMapper;
}
@RabbitListener(queues = "ORDER_DELAY_QUEUE")
public void onMessage(@Headers Map<String, Object> headers, @Payload Order order, Channel channel) throws Exception {
// Check whether the current message has been consumed successfully. If it has, ACK directly; otherwise notify the producer via RPC to redeliver the message.
Object result = redisTemplate.opsForValue().get(String.valueOf(order.getId()));
if (result == null) {
// Determine the current number of deliveries
Integer messageCount = (Integer) headers.get("messageCount");
System.out.println("Time: " + System.currentTimeMillis() + ", delayed delivery #" + messageCount + " received, redelivering.");
restTemplate.postForObject("http://localhost:8080/place/order?count=" + (messageCount + 1), order, String.class);
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag, false);
return;
}
// Store the message in the database
Message message = objectMapper.convertValue(result, Message.class);
messageRepository.save(message);
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag, false);
System.out.println("====== message has been consumed ======");
}
}
MessageRepository: the ORM is JPA; MessageRepository performs database operations on the Message table.
public interface MessageRepository extends JpaRepository<Message, Long>, JpaSpecificationExecutor<Message> {
}
Message: the entity for messages, mapped to the Message table in the database and used to store messages.
@Entity
@Data
@TypeDef(name = "json", typeClass = JsonStringType.class)
public class Message implements Serializable {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private Long messageId;
@Type(type = "json")
@Column(columnDefinition = "json")
private Order content;
}
ConfirmMessage: the model for confirmation messages. After a message is consumed, a ConfirmMessage can be assembled and delivered to tell the callback service that the message has been consumed. (This demo does not use the model; it records the consumption result in Redis instead.)
@Data
public class ConfirmMessage {
private Long messageId;
private boolean status;
}
The Order business model, corresponding to the Order table in the database, is used to hold the Order data.
@Entity
@Data
@Table(name = "`order`")
public class Order implements Serializable {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private Integer productNumber;
private BigDecimal totalPrice;
private Long timestamp;
}
Scheme 3: delayed message delivery + HTTP
Message flow chart
Advantages and disadvantages
Compared with the first scheme, this scheme moves message storage into another service so that it does not slow down the business flow. Compared with the second scheme, it has one disadvantage: a scheduled task is needed to periodically query the database for messages that failed to be consumed. In return, this scheme can also take the timeliness of message consumption into account: when the business requires messages to be consumed within a certain amount of time, the delivery strategy can be adjusted flexibly.
However, there is no guarantee of 100% success. If both the first message and the first delayed message fail, the message is permanently lost, because the message is no longer stored alongside the business data: message storage now lives in a separate service.
The specific implementation
Step 1: Store only the business data in the database.
Step 2: Send the message to the Exchange. If the message does not reach the Exchange correctly, the ConfirmCallback reports the failure; otherwise the Exchange routes the message to the corresponding queue according to the RoutingKey. If the message cannot be routed to a queue, the ReturnCallback is triggered. Both callbacks can be implemented to do your own handling.
Step 3: At the same time, deliver a delayed message. Its content is exactly the same as the message in the previous step, but it goes to a different queue: the delay queue. The delay can vary with the business. Before RabbitMQ 3.6, delayed delivery was more awkward and relied on either a dead letter queue or Java's DelayQueue; since version 3.6, the rabbitmq_delayed_message_exchange plugin can be used to delay message delivery. Here, however, the number and total duration of delayed deliveries are limited, for example at most 2 delayed deliveries and at most 3 minutes. If the message has still not been consumed successfully after 3 minutes, it is stored in the message database (which here holds only messages that failed to be consumed) and waits for the scheduled task to pull it from the database for processing.
Step 4: The consumer listens to the corresponding queue and consumes its messages. When consumption is complete, it ACKs, then assembles a ConfirmMessage and sends it to the message confirmation queue. This message confirms that the current message was consumed successfully; alternatively, Redis can be used to store the consumption confirmation result.
Step 5: Create another callback service that mainly listens to the delay queue and the message confirmation queue. When a message arrives on the confirmation queue (or a confirmation is found in Redis), it saves the confirmation to Redis and then ACKs directly with no further processing. It also listens to the delay queue: on receiving a delayed message, it first checks whether the message has been consumed. If it has, it does nothing and ACKs directly; otherwise it notifies the producer via RPC to redeliver, which goes back to step 2. This repeats until a certain number of attempts or a certain amount of time is exceeded, at which point the message is stored in the database.
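The limit on redelivery described in steps 3 and 5 comes down to a three-way decision. A minimal sketch follows, assuming the example limits from the text (at most 2 delayed deliveries, at most 3 minutes); RetryLimitSketch, Action, and decide are hypothetical names and are not part of the demo code.

```java
import java.time.Duration;

// Hypothetical decision helper for scheme 3; the limits are the example values from the text.
class RetryLimitSketch {
    enum Action { ACK_ONLY, REDELIVER, STORE_AS_FAILED }

    static final int MAX_DELAYED_DELIVERIES = 2;
    static final Duration MAX_AGE = Duration.ofMinutes(3);

    // consumed: whether a consumption confirmation exists
    // attempt:  how many delayed deliveries have been made so far
    // age:      time elapsed since the first delivery
    static Action decide(boolean consumed, int attempt, Duration age) {
        if (consumed) {
            return Action.ACK_ONLY;
        }
        if (attempt >= MAX_DELAYED_DELIVERIES || age.compareTo(MAX_AGE) >= 0) {
            // Limits exceeded: persist the message and leave it to the scheduled task.
            return Action.STORE_AS_FAILED;
        }
        return Action.REDELIVER;
    }

    public static void main(String[] args) {
        System.out.println(decide(false, 1, Duration.ofSeconds(30))); // prints REDELIVER
        System.out.println(decide(false, 2, Duration.ofSeconds(60))); // prints STORE_AS_FAILED
        System.out.println(decide(true, 1, Duration.ofSeconds(30)));  // prints ACK_ONLY
    }
}
```

In a real service the attempt number could come from a message header and the age from a timestamp carried in the message body; both sources are assumptions here.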
The Demo code
Note: For ease of writing, the following code uses HTTP instead of RPC, has no global exception handling, and does not handle the various error cases in the API interfaces.
Preparation
- Download the rabbitmq_delayed_message_exchange plug-in from the RabbitMQ website, then move it into the plugins directory.
- Enable the rabbitmq_delayed_message_exchange plug-in:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
The Message service
ClientConfig configures the RestTemplate, which is used to issue HTTP requests.
@Configuration
public class ClientConfig {
    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
}
RabbitConfig holds the RabbitMQ exchange and queue configuration. The key point is that the delay exchange is declared as a delayed exchange.
@Configuration
public class RabbitConfig {
    /** Order exchange. @return the fanout exchange */
    @Bean
    public FanoutExchange ORDER_EXCHANGE() {
        return ExchangeBuilder.fanoutExchange("ORDER_EXCHANGE").durable(true).delayed().build();
    }
    /** Order queue. @return the queue */
    @Bean
    public Queue ORDER_QUEUE() {
        return QueueBuilder.durable("ORDER_QUEUE")
                .maxLengthBytes(1024 * 1024 * 128)
                .maxLength(50000)
                .build();
    }
    /** Binding between the order exchange and the order queue. @return the binding */
    @Bean
    public Binding ORDER_BINDING() {
        return BindingBuilder.bind(ORDER_QUEUE()).to(ORDER_EXCHANGE());
    }
    /** Order delay message exchange. @return the fanout exchange */
    @Bean
    public FanoutExchange ORDER_DELAY_EXCHANGE() {
        return ExchangeBuilder.fanoutExchange("ORDER_DELAY_EXCHANGE").durable(true).delayed().build();
    }
    /** Order delay message queue. @return the queue */
    @Bean
    public Queue ORDER_DELAY_QUEUE() {
        return QueueBuilder.durable("ORDER_DELAY_QUEUE")
                .maxLengthBytes(1024 * 1024 * 128)
                .maxLength(50000)
                .build();
    }
    /** Binding between the order delay exchange and the delay queue. @return the binding */
    @Bean
    public Binding ORDER_DELAY_BINDING() {
        return BindingBuilder.bind(ORDER_DELAY_QUEUE()).to(ORDER_DELAY_EXCHANGE());
    }
    /** Order confirmation message exchange. @return the fanout exchange */
    @Bean
    public FanoutExchange CONFIRM_EXCHANGE() {
        return ExchangeBuilder.fanoutExchange("CONFIRM_EXCHANGE").durable(true).build();
    }
    /** Order confirmation message queue. @return the queue */
    @Bean
    public Queue CONFIRM_QUEUE() {
        return QueueBuilder.durable("CONFIRM_QUEUE")
                .maxLengthBytes(1024 * 1024 * 128)
                .maxLength(50000)
                .build();
    }
    /** Binding between the confirmation exchange and the confirmation queue. @return the binding */
    @Bean
    public Binding CONFIRM_BINDING() {
        return BindingBuilder.bind(CONFIRM_QUEUE()).to(CONFIRM_EXCHANGE());
    }
}
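As background for the delayed exchanges declared above: as far as I understand the plug-in's documentation, a delayed exchange is declared with the exchange type x-delayed-message plus an x-delayed-type argument naming the real routing behaviour, and each message carries its delay in milliseconds in an x-delay header. The sketch below only builds those two maps with plain JDK types. The class and method names are hypothetical, nothing is sent to a broker, and the messageCount header is the one the demo producer sets.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: shows the arguments and headers involved, without talking to RabbitMQ.
// DelayedExchangeSketch and its method names are hypothetical.
final class DelayedExchangeSketch {
    // Exchange type used by the delayed message plug-in.
    static final String EXCHANGE_TYPE = "x-delayed-message";

    // Arguments for declaring the exchange; routingType is e.g. "fanout".
    static Map<String, Object> exchangeArguments(String routingType) {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", routingType);
        return arguments;
    }

    // Headers for one delayed message: the attempt counter and the delay in milliseconds.
    static Map<String, Object> messageHeaders(int attempt, int delayMillis) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("messageCount", attempt);
        headers.put("x-delay", delayMillis);
        return headers;
    }

    public static void main(String[] args) {
        System.out.println(EXCHANGE_TYPE + " " + exchangeArguments("fanout"));
        System.out.println(messageHeaders(1, 60 * 1000));
    }
}
```

A message published without an x-delay header is routed immediately, which is why the instant and delayed sends can share the same producer code and differ only in the header.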
RabbitConfirmCallback receives the Confirm callback. When ack is false, the message was not delivered correctly to the corresponding exchange, and extra handling such as logging can be done here.
@Component
public class RabbitConfirmCallback implements RabbitTemplate.ConfirmCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // ack == false: the message did not reach the exchange; log or handle it here
    }
}
RabbitReturnCallback receives the Return callback, which fires when a message has reached the exchange but could not be routed to the corresponding queue by its RoutingKey. Extra handling such as logging can also be done here.
@Component
public class RabbitReturnCallback implements RabbitTemplate.ReturnCallback {
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // Message here is the Spring AMQP message type, not the Message entity shown later
    }
}
RabbitTemplateFactory mainly configures the RabbitTemplate, the message converter and so on. It also pools RabbitTemplate instances so that each exchange has its own RabbitTemplate, which makes message delivery more efficient.
@Slf4j
@Configuration
public class RabbitTemplateFactory {
    @Resource
    private ConnectionFactory connectionFactory;
    @Resource
    private RabbitConfirmCallback rabbitConfirmCallback;
    @Resource
    private RabbitReturnCallback rabbitReturnCallback;
    private final Map<String, RabbitTemplate> rabbitTemplateMap = new ConcurrentHashMap<>();
    /** Message converter for RabbitMQ. @return the message converter */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    /**
     * Produce a separate RabbitTemplate for each exchange.
     * @param exchangeName exchange name
     * @return RabbitTemplate
     */
    public RabbitTemplate getRabbitTemplate(String exchangeName) {
        // Create the template on first use and cache it, so later calls reuse the same instance
        return rabbitTemplateMap.computeIfAbsent(exchangeName, name -> {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setExchange(name);
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setMessageConverter(messageConverter());
            rabbitTemplate.setRetryTemplate(new RetryTemplate());
            rabbitTemplate.setConfirmCallback(rabbitConfirmCallback);
            rabbitTemplate.setReturnCallback(rabbitReturnCallback);
            return rabbitTemplate;
        });
    }
}
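The idea of one RabbitTemplate per exchange is a per-key cache: an instance is built the first time a key is requested and reused afterwards. A minimal JDK-only sketch of that pattern follows. The class PerKeyCacheSketch and the createdCount counter are hypothetical and exist only to make the reuse observable; the factory function stands in for the code that builds a RabbitTemplate.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Sketch of a thread-safe per-key cache; names are hypothetical.
final class PerKeyCacheSketch<V> {
    private final Map<String, V> cache = new ConcurrentHashMap<>();
    private final Function<String, V> factory;
    private final AtomicInteger created = new AtomicInteger();

    PerKeyCacheSketch(Function<String, V> factory) {
        this.factory = factory;
    }

    // Returns the cached value for the key, creating it at most once.
    V get(String key) {
        return cache.computeIfAbsent(key, k -> {
            created.incrementAndGet();
            return factory.apply(k);
        });
    }

    // Number of values actually built so far.
    int createdCount() {
        return created.get();
    }

    public static void main(String[] args) {
        PerKeyCacheSketch<Object> templates = new PerKeyCacheSketch<>(name -> new Object());
        templates.get("ORDER_EXCHANGE");
        templates.get("ORDER_EXCHANGE");
        templates.get("ORDER_DELAY_EXCHANGE");
        System.out.println(templates.createdCount());
    }
}
```

Without the step that stores the new instance in the map, every call would build a fresh object and the pooling would have no effect.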
RedisConfig contains the Redis configuration, such as the key and value serializers.
@Configuration
public class RedisConfig {
    private final RedisConnectionFactory redisConnectionFactory;
    public RedisConfig(RedisConnectionFactory redisConnectionFactory) {
        this.redisConnectionFactory = redisConnectionFactory;
    }
    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(valueSerializer());
        return redisTemplate;
    }
    @Bean
    public GenericJackson2JsonRedisSerializer valueSerializer() {
        return new GenericJackson2JsonRedisSerializer();
    }
}
MessageProducer is the message producer. It delivers messages to RabbitMQ, both instant messages and delayed messages.
@Component
public class MessageProducer {
    private final RabbitTemplateFactory rabbitTemplateFactory;
    public MessageProducer(RabbitTemplateFactory rabbitTemplateFactory) {
        this.rabbitTemplateFactory = rabbitTemplateFactory;
    }
    /**
     * Entry point for delivery, covering the retry mechanism (instant delivery plus delayed delivery).
     * @param exchangeName exchange name
     * @param delayExchangeName delay exchange name
     * @param content message body
     * @param eventId event ID, the unique identifier of the message
     * @param count delivery attempt number
     */
    public void send(String exchangeName, String delayExchangeName, Object content, String eventId, Integer count) {
        sendInTime(exchangeName, content, eventId, count);
        delaySend(delayExchangeName, content, eventId, count);
    }
    /**
     * Deliver a message immediately.
     * @param exchangeName exchange name
     * @param content message body
     * @param eventId event ID, the unique identifier of the message
     * @param count delivery attempt number
     */
    public void sendInTime(String exchangeName, Object content, String eventId, Integer count) {
        RabbitTemplate rabbitTemplate = rabbitTemplateFactory.getRabbitTemplate(exchangeName);
        CorrelationData correlationData = new CorrelationData(eventId);
        // The exchanges are fanout exchanges, so no routing key is needed
        rabbitTemplate.convertAndSend(exchangeName, null, content, message -> {
            // Record the attempt number in a header; the callback service uses it to decide
            // when to stop retrying and store the message in the database
            System.out.println("====== start instant send, attempt " + count + " ======");
            message.getMessageProperties().setHeader("messageCount", count);
            System.out.println("====== end instant send, attempt " + count + " ======");
            return message;
        }, correlationData);
    }
    /**
     * Deliver a delayed message.
     * @param exchangeName exchange name
     * @param content message body
     * @param eventId event ID, the unique identifier of the message
     * @param count delivery attempt number
     */
    public void delaySend(String exchangeName, Object content, String eventId, Integer count) {
        RabbitTemplate rabbitTemplate = rabbitTemplateFactory.getRabbitTemplate(exchangeName);
        CorrelationData correlationData = new CorrelationData(eventId);
        rabbitTemplate.convertAndSend(exchangeName, null, content, message -> {
            int delayTime;
            if (count == 1) {
                // The first delivery is delayed by 1 minute
                delayTime = 60 * 1000;
            } else {
                // The second delivery is delayed by 2 minutes
                delayTime = 2 * 60 * 1000;
            }
            System.out.println("====== start delayed send, attempt " + count + " ======");
            message.getMessageProperties().setHeader("messageCount", count);
            message.getMessageProperties().setDelay(delayTime);
            System.out.println("====== end delayed send, attempt " + count + " ======");
            return message;
        }, correlationData);
    }
}
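The delay rule in delaySend (1 minute for the first attempt, 2 minutes afterwards) is what produces the 3-minute upper bound mentioned in step 3. The arithmetic can be sketched with plain Java. The class DelayScheduleSketch is hypothetical, and checkTimeMillis assumes that each redelivery happens immediately when the previous delayed check fires, which ignores network and processing time.

```java
// Sketch of the retry timeline; names are hypothetical.
final class DelayScheduleSketch {
    // Same rule as delaySend: 1 minute for the first attempt, 2 minutes for later ones.
    static long delayMillis(int attempt) {
        return attempt == 1 ? 60_000L : 2 * 60_000L;
    }

    // Milliseconds after the first send at which the delayed copy of the given attempt
    // is checked, assuming every redelivery is sent as soon as the previous check fires.
    static long checkTimeMillis(int attempt) {
        long elapsed = 0;
        for (int i = 1; i <= attempt; i++) {
            elapsed += delayMillis(i);
        }
        return elapsed;
    }

    public static void main(String[] args) {
        for (int attempt = 1; attempt <= 2; attempt++) {
            System.out.println("attempt " + attempt + " is checked after "
                    + checkTimeMillis(attempt) / 1000 + " s");
        }
    }
}
```

With two attempts the last check happens about 180 seconds after the first send, after which an unconsumed message is stored in the database.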
MessageConsumer is the message consumer. It performs the business processing for a message and, once consumption is complete, saves the consumption result in Redis.
@Component
public class MessageConsumer {
    private final RedisTemplate<String, Object> redisTemplate;
    public MessageConsumer(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }
    @RabbitListener(queues = "ORDER_QUEUE")
    public void onMessage(@Headers Map<String, Object> headers, @Payload Order order, Channel channel) throws Exception {
        // 3. Consume the message and ACK once it has been consumed successfully
        System.out.println("====== start consuming message ======");
        System.out.println("Message content: " + order);
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(deliveryTag, false);
        System.out.println("====== message consumed successfully ======");
        // Message is the entity defined below, not the Spring AMQP message type
        Message message = new Message();
        message.setMessageId(order.getId());
        message.setContent(order);
        // Save the consumption result in Redis with a 4-minute expiry
        redisTemplate.opsForValue().set(String.valueOf(order.getId()), message, 4 * 60, TimeUnit.SECONDS);
    }
}
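The Redis entry written by the consumer acts as a short-lived "consumed" marker: the callback service only needs it to exist when the matching delayed message arrives, which is at most 2 minutes after the instant delivery, so a 4-minute expiry leaves some margin. The sketch below imitates that marker with an in-memory map and explicit timestamps. ConfirmationStoreSketch and its methods are hypothetical stand-ins, not a Redis client.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// In-memory stand-in for the Redis marker with a time-to-live; names are hypothetical.
final class ConfirmationStoreSketch {
    // Message ID -> time (in milliseconds) at which the marker expires.
    private final Map<String, Long> expiresAt = new ConcurrentHashMap<>();

    // Called by the consumer after a successful consumption.
    void markConsumed(String messageId, long nowMillis, long ttlMillis) {
        expiresAt.put(messageId, nowMillis + ttlMillis);
    }

    // Called by the callback service when the delayed message arrives.
    boolean isConsumed(String messageId, long nowMillis) {
        Long expiry = expiresAt.get(messageId);
        return expiry != null && nowMillis < expiry;
    }

    public static void main(String[] args) {
        ConfirmationStoreSketch store = new ConfirmationStoreSketch();
        store.markConsumed("42", 0L, 4 * 60 * 1000L);
        System.out.println(store.isConsumed("42", 2 * 60 * 1000L));
        System.out.println(store.isConsumed("43", 2 * 60 * 1000L));
    }
}
```

If the expiry were shorter than the longest delay, a successfully consumed message would look unconsumed when its delayed copy arrived and would be redelivered unnecessarily.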
RabbitController is mainly used to simulate message delivery for easy testing.
@RestController
public class RabbitController {
    private final MessageProducer messageProducer;
    private final OrderRepository orderRepository;
    public RabbitController(MessageProducer messageProducer, OrderRepository orderRepository) {
        this.messageProducer = messageProducer;
        this.orderRepository = orderRepository;
    }
    @PostMapping("/place/order")
    public String placeOrder(@RequestBody Order order, @RequestParam(value = "count", defaultValue = "1") Integer count) {
        // 1. Save the business data in the database
        order.setTimestamp(System.currentTimeMillis());
        orderRepository.save(order);
        // 2. Deliver the message (instant delivery plus delayed delivery)
        messageProducer.send("ORDER_EXCHANGE", "ORDER_DELAY_EXCHANGE", order, String.valueOf(order.getId()), count);
        return "Success";
    }
}
Message is the business model for messages. It corresponds to the Message table in the database and is used to hold messages.
@Entity
@Data
@TypeDef(name = "json", typeClass = JsonStringType.class)
public class Message implements Serializable {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private Long messageId;
    @Type(type = "json")
    @Column(columnDefinition = "json")
    private Order content;
}
ConfirmMessage is the business model for confirmation messages. After a message has been consumed, a ConfirmMessage can be assembled and delivered to tell the callback service that the message has been consumed. (This model is not used in this code; Redis is used instead to record the consumption results.)
@Data
public class ConfirmMessage {
    private Long messageId;
    private boolean status;
}
Order is the order business model. It corresponds to the Order table in the database and is used to hold the order data.
@Entity
@Data
@Table(name = "`order`")
public class Order implements Serializable {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private Integer productNumber;
    private BigDecimal totalPrice;
    private Long timestamp;
}
MessageRepository: the ORM used here is JPA, and MessageRepository is mainly used to perform database operations on the Message table.
public interface MessageRepository extends JpaRepository<Message, Long> {
    Message findMessageByMessageId(Long messageId);
}
OrderRepository is mainly used to perform database operations on the Order table.
public interface OrderRepository extends JpaRepository<Order, Long> {
}
FailureMessageCheckTask is a scheduled task that periodically pulls consumption-failure messages from the message database. On the first run the latestId is 0, which means all consumption-failure messages are pulled. The ID of the last message processed is then saved as the new latestId and passed in later requests, so that only consumption-failure messages with an ID greater than latestId are returned.
@Component
public class FailureMessageCheckTask {
    private final RestTemplate restTemplate;
    private final RedisTemplate<String, Object> redisTemplate;
    private final ObjectMapper objectMapper;
    private static final String latestIdKey = "order:latestId";
    private static final int count = 10;
    public FailureMessageCheckTask(RedisTemplate<String, Object> redisTemplate, ObjectMapper objectMapper, RestTemplate restTemplate) {
        this.redisTemplate = redisTemplate;
        this.objectMapper = objectMapper;
        this.restTemplate = restTemplate;
    }
    @Scheduled(fixedDelay = 30 * 1000)
    public void getFailureMessages() {
        Long latestId;
        Object value = redisTemplate.opsForValue().get(latestIdKey);
        if (value == null) {
            latestId = 0L;
        } else {
            latestId = Long.valueOf(String.valueOf(value));
        }
        // Keep the cursor fixed while paging; advancing it between pages would skip rows
        final Long cursor = latestId;
        DataResponse<Object> dataResponse;
        String url = "http://localhost:8081/failure/messages?page=0&count=" + count + "&latestId=" + cursor;
        dataResponse = restTemplate.getForObject(url, DataResponse.class);
        // A non-zero code means the request failed or there is no data, so no further action is required
        if (dataResponse == null || dataResponse.getCode() != 0) {
            return;
        }
        PageParameter<Object> messages = objectMapper.convertValue(dataResponse.getData(), PageParameter.class);
        System.out.println("The number of consumption failure messages in this round is: " + messages.getTotal());
        int totalPage = messages.getTotalPage();
        // Process the current page
        List<Object> items = messages.getItems();
        for (Object obj : items) {
            Message message = objectMapper.convertValue(obj, Message.class);
            businessHandle(message);
            latestId = message.getId();
        }
        // Pages are numbered from 0, so the remaining pages are 1 .. totalPage - 1
        for (int page = 1; page < totalPage; page++) {
            url = "http://localhost:8081/failure/messages?page=" + page + "&count=" + count + "&latestId=" + cursor;
            dataResponse = restTemplate.getForObject(url, DataResponse.class);
            messages = objectMapper.convertValue(dataResponse.getData(), PageParameter.class);
            items = messages.getItems();
            for (Object obj : items) {
                Message message = objectMapper.convertValue(obj, Message.class);
                businessHandle(message);
                latestId = message.getId();
            }
        }
        // Update the latestId
        redisTemplate.opsForValue().set(latestIdKey, latestId);
    }
    @Async("orderThreadPool")
    public void businessHandle(Message message) {
        System.out.println("Handle the business logic");
    }
}
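The latestId mechanism is a cursor: each round asks only for rows with an ID greater than the last one processed, and pages through that result set. A JDK-only sketch of the idea follows. CursorPagingSketch, fetchPage and drain are hypothetical names, the list of IDs stands in for the message table, and the IDs are assumed to be sorted in ascending order. The point it illustrates is that the cursor must stay the same for every page of one round.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of cursor plus page-number paging over an in-memory list; names are hypothetical.
final class CursorPagingSketch {
    // Server-side stand-in: IDs greater than latestId, one page at a time (pages start at 0).
    static List<Long> fetchPage(List<Long> allIds, long latestId, int page, int count) {
        List<Long> matching = new ArrayList<>();
        for (long id : allIds) {
            if (id > latestId) {
                matching.add(id);
            }
        }
        int from = Math.min(page * count, matching.size());
        int to = Math.min(from + count, matching.size());
        return new ArrayList<>(matching.subList(from, to));
    }

    // Client side: walk every page with the SAME cursor and collect the processed IDs.
    static List<Long> drain(List<Long> allIds, long latestId, int count) {
        List<Long> processed = new ArrayList<>();
        for (int page = 0; ; page++) {
            List<Long> items = fetchPage(allIds, latestId, page, count);
            if (items.isEmpty()) {
                break;
            }
            processed.addAll(items);
        }
        return processed;
    }

    public static void main(String[] args) {
        List<Long> ids = new ArrayList<>();
        for (long id = 1; id <= 25; id++) {
            ids.add(id);
        }
        List<Long> firstRound = drain(ids, 0L, 10);
        System.out.println(firstRound.size() + " processed, next latestId = "
                + firstRound.get(firstRound.size() - 1));
    }
}
```

The last ID of a round becomes the cursor for the next round, so a later run only sees messages that failed after it.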
The Callback service
ClientConfig configures the RestTemplate, which is used to issue HTTP requests.
@Configuration
public class ClientConfig {
    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
}
RabbitConfig holds the RabbitMQ configuration, mainly the type of message converter.
@Configuration
public class RabbitConfig {
    /** Message converter for RabbitMQ. @return the message converter */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
RedisConfig contains the Redis configuration, such as the key and value serializers.
@Configuration
public class RedisConfig {
    private final RedisConnectionFactory redisConnectionFactory;
    public RedisConfig(RedisConnectionFactory redisConnectionFactory) {
        this.redisConnectionFactory = redisConnectionFactory;
    }
    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(valueSerializer());
        return redisTemplate;
    }
    @Bean
    public GenericJackson2JsonRedisSerializer valueSerializer() {
        return new GenericJackson2JsonRedisSerializer();
    }
}
MessageConsumer consumes the delayed messages. When a delayed message arrives, it first checks whether the original message has been consumed. If it has (a consumption confirmation exists in Redis), the delayed message is simply ACKed. Otherwise the producer is asked to redeliver the message, and once the second delayed delivery also finds no confirmation, the message is stored in the database and then ACKed.
@Component
public class MessageConsumer {
    private final RedisTemplate<String, Object> redisTemplate;
    private final RestTemplate restTemplate;
    private final MessageRepository messageRepository;
    public MessageConsumer(RedisTemplate<String, Object> redisTemplate, RestTemplate restTemplate, MessageRepository messageRepository) {
        this.redisTemplate = redisTemplate;
        this.restTemplate = restTemplate;
        this.messageRepository = messageRepository;
    }
    @RabbitListener(queues = "ORDER_DELAY_QUEUE")
    public void onMessage(@Headers Map<String, Object> headers, @Payload Order order, Channel channel) throws Exception {
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        // Check whether the current message has been consumed successfully. If it has, ACK directly.
        // Otherwise notify the producer through RPC to redeliver the message.
        Object result = redisTemplate.opsForValue().get(String.valueOf(order.getId()));
        if (result == null) {
            // Determine the current number of deliveries
            Integer messageCount = (Integer) headers.get("messageCount");
            if (messageCount == 2) {
                // Store the message whose consumption failed in the database
                System.out.println("====== storing message in the database ======");
                Message message = new Message();
                message.setMessageId(order.getId());
                message.setContent(order);
                messageRepository.save(message);
                channel.basicAck(deliveryTag, false);
                return;
            }
            System.out.println("Time: " + System.currentTimeMillis() + ", delayed delivery attempt " + messageCount + " received, redelivering.");
            // HTTP is used here instead of RPC
            restTemplate.postForObject("http://localhost:8080/place/order?count=" + (messageCount + 1), order, String.class);
            channel.basicAck(deliveryTag, false);
            return;
        }
        channel.basicAck(deliveryTag, false);
        System.out.println("====== message has been consumed ======");
    }
}
MessageRepository: the ORM used here is JPA, and MessageRepository is mainly used to perform database operations on the Message table.
public interface MessageRepository extends JpaRepository<Message, Long>, JpaSpecificationExecutor<Message> {
}
Message is the business model for messages. It corresponds to the Message table in the database and is used to hold messages.
@Entity
@Data
@TypeDef(name = "json", typeClass = JsonStringType.class)
public class Message implements Serializable {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private Long messageId;
    @Type(type = "json")
    @Column(columnDefinition = "json")
    private Order content;
}
ConfirmMessage is the business model for confirmation messages. After a message has been consumed, a ConfirmMessage can be assembled and delivered to tell the callback service that the message has been consumed. (This model is not used in this code; Redis is used instead to record the consumption results.)
@Data
public class ConfirmMessage {
    private Long messageId;
    private boolean status;
}
Order is the order business model. It corresponds to the Order table in the database and is used to hold the order data.
@Entity
@Data
@Table(name = "`order`")
public class Order implements Serializable {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private Integer productNumber;
    private BigDecimal totalPrice;
    private Long timestamp;
}
FailureMessageController exposes the interface for fetching consumption-failure messages and supports paged queries.
@RestController
public class FailureMessageController {
    private final FailureMessageService failureMessageService;
    public FailureMessageController(FailureMessageService failureMessageService) {
        this.failureMessageService = failureMessageService;
    }
    @GetMapping("/failure/messages")
    public DataResponse<PageParameter<Message>> getFailureMessages(@RequestParam Integer page, @RequestParam Integer count, @RequestParam Long latestId) {
        PageParameter<Message> messages = failureMessageService.getServiceExternalFailureMessage(page, count, latestId);
        // Code 1 means there is nothing to process; code 0 means success
        if (messages.getTotal() == 0) {
            return new DataResponse<>(1, "No Failure Message", messages);
        }
        return new DataResponse<>(0, "Success", messages);
    }
}
FailureMessageService contains the actual implementation for fetching consumption-failure messages.
@Service
public class FailureMessageService {
    private final MessageRepository messageRepository;
    public FailureMessageService(MessageRepository messageRepository) {
        this.messageRepository = messageRepository;
    }
    /**
     * Query the data whose ID is greater than latestId (failure messages for external services).
     * @param page page number, starting from 0
     * @param count number of items per page
     * @param latestId ID of the last message already processed
     * @return paged data
     */
    public PageParameter<Message> getServiceExternalFailureMessage(Integer page, Integer count, Long latestId) {
        PageRequest pageRequest = PageRequest.of(page, count);
        Page<Message> messages;
        // If latestId is 0, all data is queried
        if (latestId == 0) {
            messages = messageRepository.findAll((Specification<Message>) (root, criteriaQuery, criteriaBuilder) -> null, pageRequest);
        } else {
            messages = messageRepository.findAll((Specification<Message>) (root, criteriaQuery, criteriaBuilder) -> {
                // Predicate: id > latestId
                Path<Long> idPath = root.get("id");
                return criteriaBuilder.gt(idPath, latestId);
            }, pageRequest);
        }
        return new PageParameter<>(messages);
    }
}