preface
1. Spring Boot integration configuration details
- Publisher confirms: implement a listener that receives the confirmation the broker sends back for each published message: RabbitTemplate.ConfirmCallback
- Publisher returns: verifies that a message that reached the broker can actually be routed. If the routing key matches no queue, a listener receives the unroutable message for follow-up handling: RabbitTemplate.ReturnCallback
Other properties can also be configured on the producer side, such as send retries, timeouts, the number of attempts, and the retry interval.
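To make the retry settings mentioned above concrete, here is a minimal plain-Java sketch of what "number of attempts" and "interval" mean. It is not Spring's implementation: RetrySketch, sendWithRetry, and all parameter names are hypothetical. In Spring Boot the corresponding settings are, as far as I know, exposed under spring.rabbitmq.template.retry.* (enabled, max-attempts, initial-interval, multiplier).

```java
import java.util.function.Supplier;

// Hypothetical helper, not part of Spring AMQP: retry with a growing interval.
final class RetrySketch {

    /**
     * Calls the action up to maxAttempts times. After each failed attempt it
     * sleeps, starting at initialIntervalMs and multiplying the interval each time.
     */
    static <T> T sendWithRetry(Supplier<T> action, int maxAttempts,
                               long initialIntervalMs, double multiplier) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long interval = initialIntervalMs;
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxAttempts) {
                    pause(interval);
                    interval = (long) (interval * multiplier);
                }
            }
        }
        // All attempts failed: surface the last failure to the caller.
        throw last;
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting to retry", e);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated send: fails twice, then succeeds on the third attempt.
        String result = sendWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("broker unavailable");
            }
            return "sent";
        }, 5, 10, 2.0);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Retrying only covers failures while sending; it does not replace publisher confirms, which report whether the broker actually accepted the message.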
2. Code demonstration
2.1 The producer side
2.1.1 Creating a springboot-producer project
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.cp</groupId>
<artifactId>springboot-producer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>springboot-producer</name>
<description>springboot-producer</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
RabbitSender.java (message producer)
import java.util.Map;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
// Location in spring-amqp 2.0.x; from 2.1 on the class lives in org.springframework.amqp.rabbit.connection
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class RabbitSender {

    // Automatically inject the RabbitTemplate template class
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Callback for publisher confirms
    final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.err.println("correlationData: " + correlationData);
            System.err.println("ack: " + ack);
            if (!ack) {
                // Log, handle the exception, run compensation, etc.
                System.err.println("Exception Handling....");
            } else {
                // Update the database as part of the reliable delivery mechanism
            }
        }
    };

    // Callback for publisher returns (unroutable messages)
    final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
                String exchange, String routingKey) {
            System.err.println("return exchange: " + exchange + ", routingKey: "
                    + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
        }
    };

    // Send method: builds a Message from the payload and headers
    public void send(Object message, Map<String, Object> properties) throws Exception {
        MessageHeaders mhs = new MessageHeaders(properties);
        Message<Object> msg = MessageBuilder.createMessage(message, mhs);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        // The ID is hard-coded here for testing only. For a compensation strategy it must be
        // globally unique (for example business ID + timestamp) so each ACK maps to one message.
        CorrelationData correlationData = new CorrelationData("1234567890");
        rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
    }
}
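The sender above hard-codes the correlation ID for testing and notes that it must be globally unique in real use. One possible way to build such an ID is sketched below. CorrelationIds and next are hypothetical names, and appending a random UUID to the "id + timestamp" idea is my own assumption to guard against two sends within the same millisecond.

```java
import java.util.UUID;

// Hypothetical helper: builds a correlation ID from a business ID, a timestamp and a random UUID.
final class CorrelationIds {

    static String next(String businessId) {
        return businessId + "-" + System.currentTimeMillis() + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        String first = next("order-001");
        String second = next("order-001");
        System.out.println(first);
        System.out.println(second);
        System.out.println("distinct: " + !first.equals(second));
    }
}
```

The result could then be passed to the existing constructor, for example new CorrelationData(CorrelationIds.next("order-001")).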
application.properties
spring.rabbitmq.addresses=localhost:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/vhost_cp
spring.rabbitmq.connection-timeout=15000
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
2.1.2 Set up the management console
- Add the exchange
- Add the queue
- Bind the queue to the exchange
- Change the routing key, replacing springboot with spring, so that the message cannot be routed and the returnCallback method is invoked
At this point we noticed an error in the output:
correlationData: CorrelationData [id=1234567890]
ack: false
Exception Handling....
2.1.3 Fixing the ack=false problem
The cause is that the message is sent from a test method. When the test method returns, the RabbitMQ resources are closed, so although the message has been sent, the asynchronous ConfirmCallback reports ack=false because the resources were closed before the confirm arrived. Adding Thread.sleep() at the end of the test keeps them open long enough and solves the problem.
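@Test
public void testSender1() throws Exception {
    // rabbitSender and simpleDateFormat are fields of the test class (not shown here)
    Map<String, Object> properties = new HashMap<>();
    properties.put("number", "12345");
    properties.put("send_time", simpleDateFormat.format(new Date()));
    rabbitSender.send("Hello RabbitMQ For Spring Boot!", properties);
    // Keep the test alive until the asynchronous ConfirmCallback has fired
    Thread.sleep(2000);
}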
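A fixed Thread.sleep() works for a demo, but the wait can also end as soon as the callback fires. The sketch below shows that idea with a CountDownLatch. It is a simulation only: a background thread stands in for the broker's asynchronous confirm, and ConfirmLatchSketch and sendAndAwaitConfirm are hypothetical names. In a real test, the countDown() call would go inside the ConfirmCallback.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Simulation only: a background thread stands in for the broker's asynchronous confirm.
final class ConfirmLatchSketch {

    /** Returns true if the simulated confirm arrived with ack=true within the timeout. */
    static boolean sendAndAwaitConfirm(long brokerDelayMs, long timeoutMs) {
        CountDownLatch confirmed = new CountDownLatch(1);
        AtomicBoolean ack = new AtomicBoolean(false);
        ExecutorService broker = Executors.newSingleThreadExecutor();
        try {
            // Stand-in for the ConfirmCallback being invoked some time after the send.
            broker.submit(() -> {
                try {
                    Thread.sleep(brokerDelayMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                ack.set(true);
                confirmed.countDown();
            });
            // Block until the callback fires or the timeout elapses, whichever comes first.
            return confirmed.await(timeoutMs, TimeUnit.MILLISECONDS) && ack.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            broker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("confirm arrives early: " + sendAndAwaitConfirm(50, 2000));
        System.out.println("confirm arrives late:  " + sendAndAwaitConfirm(2000, 50));
    }
}
```

Compared with a fixed sleep, the latch returns immediately once the confirm arrives and still gives up after a bounded time if it never does.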
2.2 The consumer side
Core configuration of the consumer side:
# Acknowledgement mode: manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# Listener concurrency: 5 by default, at most 10
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
- First, configure manual acknowledgement mode so that the consumer ACKs each message explicitly. This keeps delivery reliable: when consumption fails, the consumer can log the failure according to business needs, or requeue the message (not recommended).
- You can set the initial and maximum number of listeners to control concurrency on the consumer side.
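The manual acknowledgement flow described above (ACK on success, log and reject without requeueing on failure) can be sketched without a broker. AckChannel below is a hypothetical stand-in that only mirrors the parameter lists of the RabbitMQ Channel methods basicAck and basicNack; the real methods additionally declare IOException. ManualAckSketch, handle, and RecordingChannel are likewise illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch of manual acknowledgement: ack on success, reject without requeue and log on failure.
final class ManualAckSketch {

    /** Hypothetical stand-in for the two acknowledgement calls of the RabbitMQ Channel. */
    interface AckChannel {
        void basicAck(long deliveryTag, boolean multiple);
        void basicNack(long deliveryTag, boolean multiple, boolean requeue);
    }

    static void handle(String payload, long deliveryTag, AckChannel channel,
                       Consumer<String> businessLogic) {
        try {
            businessLogic.accept(payload);
            channel.basicAck(deliveryTag, false);
        } catch (RuntimeException e) {
            // Log and reject without requeueing, so a failing message is not redelivered forever.
            System.err.println("Consume failed for tag " + deliveryTag + ": " + e.getMessage());
            channel.basicNack(deliveryTag, false, false);
        }
    }

    /** Records calls so the outcome can be inspected without a broker. */
    static final class RecordingChannel implements AckChannel {
        final List<String> calls = new ArrayList<>();

        @Override
        public void basicAck(long deliveryTag, boolean multiple) {
            calls.add("ack:" + deliveryTag);
        }

        @Override
        public void basicNack(long deliveryTag, boolean multiple, boolean requeue) {
            calls.add("nack:" + deliveryTag + ":requeue=" + requeue);
        }
    }

    public static void main(String[] args) {
        RecordingChannel channel = new RecordingChannel();
        handle("ok", 1L, channel, p -> { });
        handle("bad", 2L, channel, p -> { throw new IllegalStateException("boom"); });
        System.out.println(channel.calls); // [ack:1, nack:2:requeue=false]
    }
}
```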
Using the @RabbitListener annotation
- The consumer listens through the @RabbitListener annotation, which is very convenient in practice
- @RabbitListener is a composite annotation whose attributes are themselves configured with annotations
- Combining @QueueBinding, @Queue, and @Exchange inside it declares the consumer's exchange, queue, binding, and routing key in one place and configures the listener.
For example, once @RabbitListener is added to the onMessage method, that method becomes the consumer's listener. The example also adds @RabbitHandler, which is strictly only required when @RabbitListener is placed on the class.
In @QueueBinding, set value to the queue, set exchange to the exchange together with its durability and type, set ignoreDeclarationExceptions to true, and set key to the routing key. This short declaration replaces what used to be fairly complex code. It is advisable to keep these values in the configuration file and resolve them dynamically. If the queue, exchange, or binding does not yet exist in RabbitMQ, the annotation declarations create them; you can verify this yourself.
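The listener in this article binds with the key springboot.* on a topic exchange, which is why springboot.abc is delivered while a key starting with spring is returned as unroutable. The sketch below illustrates the topic matching rules (words separated by dots, * matches exactly one word, # matches zero or more words). It is my own simplified illustration, not RabbitMQ's implementation, and TopicMatchSketch is a hypothetical name.

```java
// Sketch of AMQP topic matching: '*' matches exactly one word, '#' matches zero or more words.
final class TopicMatchSketch {

    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            // Pattern exhausted: it matches only if every word was consumed too.
            return w == words.length;
        }
        if (pattern[p].equals("#")) {
            // '#' may consume zero words, or one word and stay active for more.
            return match(pattern, p + 1, words, w)
                    || (w < words.length && match(pattern, p, words, w + 1));
        }
        if (w == words.length) {
            return false;
        }
        boolean wordMatches = pattern[p].equals("*") || pattern[p].equals(words[w]);
        return wordMatches && match(pattern, p + 1, words, w + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("springboot.*", "springboot.abc")); // true
        System.out.println(matches("springboot.*", "spring.abc"));     // false
        System.out.println(matches("springboot.*", "springboot.a.b")); // false
        System.out.println(matches("springboot.#", "springboot.a.b")); // true
    }
}
```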
2.2.1 Creating a springboot-consumer project
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.cp</groupId>
<artifactId>springboot-consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>springboot-consumer</name>
<description>springboot-consumer</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
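RabbitReceiver.java (message consumer)
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

@Component
public class RabbitReceiver {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue-1", durable = "true"),
            exchange = @Exchange(value = "exchange-1", durable = "true", type = "topic",
                    ignoreDeclarationExceptions = "true"),
            key = "springboot.*"))
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("Payload:" + message.getPayload());
        // Read the delivery tag from the headers, then acknowledge this single message manually
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(deliveryTag, false);
    }
}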
application.properties
spring.rabbitmq.addresses=localhost:5672
spring.rabbitmq.username=user_cp
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/vhost_cp
spring.rabbitmq.connection-timeout=15000
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
Run the Application class and check whether the messages sent by the producer are consumed.
Printed result: many messages had accumulated from my earlier test runs, so many were consumed at once.
3. Optimizing the code
- Send custom Java objects as messages
- Move the configuration in the @RabbitListener annotation into dynamic configuration
@Payload: binds the message body to a parameter. @Headers: binds the message headers to a parameter.
3.1 Optimization of the consumer side
1. Define an Order object
package com.cp.springboot.entity;

import java.io.Serializable;

public class Order implements Serializable {

    private String id;
    private String name;

    public Order() {
    }

    public Order(String id, String name) {
        super();
        this.id = id;
        this.name = name;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
Note: an object sent as a message must implement Serializable, because the default message converter uses Java serialization for such payloads. Otherwise the send fails.
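What the Serializable requirement means in practice can be seen in a round trip through Java serialization: the object is turned into bytes (the message body) and rebuilt from them on the other side. This is a plain-Java sketch of that step, not the converter's actual code. DemoOrder is a hypothetical stand-in for the Order class, and SerializationSketch, toBytes, and fromBytes are illustrative names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Sketch: an object is written to bytes on the producer side and read back on the consumer side.
final class SerializationSketch {

    /** Hypothetical stand-in for the Order class. */
    static final class DemoOrder implements Serializable {
        private static final long serialVersionUID = 1L;
        final String id;
        final String name;

        DemoOrder(String id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    static byte[] toBytes(Serializable value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    static Object fromBytes(byte[] body) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] body = toBytes(new DemoOrder("001", "First order"));
        DemoOrder copy = (DemoOrder) fromBytes(body);
        System.out.println(copy.id + " / " + copy.name); // 001 / First order
    }
}
```

Because the class name is part of the serialized form, the reading side needs the same class under the same package name, which is why the producer and consumer must share an identical Order class.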
2. Add a listener to RabbitReceiver
/**
 * The binding values are read from application.properties:
 *
 * spring.rabbitmq.listener.order.queue.name=queue-2
 * spring.rabbitmq.listener.order.queue.durable=true
 * spring.rabbitmq.listener.order.exchange.name=exchange-2
 * spring.rabbitmq.listener.order.exchange.durable=true
 * spring.rabbitmq.listener.order.exchange.type=topic
 * spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
 * spring.rabbitmq.listener.order.key=springboot.*
 *
 * @param order
 * @param channel
 * @param headers
 * @throws Exception
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",
                durable = "${spring.rabbitmq.listener.order.queue.durable}"),
        exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",
                durable = "${spring.rabbitmq.listener.order.exchange.durable}",
                type = "${spring.rabbitmq.listener.order.exchange.type}",
                ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
        key = "${spring.rabbitmq.listener.order.key}"))
@RabbitHandler
public void onOrderMessage(@Payload com.cp.springboot.entity.Order order,
        Channel channel,
        @Headers Map<String, Object> headers) throws Exception {
    System.err.println("--------------------------------------");
    System.err.println("Order:" + order.getId());
    Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
    // Manual ACK
    channel.basicAck(deliveryTag, false);
}
The configuration now lives in application.properties and is resolved dynamically through the ${...} placeholders. It can also be kept in a configuration center, as our company does, for example Apollo, Ctrip's open-source configuration center.
3. application.properties
spring.rabbitmq.listener.order.queue.name=queue-2
spring.rabbitmq.listener.order.queue.durable=true
spring.rabbitmq.listener.order.exchange.name=exchange-2
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
spring.rabbitmq.listener.order.key=springboot.*
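How a ${...} placeholder in the annotation turns into a value from application.properties can be pictured with a very small resolver. This is a simplified sketch under my own assumptions, not Spring's resolver, which supports more (default values, nesting, multiple property sources). PlaceholderSketch and resolve are hypothetical names.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified sketch of ${...} placeholder resolution against a set of properties.
final class PlaceholderSketch {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    static String resolve(String text, Properties props) {
        Matcher m = PLACEHOLDER.matcher(text);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = props.getProperty(m.group(1));
            if (value == null) {
                throw new IllegalArgumentException("Could not resolve placeholder: " + m.group(1));
            }
            // quoteReplacement keeps '$' and '\' in the value from being read as regex references.
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("spring.rabbitmq.listener.order.queue.name", "queue-2");
        props.setProperty("spring.rabbitmq.listener.order.key", "springboot.*");
        System.out.println(resolve("${spring.rabbitmq.listener.order.queue.name}", props)); // queue-2
        System.out.println(resolve("${spring.rabbitmq.listener.order.key}", props));        // springboot.*
    }
}
```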
3.2 Optimization of the producer side
1. Use the same Order class. It must be identical to the one on the consumer side, including its package name.
2. Add a send method to RabbitSender
// Send method: sends a custom object as the message
public void sendOrder(Order order) throws Exception {
    rabbitTemplate.setConfirmCallback(confirmCallback);
    rabbitTemplate.setReturnCallback(returnCallback);
    // Hard-coded for testing; in real use the ID (for example id + timestamp) must be globally unique
    CorrelationData correlationData = new CorrelationData("0987654321");
    rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order, correlationData);
}
3. Add a test method
@Test
public void testSender2() throws Exception {
    Order order = new Order("001", "First order");
    rabbitSender.sendOrder(order);
    // Wait so the resources are not closed before the asynchronous ConfirmCallback fires
    Thread.sleep(2000);
}
3.3 Test
Run the testSender2() method.
The producer prints the confirm output.
The consumer prints the received message.
The RabbitMQ integration with Spring Boot is now complete; usage in real projects follows the same pattern.
Reference article:
RabbitMQ Message Middleware Introduction