Spring Boot integrates RabbitMQ


Preface

Recently I have been working on a Spring Boot project that uses SSM, MySQL, RabbitMQ, and Redis. Everything except RabbitMQ integrated quickly; RabbitMQ took a lot of digging before it finally worked as expected. I am recording the process here for reference.

Integration process

The code below covers only the key configuration needed for the integration and how to use it. For basic Spring Boot configuration, see Spring Boot Quick Start.

Configuration files

  • pom.xml

    <!-- rabbit-mq -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

  • application.yml

    spring:
      rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest
        virtual-host: /
        listener:
          simple:
            acknowledge-mode: manual # manual acknowledgement
            concurrency: 5 # minimum number of concurrent consumers
            max-concurrency: 10 # maximum number of concurrent consumers
            # prefetch: ... (the value is missing from the source; it sets how many unacknowledged messages each consumer may hold)
        cache:
          channel:
            size: 50 # number of channels kept in the cache
    ## custom configuration
    mq:
      defaultExchange: amqpExchange # default exchange
      queue: Queue # queue name
      routeKey: queue_key # routing key

  • MQProperties

    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.stereotype.Component;

    // Binds the custom "mq.*" entries from application.yml.
    @Component
    @ConfigurationProperties(prefix = "mq")
    public class MQProperties {

        private String defaultExchange;
        private String routeKey;
        private String queue;

        public String getDefaultExchange() {
            return defaultExchange;
        }

        public void setDefaultExchange(String defaultExchange) {
            this.defaultExchange = defaultExchange;
        }

        public String getRouteKey() {
            return routeKey;
        }

        public void setRouteKey(String routeKey) {
            this.routeKey = routeKey;
        }

        public String getQueue() {
            return queue;
        }

        public void setQueue(String queue) {
            this.queue = queue;
        }
    }

RabbitMQ configuration

    import com.switchvov.rabbitmq.constant.MQProperties;
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.annotation.EnableRabbit;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    @EnableRabbit
    public class RabbitMQConfig {

        @Autowired
        private MQProperties mqProperties;

        // Durable, non-exclusive queue that is not deleted automatically.
        @Bean
        public Queue queue() {
            boolean durable = true;
            boolean exclusive = false;
            boolean autoDelete = false;
            return new Queue(mqProperties.getQueue(), durable, exclusive, autoDelete);
        }

        // Durable direct exchange that is not deleted automatically.
        @Bean
        public DirectExchange defaultExchange() {
            boolean durable = true;
            boolean autoDelete = false;
            return new DirectExchange(mqProperties.getDefaultExchange(), durable, autoDelete);
        }

        // Binds the queue to the exchange under the configured routing key.
        @Bean
        public Binding binding() {
            return BindingBuilder.bind(queue())
                    .to(defaultExchange())
                    .with(mqProperties.getRouteKey());
        }
    }
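
To make the binding above more concrete, here is a minimal in-memory sketch of how a direct exchange routes messages: a message reaches a queue only when its routing key equals the binding key exactly. The class `DirectExchangeSketch` and its methods are hypothetical names invented for illustration. This is plain Java, not the RabbitMQ or Spring AMQP API, and it ignores durability, acknowledgements, and everything else a real broker does.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical in-memory model of direct-exchange routing; not a real broker. */
final class DirectExchangeSketch {
    // binding key -> names of the queues bound under that key
    private final Map<String, List<String>> bindings = new HashMap<>();
    // queue name -> messages waiting in that queue
    private final Map<String, Queue<String>> queues = new HashMap<>();

    /** Declares the queue if needed and binds it under the given key. */
    void bind(String queueName, String bindingKey) {
        queues.computeIfAbsent(queueName, name -> new ArrayDeque<>());
        bindings.computeIfAbsent(bindingKey, key -> new ArrayList<>()).add(queueName);
    }

    /** Copies the message to every queue whose binding key equals the routing key; returns how many queues were reached. */
    int publish(String routingKey, String message) {
        List<String> targets = bindings.getOrDefault(routingKey, Collections.emptyList());
        for (String queueName : targets) {
            queues.get(queueName).add(message);
        }
        return targets.size();
    }

    /** Returns the next message in the queue, or null if the queue is empty or unknown. */
    String poll(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("Queue", "queue_key");                              // same names as in application.yml
        System.out.println(exchange.publish("queue_key", "sent a message")); // 1
        System.out.println(exchange.publish("other_key", "unroutable"));     // 0
        System.out.println(exchange.poll("Queue"));                           // sent a message
    }
}
```

In the sketch, a message published with a key that no binding matches reaches no queue. A real RabbitMQ broker likewise discards such a message by default.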

RabbitMQ producer

    import com.switchvov.rabbitmq.constant.MQProperties;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitMQTest {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        @Autowired
        private MQProperties mqProperties;

        // Publishes a message to the configured exchange with the configured routing key.
        @Test
        public void testSendMessage() {
            rabbitTemplate.convertAndSend(mqProperties.getDefaultExchange(),
                    mqProperties.getRouteKey(), "sent a message");
        }
    }

RabbitMQ consumer

    import com.switchvov.rabbitmq.common.RabbitMQUtils;
    import com.rabbitmq.client.Channel;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.stereotype.Service;

    @Service
    public class RabbitMQService {

        private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQService.class);

        // Listens on the queue named by mq.queue; the delivery tag is needed for the manual acknowledgement.
        @RabbitListener(queues = "${mq.queue}")
        public void receive(String payload, Channel channel,
                            @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
            LOGGER.info("Payload: {}", payload);
            RabbitMQUtils.askMessage(channel, tag, LOGGER);
        }
    }
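
The consumer above always acknowledges. One common variation, sketched here under assumptions, is to acknowledge only when processing succeeds and to reject otherwise. To keep the sketch runnable without a broker, `AckChannel` is a hypothetical interface that copies just the `basicAck` and `basicNack` signatures of `com.rabbitmq.client.Channel`. `ManualAckSketch`, `handle`, and `RecordingChannel` are likewise invented names, not part of the original project.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class ManualAckSketch {

    /** Hypothetical stand-in for the two Channel methods used by a manually acknowledging consumer. */
    interface AckChannel {
        void basicAck(long deliveryTag, boolean multiple) throws IOException;
        void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
    }

    /**
     * Runs the handler, then acknowledges on success or rejects without requeue on failure.
     * Returns true if the message was acknowledged.
     */
    static boolean handle(String payload, long tag, AckChannel channel,
                          Consumer<String> handler) throws IOException {
        try {
            handler.accept(payload);
        } catch (RuntimeException e) {
            // requeue=false avoids an endless redelivery loop for a message that always fails
            channel.basicNack(tag, false, false);
            return false;
        }
        channel.basicAck(tag, false);
        return true;
    }

    /** Fake channel that only records the calls it receives. */
    static final class RecordingChannel implements AckChannel {
        final List<String> calls = new ArrayList<>();

        @Override
        public void basicAck(long deliveryTag, boolean multiple) {
            calls.add("ack " + deliveryTag);
        }

        @Override
        public void basicNack(long deliveryTag, boolean multiple, boolean requeue) {
            calls.add("nack " + deliveryTag + " requeue=" + requeue);
        }
    }

    public static void main(String[] args) throws IOException {
        RecordingChannel channel = new RecordingChannel();
        handle("ok", 1L, channel, payload -> { });
        handle("bad", 2L, channel, payload -> { throw new IllegalStateException("boom"); });
        System.out.println(channel.calls); // [ack 1, nack 2 requeue=false]
    }
}
```

Whether a failed message should be requeued, discarded, or routed to a dead-letter exchange depends on the application; rejecting without requeue is only one possible choice.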

A simple utility class for manual acknowledgement

    import com.rabbitmq.client.Channel;
    import org.slf4j.Logger;

    import java.io.IOException;

    public final class RabbitMQUtils {

        // Acknowledges a single message.
        public static void askMessage(Channel channel, long tag, final Logger logger) {
            askMessage(channel, tag, logger, false);
        }

        // Acknowledges the message; if multiple is true, also acknowledges every
        // earlier unacknowledged message on the same channel.
        public static void askMessage(Channel channel, long tag, final Logger logger, boolean multiple) {
            try {
                channel.basicAck(tag, multiple);
            } catch (IOException e) {
                logger.error("RabbitMQ, IO exception: {}", e.getMessage());
            }
        }

        // Rejects a single message without returning it to the queue.
        public static void rejectMessage(Channel channel, long tag, final Logger logger) {
            rejectMessage(channel, tag, logger, false, false);
        }

        // Rejects a single message and returns it to the queue.
        public static void rejectAndBackMQ(Channel channel, long tag, final Logger logger) {
            rejectMessage(channel, tag, logger, false, true);
        }

        // Rejects the message; requeue decides whether it goes back to the queue.
        public static void rejectMessage(Channel channel, long tag, final Logger logger,
                                         boolean multiple, boolean requeue) {
            try {
                channel.basicNack(tag, multiple, requeue);
            } catch (IOException e) {
                logger.error("RabbitMQ, IO exception: {}", e.getMessage());
            }
        }
    }
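
The `multiple` and requeue flags passed to `basicAck` and `basicNack` are easier to follow with a small model of the bookkeeping involved. The sketch below is a plain-Java illustration, not broker code: `UnackedDeliveriesSketch` and its methods are hypothetical names. It assumes delivery tags increase by one per delivery on a channel, and that `multiple=true` covers every outstanding tag up to and including the given one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical model of one channel's unacknowledged deliveries. */
final class UnackedDeliveriesSketch {
    // delivery tag -> payload, for messages delivered but not yet acknowledged
    private final TreeMap<Long, String> unacked = new TreeMap<>();
    // payloads that were rejected with requeue=true
    private final List<String> requeued = new ArrayList<>();
    private long nextTag = 1;

    /** Records a delivery and returns its delivery tag. */
    long deliver(String payload) {
        long tag = nextTag++;
        unacked.put(tag, payload);
        return tag;
    }

    /** Acknowledges one tag, or every outstanding tag up to and including it when multiple is true. */
    void ack(long tag, boolean multiple) {
        if (multiple) {
            unacked.headMap(tag, true).clear();
        } else {
            unacked.remove(tag);
        }
    }

    /** Rejects one tag (or all tags up to it); rejected payloads are requeued only when requeue is true. */
    void nack(long tag, boolean multiple, boolean requeue) {
        List<Long> tags = new ArrayList<>();
        if (multiple) {
            tags.addAll(unacked.headMap(tag, true).keySet());
        } else {
            tags.add(tag);
        }
        for (Long rejectedTag : tags) {
            String payload = unacked.remove(rejectedTag);
            if (payload != null && requeue) {
                requeued.add(payload);
            }
        }
    }

    int pending() {
        return unacked.size();
    }

    List<String> requeued() {
        return requeued;
    }

    public static void main(String[] args) {
        UnackedDeliveriesSketch channel = new UnackedDeliveriesSketch();
        channel.deliver("a");                   // tag 1
        long second = channel.deliver("b");     // tag 2
        long third = channel.deliver("c");      // tag 3
        channel.ack(second, true);              // acknowledges tags 1 and 2
        System.out.println(channel.pending());  // 1
        channel.nack(third, false, true);       // rejects tag 3 and requeues it
        System.out.println(channel.requeued()); // [c]
    }
}
```

In terms of the utility class above, `askMessage(channel, tag, logger)` corresponds to `ack(tag, false)`, and `rejectAndBackMQ` corresponds to `nack(tag, false, true)`.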

Reference documentation

  1. RabbitMQ message queue (1): Detailed Introduction
    • A brief introduction to RabbitMQ architecture and terminology
  2. Spring Boot uses RabbitMQ
    • Simple Spring Boot and RabbitMQ integration
  3. Spring Boot integrates RabbitMQ
    • Explains how to configure a custom queue, exchange, and binding
  4. Spring RabbitMQ – using manual channel acknowledgement on a service with @RabbitListener configuration
    • The consumer section here largely follows the answers to this question
  5. Spring Boot/Cloud (19): when messages are consumed concurrently, how do you ensure that the data written to the database is up to date?
    • Concurrent consumption configuration
