0. Preface
- This advanced SpringBoot RabbitMQ series covers integrating RabbitMQ into SpringBoot for high availability and reliable delivery
- Core topics: the four exchange types, dead-letter queues, reliable delivery, and handling consumption failures
- This three-part series focuses on integration and enterprise-level content. You will need to know the basics of SpringBoot and RabbitMQ
- The source code is on a network drive (there is no Git repository); download it yourself if you need it. Scripts and other material are in the common module
- My knowledge is limited; corrections are welcome
Link: pan.baidu.com/s/1lpZC6fr8… Extraction code: QTVI
1. Environment setup
- Create three sub-modules in a Maven multi-module project
- common: shared entity classes
- rabbitmq-publisher: message publisher, based on SpringBoot
- rabbitmq-subscriber: message subscriber, based on SpringBoot
- Add the RabbitMQ Maven dependency to both the publisher and the subscriber projects
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- Add the RabbitMQ connection settings to both projects
spring:
  rabbitmq:
    host: xxx.xxx.xxx.xxx
    port: 5672
    username: username
    password: password
    # Virtual host; it must be created on the RabbitMQ side first
    # virtual-host: springboot
- Once the three steps above are complete, the basic RabbitMQ environment is ready
- RabbitMQ configuration properties class: org.springframework.boot.autoconfigure.amqp.RabbitProperties
2. The four exchange types
2.1 Direct – direct exchange
2.1.1 Message Sender
- Create a configuration class in the message publisher that declares the exchange
- Only the exchange is declared here; declaring the queue and binding it to the exchange is the subscriber's job
- Each exchange type has its own exchange class
- Declaring the exchange bean does not create the exchange right away; it is created later, when the binding is declared or a message is sent (that is, once a connection to the broker is opened)
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AmqpPublisherConfig {

    @Bean
    public DirectExchange emailDirectExchange() {
        // Declaration style 1
        // return new DirectExchange("exchange.direct.springboot.email");
        // Declaration style 2
        return ExchangeBuilder.directExchange("exchange.direct.springboot.email").build();
    }
}
- To send messages, use RabbitTemplate, the RabbitMQ message sender that SpringBoot provides
- org.springframework.amqp.rabbit.core.RabbitTemplate
- Sending a message (the exchange name and routing key below are placeholders)
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;

@RestController
public class PublishController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/direct")
    public Object direct(String message) {
        try {
            // Arguments: exchange name, routing key, message payload
            rabbitTemplate.convertAndSend("exchange name", "routing key", message);
            return message;
        } catch (AmqpException e) {
            System.out.println(e.getMessage());
            return "Network is down, please try again later ~";
        }
    }
}
2.1.2 Message Receiver
- The receiver needs to declare the following
- Exchange: instantiate (new) the class for the required exchange type
- Queue: there is only one Queue type; queues are distinguished by name
- Exchange-to-queue binding: BindingBuilder.bind(queue).to(exchange).with(routingKey)
- Declaring the exchange, queue, and binding does not create them immediately; they are created when a message is sent or a queue is listened to
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AmqpSubscriberConfig {

    /** Direct exchange */
    @Bean
    public DirectExchange emailDirectExchange() {
        // Declaration style 1
        // return new DirectExchange("exchange.direct.springboot.email");
        // Declaration style 2
        return ExchangeBuilder.directExchange("exchange.direct.springboot.email").build();
    }

    /** Queue declaration */
    @Bean
    public Queue emailQueue() {
        // Declaration style 1
        // return new Queue("queue.direct.springboot.email");
        // Declaration style 2
        return QueueBuilder.durable("queue.direct.springboot.email").build();
    }

    /** Exchange-to-queue binding; Spring injects the @Bean method parameters, so @Resource is not needed */
    @Bean
    public Binding emailBinding(Queue emailQueue, DirectExchange emailDirectExchange) {
        // Bind the queue to the exchange with a routing key
        return BindingBuilder.bind(emailQueue).to(emailDirectExchange).with("springboot.email.routing.key");
    }
}
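To make the effect of the binding concrete, here is a minimal plain-Java sketch of how a direct exchange picks queues: a message reaches a queue only when its routing key equals the binding's routing key exactly. The class DirectExchangeSketch and its methods are hypothetical names made up for illustration, and the second routing key in main is invented; this is a toy in-memory model, not RabbitMQ or Spring AMQP code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of a direct exchange (illustration only, not the broker's implementation). */
final class DirectExchangeSketch {

    // routing key -> names of the queues bound with exactly that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    /** Rough counterpart of BindingBuilder.bind(queue).to(exchange).with(routingKey). */
    void bind(String queueName, String routingKey) {
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queueName);
    }

    /** Returns the queues that would receive a message published with this routing key. */
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.<String>emptyList());
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("queue.direct.springboot.email", "springboot.email.routing.key");

        // Exact match: the bound queue is selected
        System.out.println(exchange.route("springboot.email.routing.key"));
        // Hypothetical key with no binding: no queue is selected
        System.out.println(exchange.route("springboot.sms.routing.key"));
    }
}
```

In this model an unmatched routing key simply selects no queue, which is why the publisher and subscriber must agree on the exact key.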
- Listening to the queue
- The queue being listened to must exist, otherwise an error is reported
- By default the message is acknowledged automatically once the listener method has finished consuming it
- If more than one listener listens to the same queue, messages are handled by the different methods in rotation
- You can declare the payload type as the method parameter, and the message is converted to that type automatically
- You can also declare a Message parameter to get the full message information
- org.springframework.amqp.core.Message
- Get the message properties: message.getMessageProperties()
- Get the message content: message.getBody()
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/** Message subscription listener */
@Component
public class SubscriberListener {

    /** Direct listeners: listeners on the same queue take turns handling its messages */
    @RabbitListener(queues = "queue.direct.springboot.email")
    public void receiver01(String msg) {
        System.out.println("receiver01 message = " + msg);
    }

    @RabbitListener(queues = "queue.direct.springboot.email")
    public void receiver02(String msg) {
        System.out.println("receiver02 message = " + msg);
    }
}
2.1.3 Message publishing and subscription
- Start the subscriber first and check that the queue has been declared
- Start the publisher and publish a message
- http://127.0.0.1:8071/direct?message=direct
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;

@RestController
public class PublishController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/direct")
    public Object direct(String message) {
        try {
            // Send to the given exchange with the given routing key
            rabbitTemplate.convertAndSend("exchange.direct.springboot.email", "springboot.email.routing.key", message);
            return message;
        } catch (AmqpException e) {
            System.out.println(e.getMessage());
            return "Network is down, please try again later ~";
        }
    }
}
- Subscribers receive messages in turn
receiver01 message = direct
receiver02 message = direct
receiver01 message = direct
receiver02 message = direct
receiver01 message = direct
receiver02 message = direct
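The alternating output can be pictured with a small plain-Java sketch of round-robin dispatch: the queue keeps a cursor over its consumers and hands each new message to the next one. RoundRobinSketch and its methods are hypothetical names for illustration; the real broker also takes prefetch and acknowledgement state into account, which this toy model leaves out.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one queue handing messages to its consumers in turn (illustration only). */
final class RoundRobinSketch {

    private final List<String> consumers = new ArrayList<>();
    private int next = 0; // index of the consumer that gets the next message

    void addConsumer(String name) {
        consumers.add(name);
    }

    /** Picks the consumer for this message, then advances the cursor. */
    String dispatch(String message) {
        String consumer = consumers.get(next);
        next = (next + 1) % consumers.size();
        return consumer + " message = " + message;
    }

    public static void main(String[] args) {
        RoundRobinSketch queue = new RoundRobinSketch();
        queue.addConsumer("receiver01");
        queue.addConsumer("receiver02");
        for (int i = 0; i < 4; i++) {
            System.out.println(queue.dispatch("direct"));
        }
    }
}
```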
2.2 Topic – topic exchange
2.2.1 Message sender
- Declare the topic exchange
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BlogPublisherConfig {

    @Bean
    public Exchange blogTopicExchange() {
        return ExchangeBuilder.topicExchange("exchange.topic.springboot.blog").build();
    }
}
- Declare the controller method
@RequestMapping("/topic")
public Object topic(String routingKey, String message) {
    // The routing key comes from the request, so different bindings can be tried
    rabbitTemplate.convertAndSend("exchange.topic.springboot.blog", routingKey, message);
    return routingKey + ":" + message;
}
2.2.2 Message Receiver
- Declare the exchange, three queues, and the queue bindings
- *: matches exactly one word
- #: matches zero or more words
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BlogSubscriberConfig {

    /** Topic exchange */
    @Bean
    public TopicExchange blogTopicExchange() {
        return ExchangeBuilder.topicExchange("exchange.topic.springboot.blog").build();
    }

    @Bean
    public Queue blogJavaQueue() {
        return QueueBuilder.durable("queue.topic.springboot.blog.java").build();
    }

    @Bean
    public Queue blogMqQueue() {
        return QueueBuilder.durable("queue.topic.springboot.blog.mq").build();
    }

    @Bean
    public Queue blogAllQueue() {
        return QueueBuilder.durable("queue.topic.springboot.blog.all").build();
    }

    @Bean
    public Binding blogJavaBinding(TopicExchange blogTopicExchange, Queue blogJavaQueue) {
        return BindingBuilder.bind(blogJavaQueue).to(blogTopicExchange).with("springboot.blog.java.routing.key");
    }

    @Bean
    public Binding blogMqBinding(TopicExchange blogTopicExchange, Queue blogMqQueue) {
        return BindingBuilder.bind(blogMqQueue).to(blogTopicExchange).with("springboot.blog.mq.routing.key");
    }

    @Bean
    public Binding blogAllBinding(TopicExchange blogTopicExchange, Queue blogAllQueue) {
        // #: matches zero or more words; *: matches exactly one word
        return BindingBuilder.bind(blogAllQueue).to(blogTopicExchange).with("springboot.blog.#.routing.key");
    }
}
- Listening to the queues
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class BlogService {

    /** Topic listeners */
    @RabbitListener(queues = "queue.topic.springboot.blog.java")
    public void blogJavaListener(String message) {
        System.out.println("blogJavaListener message = " + message);
    }

    @RabbitListener(queues = "queue.topic.springboot.blog.mq")
    public void blogMqListener(String message) {
        System.out.println("blogMqListener message = " + message);
    }

    @RabbitListener(queues = "queue.topic.springboot.blog.all")
    public void blogAllListener(String message) {
        System.out.println("blogAllListener message = " + message);
    }
}
2.2.3 Message publishing and subscription
- The publisher sends the message
- http://localhost:8071/topic?routingKey=springboot.blog.java.routing.key&message=hello
- http://localhost:8071/topic?routingKey=springboot.blog.mq.routing.key&message=hello
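- Subscribers receive the messages
- Both the exact bindings and the wildcard binding take effect
- Each message is delivered to every queue whose binding matches, so it reaches both its exact-match queue and the wildcard queue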
blogJavaListener message = hello
blogAllListener message = hello
blogAllListener message = hello
blogMqListener message = hello
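The wildcard rules can be checked without a broker. The following plain-Java sketch matches a routing key against a binding pattern word by word, assuming the usual topic semantics: * stands for exactly one word and # for zero or more words. TopicMatchSketch and matches are hypothetical names for illustration, and the recursive approach is only one simple way to write such a matcher, not how RabbitMQ implements it.

```java
/** Toy matcher for topic-exchange binding patterns (illustration only). */
final class TopicMatchSketch {

    /** True if the dot-separated routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int pi, String[] k, int ki) {
        if (pi == p.length) {
            return ki == k.length; // pattern used up: the key must be used up too
        }
        if (p[pi].equals("#")) {
            // '#' may absorb zero or more words: try every possible continuation point
            for (int skip = ki; skip <= k.length; skip++) {
                if (matches(p, pi + 1, k, skip)) {
                    return true;
                }
            }
            return false;
        }
        if (ki == k.length) {
            return false; // key used up but the pattern still expects a word
        }
        // '*' accepts any single word; a literal word must be equal
        return (p[pi].equals("*") || p[pi].equals(k[ki])) && matches(p, pi + 1, k, ki + 1);
    }

    public static void main(String[] args) {
        String all = "springboot.blog.#.routing.key";
        System.out.println(matches(all, "springboot.blog.java.routing.key"));
        System.out.println(matches(all, "springboot.blog.mq.routing.key"));
        // '#' also matches zero words, unlike '*'
        System.out.println(matches(all, "springboot.blog.routing.key"));
        System.out.println(matches("springboot.blog.*.routing.key", "springboot.blog.routing.key"));
    }
}
```

Under these rules the binding on queue.topic.springboot.blog.all accepts both routing keys used in the requests, which is consistent with blogAllListener printing for each message.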
2.3 Fanout – broadcast exchange
2.3.1 Message sender
- Declare the fanout exchange
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class NoticePublisherConfig {

    @Bean
    public Exchange radioFanoutExchange() {
        return ExchangeBuilder.fanoutExchange("exchange.fanout.springboot.radio").build();
    }
}
- Declare the controller method
@RequestMapping("/fanout")
public Object fanout(String message) {
    // A fanout exchange ignores the routing key, so null is passed
    rabbitTemplate.convertAndSend("exchange.fanout.springboot.radio", null, message);
    return message;
}
2.3.2 Message Receiver
- Declare the exchange, the queue, and the binding
- No routing key is needed
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class NoticeSubscriberConfig {

    @Bean
    public FanoutExchange radioFanoutExchange() {
        return ExchangeBuilder.fanoutExchange("exchange.fanout.springboot.radio").build();
    }

    @Bean
    public Queue radioQueue() {
        return QueueBuilder.durable("queue.fanout.springboot.radio").build();
    }

    @Bean
    public Binding radioBinding(FanoutExchange radioFanoutExchange, Queue radioQueue) {
        // A fanout binding has no routing key
        return BindingBuilder.bind(radioQueue).to(radioFanoutExchange);
    }
}
- Listening to the queue
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class NoticeService {

    @RabbitListener(queues = "queue.fanout.springboot.radio")
    public void radioListener(String message) {
        System.out.println("radioListener message = " + message);
    }
}
2.3.3 Message publishing and subscription
- The publisher sends the message
- http://localhost:8071/fanout?message=fanout
- Subscribers receive messages
radioListener message = fanout
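With a single bound queue the broadcast behaviour is not visible in the output above. As a rough illustration, this plain-Java sketch models a fanout exchange that copies a message to every bound queue and never looks at the routing key. FanoutExchangeSketch and the second queue name are hypothetical and exist only for this example; it is not Spring AMQP code.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory model of a fanout exchange (illustration only). */
final class FanoutExchangeSketch {

    private final List<String> boundQueues = new ArrayList<>();

    /** Rough counterpart of BindingBuilder.bind(queue).to(fanoutExchange): no routing key involved. */
    void bind(String queueName) {
        boundQueues.add(queueName);
    }

    /** The routing key is accepted but never consulted; every bound queue is selected. */
    List<String> route(String routingKey) {
        return new ArrayList<>(boundQueues);
    }

    public static void main(String[] args) {
        FanoutExchangeSketch exchange = new FanoutExchangeSketch();
        exchange.bind("queue.fanout.springboot.radio");
        exchange.bind("queue.fanout.springboot.tv"); // hypothetical second queue

        // Both calls select both queues, whatever the routing key is
        System.out.println(exchange.route(null));
        System.out.println(exchange.route("any.key"));
    }
}
```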
2.4 Headers – headers exchange
2.4.1 Message sender
- A headers exchange ignores the routing key and routes by matching message headers
- The sender only declares the exchange
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class HeadersPublisherConfig {

    @Bean
    public Exchange radioHeadersExchange() {
        return ExchangeBuilder.headersExchange("exchange.headers.springboot.headers").build();
    }
}
- Create the controller method that sends the message
- MessageProperties and Message are both in the org.springframework.amqp.core package
- Create a MessageProperties object to set the header values
- Message holds the message body together with its MessageProperties
@RequestMapping("/headers")
public Object headers(@RequestParam Map<String, String> param) {
    // Header values that the headers exchange matches against
    MessageProperties properties = new MessageProperties();
    properties.setHeader("name", param.get("name"));
    properties.setHeader("token", param.get("token"));
    Message mqMessage = new Message(param.get("message").getBytes(), properties);
    // A headers exchange ignores the routing key, so null is passed
    rabbitTemplate.convertAndSend("exchange.headers.springboot.headers", null, mqMessage);
    return properties;
}
2.4.2 Message Receiver
- As with the previous three types, the receiver declares the exchange, the queues, and the bindings
- The queue bindings use different matching rules
- BindingBuilder.bind(headersQueue01).to(headersExchange).whereAll(key).match()
- Every header name and value in key must match
- BindingBuilder.bind(headersQueue02).to(headersExchange).whereAny(key).match()
- At least one header name and value in key must match
- BindingBuilder.bind(headersQueue03).to(headersExchange).whereAll("name", "token").exist()
- All of the listed headers must exist
- BindingBuilder.bind(headersQueue03).to(headersExchange).whereAny("name", "token").exist()
- At least one of the listed headers must exist
- For whereAll(key).match(), the key map holds the same headers the sender sets on the message; the message is routed to the queue only when every entry matches
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class HeadersSubscriberConfig {

    @Bean
    public HeadersExchange headersExchange() {
        return ExchangeBuilder.headersExchange("exchange.headers.springboot.headers").build();
    }

    @Bean
    public Queue headersQueue01() {
        return QueueBuilder.durable("queue.headers.springboot.01").build();
    }

    @Bean
    public Queue headersQueue02() {
        return QueueBuilder.durable("queue.headers.springboot.02").build();
    }

    @Bean
    public Queue headersQueue03() {
        return QueueBuilder.durable("queue.headers.springboot.03").build();
    }

    @Bean
    public Binding headers01Binding(HeadersExchange headersExchange, Queue headersQueue01) {
        // Both name=java and token=001 must match
        Map<String, Object> key = new HashMap<>(4);
        key.put("name", "java");
        key.put("token", "001");
        return BindingBuilder.bind(headersQueue01).to(headersExchange).whereAll(key).match();
    }

    @Bean
    public Binding headers02Binding(HeadersExchange headersExchange, Queue headersQueue02) {
        // Either name=java or token=002 is enough
        Map<String, Object> key = new HashMap<>(4);
        key.put("name", "java");
        key.put("token", "002");
        return BindingBuilder.bind(headersQueue02).to(headersExchange).whereAny(key).match();
    }

    @Bean
    public Binding headers03Binding(HeadersExchange headersExchange, Queue headersQueue03) {
        // Alternative: either name or token exists
        // return BindingBuilder.bind(headersQueue03).to(headersExchange).whereAny("name", "token").exist();
        // Both name and token must exist
        return BindingBuilder.bind(headersQueue03).to(headersExchange).whereAll("name", "token").exist();
    }
}
- Listening to the queues
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class HeadersService {

    @RabbitListener(queues = "queue.headers.springboot.01")
    public void headers01Listener(String message) {
        System.out.println("headers01Listener message = " + message);
    }

    @RabbitListener(queues = "queue.headers.springboot.02")
    public void headers02Listener(String message) {
        System.out.println("headers02Listener message = " + message);
    }

    @RabbitListener(queues = "queue.headers.springboot.03")
    public void headers03Listener(String message) {
        System.out.println("headers03Listener message = " + message);
    }
}
2.4.3 Message Publishing and Subscription
- Send a message
- http://localhost:8071/headers?name=java&token=001&message=headers
- http://localhost:8071/headers?name=java&token=002&message=headers
- http://localhost:8071/headers?name=mq&token=003&message=headers
- Messages received
headers01Listener message = headers
headers02Listener message = headers
headers03Listener message = headers
headers02Listener message = headers
headers03Listener message = headers
headers03Listener message = headers
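To see why each request reaches the queues it does, the matching rules can be modelled in plain Java. The sketch below assumes that a binding is a map of header names to required values, that a null value means the header only has to exist (the exist() variants), and that a flag selects all-or-any matching. HeadersMatchSketch and matches are hypothetical names; this is a simplified model, not the broker's implementation.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of headers-exchange matching (illustration only). */
final class HeadersMatchSketch {

    /**
     * bindingArgs maps header names to required values; a null value means
     * the header only has to be present. matchAll=true models whereAll,
     * matchAll=false models whereAny.
     */
    static boolean matches(Map<String, Object> bindingArgs, boolean matchAll, Map<String, Object> headers) {
        for (Map.Entry<String, Object> arg : bindingArgs.entrySet()) {
            boolean hit = headers.containsKey(arg.getKey())
                    && (arg.getValue() == null || arg.getValue().equals(headers.get(arg.getKey())));
            if (matchAll && !hit) {
                return false; // whereAll: a single miss rejects the message
            }
            if (!matchAll && hit) {
                return true;  // whereAny: a single hit accepts the message
            }
        }
        return matchAll; // every entry hit (whereAll) or no entry hit (whereAny)
    }

    public static void main(String[] args) {
        Map<String, Object> all = new HashMap<>();   // mirrors headers01Binding
        all.put("name", "java");
        all.put("token", "001");
        Map<String, Object> any = new HashMap<>();   // mirrors headers02Binding
        any.put("name", "java");
        any.put("token", "002");
        Map<String, Object> exist = new HashMap<>(); // mirrors headers03Binding
        exist.put("name", null);
        exist.put("token", null);

        // The name/token pairs from the three requests
        String[][] requests = {{"java", "001"}, {"java", "002"}, {"mq", "003"}};
        for (String[] r : requests) {
            Map<String, Object> headers = new HashMap<>();
            headers.put("name", r[0]);
            headers.put("token", r[1]);
            if (matches(all, true, headers)) System.out.println("headers01Listener");
            if (matches(any, false, headers)) System.out.println("headers02Listener");
            if (matches(exist, true, headers)) System.out.println("headers03Listener");
        }
    }
}
```

In this model the first request satisfies all three bindings, the second satisfies the whereAny and exist bindings, and the third satisfies only the exist binding, which lines up with the listener output listed above.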