Spring Boot: implementing batch message processing with RabbitMQ
This setup was tested against a RabbitMQ connection in a development environment. Recommendation: once batch consumption is enabled, if the business logic inserts the received data into a database, perform those inserts as batch operations as well; doing so can greatly improve memory usage.
Step 1: Introduce dependencies
<!-- Spring Boot AMQP starter. No <version> is given here; presumably it is managed by the Spring Boot parent POM/BOM (confirm in your build). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Copy the code
Step 2: Edit the YML file
# RabbitMQ connection settings.
# NOTE(review): Spring Boot reads these under the top-level `spring:` key
# (spring.rabbitmq.*); nest this block there if it is not already.
rabbitmq:
  host: localhost
  port: 5672
  virtual-host: /
  username: guest
  password: guest
  # NOTE(review): newer Spring Boot versions replace this flag with
  # `publisher-confirm-type`; confirm against the Boot version in use.
  publisher-confirms: true
Copy the code
Step 3: Add the RabbitMQ configuration class (routing mode, using direct exchanges)
/** * rabbitMQ configuration class */
@Configuration
public class RabbitConfig {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Bean
DirectExchange orderDirect(a) {
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange())
.durable(true)
.build();
}
/** * The switch to which the queue is bound */
@Bean
DirectExchange orderTtlDirect(a) {
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange())
.durable(true)
.build();
}
/** * Actual consumption queue */
@Bean
public Queue orderQueue(a) {
return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName());
}
/** * Delay queue (dead letter queue) */
@Bean
public Queue orderTtlQueue(a) {
return QueueBuilder
.durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName())
.withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange())// Forward the switch after expiration
.withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey())// Route key to forward after expiration
.build();
}
/** * bind queues to switches */
@Bean
Binding orderBinding(DirectExchange orderDirect,Queue orderQueue){
return BindingBuilder
.bind(orderQueue)
.to(orderDirect)
.with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey());
}
/** * bind delay queue to switch */
@Bean
Binding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){
return BindingBuilder
.bind(orderTtlQueue)
.to(orderTtlDirect)
.with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey());
}
@Bean("customContainerFactory")
public SimpleRabbitListenerContainerFactory customContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setBatchListener(true);// Enable batch listening
factory.setConsumerBatchEnabled(true);// Start batch processing
factory.setConcurrentConsumers(16); // Set the number of threads
factory.setMaxConcurrentConsumers(16); // Maximum number of threads
factory.setBatchSize(500);// The amount of data retrieved from the queue each time,
configurer.configure(factory, connectionFactory);
returnfactory; }}Copy the code
Step 4: Provider call:
amqpTemplate.convertAndSend(param1, param2, param3) — parameter 1: the exchange name; parameter 2: the routing key that routes the message to the queue; parameter 3: the business data to send to MQ.
Step 5: Consumer call:
@Component
public class MornitorConsumer {
@RabbitListener(queues = "morinitor.center.cancel.ttl", containerFactory = "customContainerFactory") // Specify the name of the queue to listen on. Note that the containerFactory name is consistent with the configuration name
public void showMSG1(List<Map<Date, String>> list) {
// Message processing, logical operation.}}Copy the code