Zero. Preface to the article

  1. SpringBoot RabbitMQ advanced series has been updated to integrate RabbitMQ into SpringBoot for high availability and reliable delivery
  2. There are four core switches, dead letter queue, reliable delivery, abnormal consumption processing
  3. This three-part series focuses on integration and enterprise-level content. You will need to know the basics of SpringBoot and RabbitMQ
  4. The source code of the article is put on the web disk, there is no Git repository, the required self-download, script and other information in the common
  5. Personal level is limited, there are mistakes welcome to correct

Link:… Extraction code: QTVI

I. Environment construction

  1. Create three sub-modules using Maven multi-Module mode
    • Common: Common entity information
    • Rabbitmq-publisher: Message publisher, based on SpringBoot
    • Rabbitmq-subscriber: message subscriber, based on SpringBoot
  2. Add rabbitMQ Maven dependencies to both the message publisher and subscriber projects
  1. Add rabbitMQ configuration information to both projects
    port: 5672
    username: username
    password: password
    # Virtual host, need background configuration first
    # virtual-host: springboot
  1. After the above three steps are complete, the rabbitMQ infrastructure is set up
  2. Rabbitmq configuration properties class
    • org.springframework.boot.autoconfigure.amqp.RabbitProperties

Two, four switches

2.1 Direct – The switch is directly connected

2.1.1 Message Sender

  1. Create a new configuration class in the message publisher that declares the switch information
    • Only the switch is declared; queues and switch bindings are subscriber operations
    • Different types provide different switches
    • If only the exchange is declared, the exchange is not created, but is created at binding time or when a message is sent
import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

public class AmqpPublisherConfig {
    public DirectExchange emailDirectExchange(a) {
        // Declaration mode 1
        // return new DirectExchange("");
        // Declaration mode 2
  1. To send messages, use RabbitTemplate, the RabbitMQ message sender provided for SpringBoot
    • org.springframework.amqp.rabbit.core.RabbitTemplate
    • Sending a Message
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;

public class PublishController {
    private RabbitTemplate rabbitTemplate;

    public Object direct(String message) {
        try {
            rabbitTemplate.convertAndSend("Switch"."Routing key", message);
            return message;
        } catch (AmqpException e) {
2.2.2 Message Receiver

  1. The recipient needs to set the following parameters
    • Switch: Indicates the switch type corresponding to new
    • Queue: Only the Queue type is identified by name
    • Switch and queue binding: through bindingBuilder.bind (queue).to(switch).with(routing key);
    • Only exchange and queue bindings are declared, and are not created immediately, but when messages are sent or queues are listened to
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

public class AmqpSubscriberConfig {
   /** * Directly connect to the switch */
    public DirectExchange emailDirectExchange(a) {
        // Declaration mode 1
        // return new DirectExchange("");
        // Declaration mode 2
        return ExchangeBuilder.directExchange("").build();

    /** * declare queue */
    public Queue emailQueue(a) {
        // Declaration mode 1
        // return new Queue("");
        // Declaration mode 2
        return QueueBuilder.durable("").build();

    /** * switch and queue binding */
    public Binding emailBiding(Queue emailQueue, DirectExchange emailDirectExchange) {
        // Bind routes to switches using routing keys
  1. Listening to the queue
    • The listening queue must exist or an error will be reported
    • The message is automatically acknowledged when the queue consumption is completed
    • If more than one queue listens to a queue at the same time, messages are processed by different methods in rotation
    • You can specify the receive type in the parameter, and the message will be automatically converted to the corresponding type
    • You can also specify the Message argument to get the corresponding Message information
      • org.springframework.amqp.core.Message
      • Get news attributes: message. GetMessageProperties ()
      • Get the message content: message.getBody()
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/** * Message subscription listener */
public class SubscriberListener {
    /** * direct listener, the same listener queue messages will be processed in turn */
    @RabbitListener(queues = "")
    public void receiver01(String msg) {
        System.out.println("receiver01 message = " + msg);

    @RabbitListener(queues = "")
    public void receiver02(String msg) {
2.1.3 Message publishing subscription

  1. Start the subscriber first and see the queue declaration

2. Start the publisher and publish the message

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;

public class PublishController {
    private RabbitTemplate rabbitTemplate;

    public Object direct(String message) {
        try {
            // Specify the switch and routing key to send
            rabbitTemplate.convertAndSend(""."", message);
            return message;
        } catch (AmqpException e) {
  1. Subscribers receive messages in turn
receiver01 message = direct
receiver02  message = direct
receiver01 message = direct
receiver02  message = direct
receiver01 message = direct
receiver02  message = direct
2.2 Topic – Topic switch

2.2.1 Message sender

  1. Declare the Topic switch
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

public class BlogPublisherConfig {
    public Exchange blogTopicExchange(a) {
        return ExchangeBuilder.topicExchange("").build(); }}Copy the code
  1. Statement of the controller
public Object topic(String routingKey, String message) {
    rabbitTemplate.convertAndSend("", routingKey, message);
    return routingKey + ":" + message;
2.2.2 Message Receiver

  1. Declare the exchange, three queues, and the binding of queues
    • * : matches a string
    • Matches one or more strings
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

public class BlogSubscriberConfig {
    /** * Topic switch */
    public TopicExchange blogTopicExchange(a) {
        return ExchangeBuilder.topicExchange("").build();

    public Queue blogJavaQueue(a) {
        return QueueBuilder.durable("").build();

    public Queue blogMqQueue(a) {
        return QueueBuilder.durable("").build();

    public Queue blogAllQueue(a) {
        return QueueBuilder.durable("").build();

    public Binding blogJavaBinding(TopicExchange blogTopicExchange, Queue blogJavaQueue) {
        return BindingBuilder.bind(blogJavaQueue).to(blogTopicExchange).with("");

    public Binding blogMqBinding(TopicExchange blogTopicExchange, Queue blogMqQueue) {
        return BindingBuilder.bind(blogMqQueue).to(blogTopicExchange).with("");

    public Binding blogAllBinding(TopicExchange blogTopicExchange, Queue blogAllQueue) {
        // #: match one or more * : match one
  1. Listening to the queue
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

public class BlogService {
    /** * topic monitor */
    @RabbitListener(queues = "")
    public void blogJavaListener(String message) {
        System.out.println("blogJavaListener message = " + message);

    @RabbitListener(queues = "")
    public void blogMqListener(String message) {
        System.out.println("blogMqListener message = " + message);

    @RabbitListener(queues = "")
    public void blogAllaListener(String message) {
2.2.3 Message publishing and subscription

  1. The publisher sends the message
    • http://localhost:8071/topic?
    • http://localhost:8071/topic?
  2. Subscribers receive messages
    • Full matching and fuzzy matching
    • All matches and either one is going to be matched
blogJavaListener message = hello
blogAllListener message = hello
blogAllListener message = hello
blogMqListener message = hello
2.3 FANout – Broadcast switch

2.3.1 Message sender

  1. Declare the FANout switch
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

public class NoticePublisherConfig {
    public Exchange radioFanoutExchange(a) {
        return ExchangeBuilder.fanoutExchange("").build(); }}Copy the code
  1. Statement of the controller
public Object fanout(String message) {
    rabbitTemplate.convertAndSend("".null, message);
    return message;
2.32 Message Receiver

  1. Create switches, routing keys, and bindings
    • There is no need to use routing keys
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

public class NoticeSubscriberConfig {
    public FanoutExchange radioFanoutExchange(a) {
        return ExchangeBuilder.fanoutExchange("").build();

    public Queue radioQueue(a) {
        return QueueBuilder.durable("").build();

    public Binding radioBinding(FanoutExchange radioFanoutExchange, Queue radioQueue) {
        // The broadcast switch binding has no routing key
  1. Listening to the queue
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

public class NoticeService {

    @RabbitListener(queues = "")
    public void radioListener(String message) {
2.3.3 Message publishing and subscription

  1. The publisher sends the message
    • http://localhost:8071/fanout?message=fanout
  2. Subscribers receive messages
radioListener message = fanout
2.4 Headers – Headers switch

2.4.1 Message sender

  1. The HEADERS mode ignores routing keys through header matching
  2. The sender needs to create the queue
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

public class HeadersPublisherConfig {
    public Exchange radioHeadersExchange(a) {
        return ExchangeBuilder.headersExchange("exchange.headers.springboot.headers").build(); }}Copy the code
  1. Create controller to send message
    • MessageProperties and Message packet is: org. Springframework. Closer. The core
    • You need to create a MessageProperties object to set the header information
    • Message is used to store messages and Message attribute information
public Object headers(@RequestParam Map<String, String> param) {
    MessageProperties properties = new MessageProperties();
    properties.setHeader("name", param.get("name"));
    properties.setHeader("token", param.get("token"));
    Message mqMessage = new Message(param.get("message").getBytes(), properties);
    rabbitTemplate.convertAndSend("exchange.headers.springboot.headers".null, mqMessage);
    return properties;
2.4.2 Message Receiver

  1. The receiver needs to declare the exchange, queue, and binding as well as the previous three
  • Different rules need to be used when queue binding
    • BindingBuilder.bind(headersQueue01).to(headersExchange).whereAll(key).match()
      • All field attributes and values match
    • BindingBuilder.bind(headersQueue02).to(headersExchange).whereAny(key).match()
      • Any field attributes and values match
    • BindingBuilder.bind(headersQueue03).to(headersExchange).whereAll(“name”, “token”).exist()
      • Specifies that all property fields exist
    • BindingBuilder.bind(headersQueue03).to(headersExchange).whereAny(“name”, “token”).exist()
      • Specifies that any property exists
  • The attributes stored in headerMap are the attributes encapsulated in the sender. If the attributes match perfectly, the route is correct
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

public class HeadersSubscriberConfig {
    public HeadersExchange headersExchange(a) {
        return ExchangeBuilder.headersExchange("exchange.headers.springboot.headers").build();

    public Queue headersQueue01(a) {
        return QueueBuilder.durable("queue.headers.springboot.01").build();

    public Queue headersQueue02(a) {
        return QueueBuilder.durable("queue.headers.springboot.02").build();

    public Queue headersQueue03(a) {
        return QueueBuilder.durable("queue.headers.springboot.03").build();

    public Binding headers01Binding(HeadersExchange headersExchange,Queue headersQueue01) {
        Map<String, Object> key = new HashMap<>(4);
        return BindingBuilder.bind(headersQueue01).to(headersExchange).whereAll(key).match();

    public Binding headers02Binding(HeadersExchange headersExchange,Queue headersQueue02) {
        Map<String, Object> key = new HashMap<>(4);
        return BindingBuilder.bind(headersQueue02).to(headersExchange).whereAny(key).match();

    public Binding headers03Binding(HeadersExchange headersExchange,Queue headersQueue03) {
        // Both name and token need to exist
        return BindingBuilder.bind(headersQueue03).to(headersExchange).whereAll("name"."token").exist();
        // Any name or token exists
  1. The queue to monitor
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

public class HeadersService {

    @RabbitListener(queues = "queue.headers.springboot.01")
    public void headers01Listener(String message) {
        System.out.println("headers01Listener message = " + message);

    @RabbitListener(queues = "queue.headers.springboot.02")
    public void headers02Listener(String message) {
        System.out.println("headers02Listener message = " + message);

    @RabbitListener(queues = "queue.headers.springboot.03")
    public void headers03Listener(String message) {
2.4.3 Message Publishing and Subscription

  1. Send a message
    • http://localhost:8071/headers?name=java&token=001&message=headers
    • http://localhost:8071/headers?name=java&token=002&message=headers
    • http://localhost:8071/headers?name=mq&token=003&message=headers
  2. Receives the message
headers01Listener message = headers
headers02Listener message = headers
headers03Listener message = headers
headers02Listener message = headers
headers03Listener message = headers
headers03Listener message = headers
