This is the first article on integrating message queues with SpringBoot, and we’ll take a closer look at message queues.
Introduction to Message Queues
1. What is message queue
MQ(Message Quene) : Through the typical producer and consumer model, producers continuously produce messages to the Message queue, and consumers continuously retrieve messages from the queue. Because both producers and consumers are asynchronous, and producers only care about sending messages and consumers only care about receiving messages, business decoupling is easily realized without intrusion of business logic.
2. What is the use of message queues
- Asynchronous processing
Scenario Description: A shopping mall has the registration function. When registering, you need to send an SMS verification code.
Traditionally, the user submits information to the user service. The user service invokes the SMS service to send SMS messages and then returns a response to the user. This is synchronous processing and takes a long time. After joining the message queue, the user directly submits the information to the user service, writes the information to the message queue, and directly returns the response to the user. The SMS service reads the message from the message queue and sends short messages.
- The application of decoupling
Scenario Description: Placing an order in a shopping mall.
The traditional way is that the user orders, the order system to query the inventory system, if the inventory system down, the order failure, loss of order volume. After joining the message queue, the user places an order, the order system records the order, writes the order information into the message queue, places the order successfully, and then the inventory system recovers to operate the database inventory (regardless of the case of inventory 0). In this way, the order system and the inventory system are loosely coupled
- Traffic peak clipping
Scenario description: Seckill activity.
If the traffic is too large, the response times out or the system breaks down. The user is added to the message queue, and the request is written to the message queue. After the maximum length of the message queue is reached, the user returns the message kill failure message and consumes the data in the message queue.
Introduction of the RabbitMQ
RabbitMQ is a message middleware that implements the Advanced Message Queuing Protocol (AMQP) and is written in Erlang.
1. AMQP protocol concept
AMQP: AMQP is a linking protocol that directly defines the format of data exchanged over the network, making providers that implement AMQP themselves cross-platform. Here is the AMQP protocol model:
- Server – Also known as broker, receives links from clients and implements AMQP entity services.
- Connection – A network Connection between an application and a broker.
- Channel – Network channel. Almost all operations are carried out in a channel and data flow is carried out in a channel. A channel is a channel for reading and writing messages. A client can establish multiple channels, each representing a session task.
- Message – a message, data sent between the server and the application. Consists of properties and Body. Properties allows messages to be decorated with advanced features such as message escalation, latencies, and so on. The body is the content of the message body.
- Virtual host – Virtual host, used for logical isolation, top-level message routing, and multiple switches within a single virtual address. Exchange and message queue Message Quene.
- Exchange – A switch that receives messages and forwards them to a bound queue according to the router.
- Binding – A virtual link between a switch and a queue. The binding can contain routing keys.
- Routing Key – A routing rule that the VM can use to determine how jiekyi routes a particular message.
- Quene – Message queue that saves messages and forwards them to consumers.
2. The message model for RabbitMQ
1. Simple model
In the picture above:
- P: generator
- C: Consumers
- Red: Quene, message queue
2. Working model
In the picture above:
- P: generator
- C1, C2: consumers
- Red: Quene, message queue
When message processing is time consuming, messages are produced much faster than they are consumed, and messages pile up and cannot be processed in a timely manner. In this case, multiple consumers can be bound to a queue to consume messages, which will be lost once consumed, so the task will not be repeated.
3. Broadcast Model (Fanout)
In this model, messages sent by producers can be consumed by all consumers.
In the picture above:
- P: generator
- X: switch
- C1, C2: consumers
- Red: Quene, message queue
4. Routing Model
In this model, different types of messages can be consumed by different consumers.
In the picture above:
- P: generator
- X: the switch receives the message from the producer and sends the message to the queue with the matching routing key
- C1, C2: consumers
- Red: Quene, message queue
5. Subscription Model (Topic)
This model is similar to the Direct model in that messages can be routed to different queues based on routing keys, but this model allows queues to bind routing keys using wildcards. This type of routing key consists of one or more words, and is used between multiple words. Segmentation.
Wildcard characters:
*
: Matches only one word
#
: Matches one or more words
6. The RPC model
This mode requires notifying the remote machine to run the function and waiting for the result to return. The process is blocked.
When the client starts, it creates an anonymous exclusive callback queue. And provide a function named call, which will send an RPC request and block until the result of the RPC operation is received.
Spring Boot integrates RabbitMQ
Step 1: Introduce POM dependencies
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Copy the code
Step 2: Add RabbitMQ service configuration information
spring:
rabbitmq:
virtual-host: javatrip
port: 5672
host: 127.0. 01.
username: guest
password: guest
Copy the code
Here we use the broadcast model as an example. The broadcast model (Fanout) is easier to understand, just like the public account. After I tweet the article every day, I will push it to every user who follows, and they can see this message.
Points to note in broadcast model:
- You can have multiple queues
- Each queue needs to be bound to a switch
- Each consumer has its own queue
- The switch sends messages to all queues that are bound
1. Define two queues
@Configuration
public class RabbitConfig {
final static String queueNameA = "first-queue";
final static String queueNameB = "second-queue";
/*** * Defines a queue and sets the queue property *@return* /
@Bean("queueA")
public Queue queueA(a){
Map<String,Object> map = new HashMap<>();
// Message expiration duration, 10 seconds
map.put("x-message-ttl".10000);
The maximum number of messages in a queue is 10
map.put("x-max-length".10);
// The first argument, the queue name
// Second parameter, durable: Persistent
// The third argument is "exclusive".
// The fourth parameter, autoDelete: automatic deletion
Queue queue = new Queue(queueNameA,true.false.false,map);
return queue;
}
@Bean("queueB")
public Queue queueB(a){
Map<String,Object> map = new HashMap<>();
// Message expiration duration, 10 seconds
map.put("x-message-ttl".10000);
The maximum number of messages in a queue is 10
map.put("x-max-length".10);
// The first argument, the queue name
// Second parameter, durable: Persistent
// The third argument is "exclusive".
// The fourth parameter, autoDelete: automatic deletion
Queue queue = new Queue(queueNameB,true.false.false,map);
returnqueue; }}Copy the code
2. Define sector switches
@Bean
public FanoutExchange fanoutExchange(a){
// The first parameter, the switch name
// The second parameter, durable, whether to persist
// The third parameter, autoDelete, is automatically deleted
FanoutExchange fanoutExchange = new FanoutExchange(exchangeName,true.false);
return fanoutExchange;
}
Copy the code
3. Bind the switch to the queue
@Bean
public Binding bindingA(@Qualifier("queueA") Queue queueA, FanoutExchange fanoutExchange){
Binding binding = BindingBuilder.bind(queueA).to(fanoutExchange);
return binding;
}
@Bean
public Binding bindingB(@Qualifier("queueB") Queue queueB,FanoutExchange fanoutExchange){
Binding binding = BindingBuilder.bind(queueB).to(fanoutExchange);
return binding;
}
Copy the code
4. Create two consumers to listen on two queues
@RabbitListener(queues = RabbitConfig.queueNameA)
@Component
@Slf4j
public class ConsumerA {
@RabbitHandler
public void receive(String message){
log.info("Message received by Consumer A:"+message); }}Copy the code
@RabbitListener(queues = RabbitConfig.queueNameB)
@Component
@Slf4j
public class ConsumerB {
@RabbitHandler
public void receive(String message){
log.info("Message received by Consumer B:"+message); }}Copy the code
5. Create producer production messages
@RestController
public class provider {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("send")
public void sendMessage(a){
String message = "Hello, THIS is Java Journey.";
rabbitTemplate.convertAndSend(RabbitConfig.exchangeName,null,message); }}Copy the code
After a producer sends a message, two consumers can consume the message at the same time.
The sample code for this article has been uploaded togithub, point astar
Support!
Spring Boot series tutorial directory
Spring-boot-route (I) Several ways for Controller to receive parameters
Spring-boot-route (2) Several methods of reading configuration files
Spring-boot-route (3) Upload multiple files
Spring-boot-route (4) Global exception processing
Spring-boot-route (5) Integrate Swagger to generate interface documents
Spring-boot-route (6) Integrate JApiDocs to generate interface documents
Spring-boot-route (7) Integrate jdbcTemplate operation database
Spring-boot-route (8) Integrating mybatis operation database
Spring-boot-route (9) Integrate JPA operation database
Spring-boot-route (10) Switching between multiple data sources
Spring-boot-route (11) Encrypting database configuration information
Spring-boot-route (12) Integrate REDis as cache
Spring-boot-route RabbitMQ
Spring-boot-route Kafka
Spring-boot-route (15) Integrate RocketMQ
Spring-boot-route (16) Use logback to produce log files
Spring-boot-route (17) Use AOP to log operations
Spring-boot-route (18) Spring-boot-adtuator monitoring applications
Spring-boot-route (19) Spring-boot-admin Monitoring service
Spring-boot-route (20) Spring Task Implements simple scheduled tasks
Spring-boot-route (21) Quartz Implements dynamic scheduled tasks
Spring-boot-route (22) Enables email sending
Spring-boot-route (23) Developed wechat official accounts
Spring-boot-route (24) Distributed session consistency processing
Spring-boot-route (25) two lines of code to achieve internationalization
Spring-boot-route (26) Integrate webSocket
This series of articles are frequently used in the work of knowledge, after learning this series, to cope with daily development more than enough. If you want to know more, just scan the qr code below and let me know. I will further improve this series of articles!