Spring Boot: implementing batch message processing with RabbitMQ

This setup was tested against RabbitMQ in a development environment, and the MQ connection test passed. Recommendation: once batch consumption is enabled, if the business logic inserts the consumed data into a database, have the database writes use batch operations as well; doing both together can greatly improve memory usage.

Step 1: Introduce dependencies

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Copy the code

Step 2: Edit the YML file

# RabbitMQ broker connection settings used by the Spring AMQP starter.
# NOTE(review): Spring Boot binds these keys as spring.rabbitmq.* — confirm that
# this fragment is nested under a top-level `spring:` key in the real file.
rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /
    username: guest
    password: guest
    # NOTE(review): newer Spring Boot releases replace this flag with
    # `publisher-confirm-type` — verify against the Boot version in use.
    publisher-confirms: true
Copy the code

Step 3: Create the RabbitMQ configuration class (based on the routing mode, i.e. direct exchanges)

/** * rabbitMQ configuration class */
@Configuration
public class RabbitConfig {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Bean
    DirectExchange orderDirect(a) {
        return (DirectExchange) ExchangeBuilder
                .directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange())
                .durable(true)
                .build();
    }

    /** * The switch to which the queue is bound */
    @Bean
    DirectExchange orderTtlDirect(a) {
        return (DirectExchange) ExchangeBuilder
                .directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange())
                .durable(true)
                .build();
    }

    /** * Actual consumption queue */
    @Bean
    public Queue orderQueue(a) {
        return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName());
    }

    /** * Delay queue (dead letter queue) */
    @Bean
    public Queue orderTtlQueue(a) {
        return QueueBuilder
                .durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName())
                .withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange())// Forward the switch after expiration
                .withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey())// Route key to forward after expiration
                .build();
    }

    /** * bind queues to switches */
    @Bean
    Binding orderBinding(DirectExchange orderDirect,Queue orderQueue){
        return BindingBuilder
                .bind(orderQueue)
                .to(orderDirect)
                .with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey());
    }

    /** * bind delay queue to switch */
    @Bean
    Binding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){
        return BindingBuilder
                .bind(orderTtlQueue)
                .to(orderTtlDirect)
                .with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey());
    }
    @Bean("customContainerFactory")
    public SimpleRabbitListenerContainerFactory customContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setBatchListener(true);// Enable batch listening
        factory.setConsumerBatchEnabled(true);// Start batch processing
        factory.setConcurrentConsumers(16);  // Set the number of threads
        factory.setMaxConcurrentConsumers(16); // Maximum number of threads
        factory.setBatchSize(500);// The amount of data retrieved from the queue each time,
        configurer.configure(factory, connectionFactory);
        returnfactory; }}Copy the code

Step 4: Provider call:

amqpTemplate.convertAndSend(parameter 1, parameter 2, parameter 3) — parameter 1: the exchange name; parameter 2: the routing key that routes the message to the queue; parameter 3: the business data to send to MQ.

Step 5: Consumer call:

@Component 
public class MornitorConsumer {
    @RabbitListener(queues = "morinitor.center.cancel.ttl", containerFactory = "customContainerFactory")   // Specify the name of the queue to listen on. Note that the containerFactory name is consistent with the configuration name
    public void showMSG1(List<Map<Date, String>> list) {
    	// Message processing, logical operation.}}Copy the code