This is the first article on integrating message queues with SpringBoot, and we’ll take a closer look at message queues.

Introduction to Message Queues

1. What is message queue

MQ(Message Quene) : Through the typical producer and consumer model, producers continuously produce messages to the Message queue, and consumers continuously retrieve messages from the queue. Because both producers and consumers are asynchronous, and producers only care about sending messages and consumers only care about receiving messages, business decoupling is easily realized without intrusion of business logic.

2. What is the use of message queues

  • Asynchronous processing

Scenario Description: A shopping mall has the registration function. When registering, you need to send an SMS verification code.

Traditionally, the user submits information to the user service. The user service invokes the SMS service to send SMS messages and then returns a response to the user. This is synchronous processing and takes a long time. After joining the message queue, the user directly submits the information to the user service, writes the information to the message queue, and directly returns the response to the user. The SMS service reads the message from the message queue and sends short messages.

  • The application of decoupling

Scenario Description: Placing an order in a shopping mall.

The traditional way is that the user orders, the order system to query the inventory system, if the inventory system down, the order failure, loss of order volume. After joining the message queue, the user places an order, the order system records the order, writes the order information into the message queue, places the order successfully, and then the inventory system recovers to operate the database inventory (regardless of the case of inventory 0). In this way, the order system and the inventory system are loosely coupled

  • Traffic peak clipping

Scenario description: Seckill activity.

If the traffic is too large, the response times out or the system breaks down. The user is added to the message queue, and the request is written to the message queue. After the maximum length of the message queue is reached, the user returns the message kill failure message and consumes the data in the message queue.

Introduction of the RabbitMQ

RabbitMQ is a message middleware that implements the Advanced Message Queuing Protocol (AMQP) and is written in Erlang.

1. AMQP protocol concept

AMQP: AMQP is a linking protocol that directly defines the format of data exchanged over the network, making providers that implement AMQP themselves cross-platform. Here is the AMQP protocol model:

  • Server – Also known as broker, receives links from clients and implements AMQP entity services.
  • Connection – A network Connection between an application and a broker.
  • Channel – Network channel. Almost all operations are carried out in a channel and data flow is carried out in a channel. A channel is a channel for reading and writing messages. A client can establish multiple channels, each representing a session task.
  • Message – a message, data sent between the server and the application. Consists of properties and Body. Properties allows messages to be decorated with advanced features such as message escalation, latencies, and so on. The body is the content of the message body.
  • Virtual host – Virtual host, used for logical isolation, top-level message routing, and multiple switches within a single virtual address. Exchange and message queue Message Quene.
  • Exchange – A switch that receives messages and forwards them to a bound queue according to the router.
  • Binding – A virtual link between a switch and a queue. The binding can contain routing keys.
  • Routing Key – A routing rule that the VM can use to determine how jiekyi routes a particular message.
  • Quene – Message queue that saves messages and forwards them to consumers.

2. The message model for RabbitMQ

1. Simple model

In the picture above:

  • P: generator
  • C: Consumers
  • Red: Quene, message queue

2. Working model

In the picture above:

  • P: generator
  • C1, C2: consumers
  • Red: Quene, message queue

When message processing is time consuming, messages are produced much faster than they are consumed, and messages pile up and cannot be processed in a timely manner. In this case, multiple consumers can be bound to a queue to consume messages, which will be lost once consumed, so the task will not be repeated.

3. Broadcast Model (Fanout)

In this model, messages sent by producers can be consumed by all consumers.

In the picture above:

  • P: generator
  • X: switch
  • C1, C2: consumers
  • Red: Quene, message queue

4. Routing Model

In this model, different types of messages can be consumed by different consumers.

In the picture above:

  • P: generator
  • X: the switch receives the message from the producer and sends the message to the queue with the matching routing key
  • C1, C2: consumers
  • Red: Quene, message queue

5. Subscription Model (Topic)

This model is similar to the Direct model in that messages can be routed to different queues based on routing keys, but this model allows queues to bind routing keys using wildcards. This type of routing key consists of one or more words, and is used between multiple words. Segmentation.

Wildcard characters:

*: Matches only one word

#: Matches one or more words

6. The RPC model

This mode requires notifying the remote machine to run the function and waiting for the result to return. The process is blocked.

When the client starts, it creates an anonymous exclusive callback queue. And provide a function named call, which will send an RPC request and block until the result of the RPC operation is received.

Spring Boot integrates RabbitMQ

Step 1: Introduce POM dependencies

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Copy the code

Step 2: Add RabbitMQ service configuration information

spring:
  rabbitmq:
    virtual-host: javatrip
    port: 5672
    host: 127.0. 01.
    username: guest
    password: guest
Copy the code

Here we use the broadcast model as an example. The broadcast model (Fanout) is easier to understand, just like the public account. After I tweet the article every day, I will push it to every user who follows, and they can see this message.

Points to note in broadcast model:

  1. You can have multiple queues
  2. Each queue needs to be bound to a switch
  3. Each consumer has its own queue
  4. The switch sends messages to all queues that are bound

1. Define two queues

@Configuration
public class RabbitConfig {

    final static String queueNameA = "first-queue";
    final static String queueNameB = "second-queue";

    /*** * Defines a queue and sets the queue property *@return* /
    @Bean("queueA")
    public Queue queueA(a){

        Map<String,Object> map = new HashMap<>();
        // Message expiration duration, 10 seconds
        map.put("x-message-ttl".10000);
        The maximum number of messages in a queue is 10
        map.put("x-max-length".10);
        // The first argument, the queue name
        // Second parameter, durable: Persistent
        // The third argument is "exclusive".
        // The fourth parameter, autoDelete: automatic deletion
        Queue queue = new Queue(queueNameA,true.false.false,map);
        return queue;
    }
    
    @Bean("queueB")
    public Queue queueB(a){

        Map<String,Object> map = new HashMap<>();
        // Message expiration duration, 10 seconds
        map.put("x-message-ttl".10000);
        The maximum number of messages in a queue is 10
        map.put("x-max-length".10);
        // The first argument, the queue name
        // Second parameter, durable: Persistent
        // The third argument is "exclusive".
        // The fourth parameter, autoDelete: automatic deletion
        Queue queue = new Queue(queueNameB,true.false.false,map);
        returnqueue; }}Copy the code

2. Define sector switches

@Bean
public FanoutExchange fanoutExchange(a){

    // The first parameter, the switch name
    // The second parameter, durable, whether to persist
    // The third parameter, autoDelete, is automatically deleted
    FanoutExchange fanoutExchange = new FanoutExchange(exchangeName,true.false);
    return fanoutExchange;
}
Copy the code

3. Bind the switch to the queue

@Bean
public Binding bindingA(@Qualifier("queueA") Queue queueA, FanoutExchange fanoutExchange){
    Binding binding = BindingBuilder.bind(queueA).to(fanoutExchange);
    return binding;
}

@Bean
public Binding bindingB(@Qualifier("queueB") Queue queueB,FanoutExchange fanoutExchange){
    Binding binding = BindingBuilder.bind(queueB).to(fanoutExchange);
    return binding;
}
Copy the code

4. Create two consumers to listen on two queues

@RabbitListener(queues = RabbitConfig.queueNameA)
@Component
@Slf4j
public class ConsumerA {

    @RabbitHandler
    public void receive(String message){
        log.info("Message received by Consumer A:"+message); }}Copy the code
@RabbitListener(queues = RabbitConfig.queueNameB)
@Component
@Slf4j
public class ConsumerB {

    @RabbitHandler
    public void receive(String message){
        log.info("Message received by Consumer B:"+message); }}Copy the code

5. Create producer production messages

@RestController
public class provider {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("send")
    public void sendMessage(a){

        String message = "Hello, THIS is Java Journey.";
        rabbitTemplate.convertAndSend(RabbitConfig.exchangeName,null,message); }}Copy the code

After a producer sends a message, two consumers can consume the message at the same time.


The sample code for this article has been uploaded togithub, point astarSupport!

Spring Boot series tutorial directory

Spring-boot-route (I) Several ways for Controller to receive parameters

Spring-boot-route (2) Several methods of reading configuration files

Spring-boot-route (3) Upload multiple files

Spring-boot-route (4) Global exception processing

Spring-boot-route (5) Integrate Swagger to generate interface documents

Spring-boot-route (6) Integrate JApiDocs to generate interface documents

Spring-boot-route (7) Integrate jdbcTemplate operation database

Spring-boot-route (8) Integrating mybatis operation database

Spring-boot-route (9) Integrate JPA operation database

Spring-boot-route (10) Switching between multiple data sources

Spring-boot-route (11) Encrypting database configuration information

Spring-boot-route (12) Integrate REDis as cache

Spring-boot-route RabbitMQ

Spring-boot-route Kafka

Spring-boot-route (15) Integrate RocketMQ

Spring-boot-route (16) Use logback to produce log files

Spring-boot-route (17) Use AOP to log operations

Spring-boot-route (18) Spring-boot-adtuator monitoring applications

Spring-boot-route (19) Spring-boot-admin Monitoring service

Spring-boot-route (20) Spring Task Implements simple scheduled tasks

Spring-boot-route (21) Quartz Implements dynamic scheduled tasks

Spring-boot-route (22) Enables email sending

Spring-boot-route (23) Developed wechat official accounts

Spring-boot-route (24) Distributed session consistency processing

Spring-boot-route (25) two lines of code to achieve internationalization

Spring-boot-route (26) Integrate webSocket

This series of articles are frequently used in the work of knowledge, after learning this series, to cope with daily development more than enough. If you want to know more, just scan the qr code below and let me know. I will further improve this series of articles!