Do you really know RabbitMq? (2)

Environment used in this article

  • RabbitMq 3.7.27 (the latest Docker image is not used because Exchange can't be clicked there)
  • OpenJDK 8

How RabbitMq messages are sent

  • RabbitMq explains that the producer sends a message to an exchange, the exchange uses a routing policy to deliver it to a queue, and the consumer retrieves it from the queue. In other words, a producer cannot send messages directly to a consumer's queue. So why, in "Do you really know RabbitMq? (1)", could we send with only a queue name and have the queue receive the message directly? Let's talk about it.
  • There is also the question of how a producer confirms that a message reached Mq without being lost, and how a consumer confirms that a message has been consumed. Producers and consumers each have their own mechanism for confirming successful delivery or consumption, which will be discussed in detail in this article.

Learn about the four types of RabbitMq exchanges

Exchange type       Default pre-declared names
Direct exchange     (empty string) and amq.direct
Fanout exchange     amq.fanout
Topic exchange      amq.topic
Headers exchange    amq.match (and amq.headers in RabbitMQ)

  • In fact, there is one more, called the default exchange. It is the reason why, in "Do you really know RabbitMq? (1)", a message could be sent with only a queue name and no exchange and still arrive.

fanout

A fanout exchange has the type fanout. People also call it a fan exchange, because it spreads a message out like a fan. From the figure above we can basically confirm how it delivers: every queue bound to a fanout exchange receives the message, and the routing key plays no part. It is one of the more common and easier types to use.
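To make the fan idea concrete, here is a minimal in-memory sketch in plain Java. It only models the routing rule and is not the RabbitMQ client API; the class FanoutSketch and its methods are hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of a fanout exchange (illustration only, not the RabbitMQ API). */
final class FanoutSketch {
    // Queue name -> messages delivered to that queue.
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    /** Binds a queue to the exchange; a fanout binding needs no routing key. */
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    /** Copies the message to every bound queue; the routing key is ignored. */
    void publish(String routingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    /** Returns the messages delivered so far to the given queue (empty if unbound). */
    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, new ArrayList<>());
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("queueA");
        exchange.bind("queueB");
        exchange.publish("whatever", "hello");
        System.out.println("queueA: " + exchange.messagesIn("queueA"));
        System.out.println("queueB: " + exchange.messagesIn("queueB"));
    }
}
```

Both queues end up with their own copy of the message, whatever routing key the producer passes.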

direct

The direct exchange delivers a message to a queue based on the message routing key. Direct exchanges are ideal for unicast routing of messages (although they can also be used for multicast routing). How it works:

This is the official RabbitMq explanation

  • A queue binds to the exchange with a routing key K
  • When a new message with routing key R arrives at the direct exchange, the exchange routes it to the queue if K = R

Translated with my poor English

  • A queue is bound to the direct exchange with a routingKey K
  • When a new message is sent to the direct exchange with a routingKey R, and R equals K, the exchange routes it to that queue
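The K = R rule above can be sketched in a few lines of plain Java. This is an in-memory model under my own assumptions, not the RabbitMQ client API; DirectSketch, bind and route are hypothetical names, and the queue and key names in the usage are invented.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory model of direct-exchange routing (illustration only). */
final class DirectSketch {
    // Each binding is a pair: [binding key K, queue name].
    private final List<String[]> bindings = new ArrayList<>();

    /** Binds a queue to the exchange with binding key K. */
    void bind(String queueName, String bindingKey) {
        bindings.add(new String[] {bindingKey, queueName});
    }

    /** Returns the queues that receive a message published with routing key R. */
    List<String> route(String routingKey) {
        List<String> targets = new ArrayList<>();
        for (String[] binding : bindings) {
            if (binding[0].equals(routingKey)) { // K = R
                targets.add(binding[1]);
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        DirectSketch exchange = new DirectSketch();
        exchange.bind("orderQueue", "order.created");
        exchange.bind("auditQueue", "order.created"); // same key: multicast
        exchange.bind("mailQueue", "user.signup");
        System.out.println(exchange.route("order.created"));
        System.out.println(exchange.route("user.signup"));
        System.out.println(exchange.route("unknown"));
    }
}
```

One queue per key gives unicast; binding several queues with the same key gives the multicast case mentioned earlier.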

headers

Headers exchanges route on multiple attributes. Instead of a routingKey, they compare the message headers with the attributes declared on the binding: the queue receives the message if they match, and does not if they don't. Let's try a couple of examples

import java.nio.charset.StandardCharsets;
import java.util.HashMap;

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
// plus the @Test import of the JUnit version in use

/**
 * @author Kakki
 * @version 1.0
 * @create 2021-06-18 16:56
 */
@Configuration
public class QueueAutoConfig implements SmartInitializingSingleton {
    @Autowired
    private AmqpAdmin amqpAdmin;

    @Override
    public void afterSingletonsInstantiated() {
        // Declare two queues and bind them to the same headers exchange with different match policies
        Queue queueAll = new Queue("Headers_ALL_QUEUE");
        amqpAdmin.declareQueue(queueAll);
        CustomExchange exchangeAll = new CustomExchange("Headers_EXCHANGE", "headers");
        amqpAdmin.declareExchange(exchangeAll);
        Binding bindingAll = BindingBuilder
                .bind(queueAll)
                .to(exchangeAll)
                .with("Headers_ROUTING_KEY") // ignored by a headers exchange
                .and(new HashMap<String, Object>() {{
                    put("all-1", "value1"); put("all-2", "value2"); put("x-match", "all");
                }});
        amqpAdmin.declareBinding(bindingAll);

        Queue queueAny = new Queue("Headers_ANY_QUEUE");
        amqpAdmin.declareQueue(queueAny);
        CustomExchange exchangeAny = new CustomExchange("Headers_EXCHANGE", "headers");
        amqpAdmin.declareExchange(exchangeAny);
        Binding bindingAny = BindingBuilder
                .bind(queueAny)
                .to(exchangeAny)
                .with("Headers_ROUTING_KEY") // ignored by a headers exchange
                .and(new HashMap<String, Object>() {{
                    put("any-1", "value1"); put("any-2", "value2"); put("x-match", "any");
                }});
        amqpAdmin.declareBinding(bindingAny);
    }

    /**
     * Unit test. rabbitTemplate is an injected RabbitTemplate whose declaration is not shown.
     */
    @Test
    public void sendMq() {
        MessageProperties all1 = new MessageProperties();
        all1.setHeader("all-1", "value1");
        all1.setHeader("all-2", "value2");
        MessageProperties all2 = new MessageProperties();
        all2.setHeader("all-1", "value1");
        MessageProperties any1 = new MessageProperties();
        any1.setHeader("any-1", "value1");
        any1.setHeader("any-2", "value2");
        MessageProperties any2 = new MessageProperties();
        any2.setHeader("any-1", "value1");
        Message all1Message = new Message("ALL1".getBytes(StandardCharsets.UTF_8), all1);
        Message all2Message = new Message("ALL2".getBytes(StandardCharsets.UTF_8), all2);
        Message any1Message = new Message("ANY1".getBytes(StandardCharsets.UTF_8), any1);
        Message any2Message = new Message("ANY2".getBytes(StandardCharsets.UTF_8), any2);
        rabbitTemplate.convertAndSend("Headers_EXCHANGE", null, all1Message);
        rabbitTemplate.convertAndSend("Headers_EXCHANGE", null, all2Message);
        rabbitTemplate.convertAndSend("Headers_EXCHANGE", null, any1Message);
        rabbitTemplate.convertAndSend("Headers_EXCHANGE", null, any2Message);
        // Block so the listeners have time to consume
        while (true);
    }
}
import java.nio.charset.StandardCharsets;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Argument;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * MQ Consumer.
 * Note: the @Argument values here are queue declaration arguments; the header
 * matching itself is done by the bindings declared in QueueAutoConfig.
 *
 * @author Kakki
 * @version 1.0
 * @create 2021-06-18 17:13
 */
@Component
@Slf4j
public class MqCustomer {

    @RabbitListener(queuesToDeclare = {@Queue(name = "Headers_ALL_QUEUE", arguments = {@Argument(name = "all-1", value = "value1"), @Argument(name = "all-2", value = "value2")})})
    public void handlerAll1(Message message) {
        log.info("[Headers_ALL_QUEUE1]:{}", new String(message.getBody(), StandardCharsets.UTF_8));
    }

    @RabbitListener(queuesToDeclare = {@Queue(name = "Headers_ALL_QUEUE", arguments = {@Argument(name = "all-1", value = "value1")})})
    public void handlerAll2(Message message) {
        log.info("[Headers_ALL_QUEUE2]:{}", new String(message.getBody(), StandardCharsets.UTF_8));
    }

    @RabbitListener(queuesToDeclare = {@Queue(name = "Headers_ANY_QUEUE", arguments = {@Argument(name = "any-1", value = "value1"), @Argument(name = "any-2", value = "value2")})})
    public void handlerAny1(Message message) {
        log.info("[Headers_ANY_QUEUE1]:{}", new String(message.getBody(), StandardCharsets.UTF_8));
    }

    @RabbitListener(queuesToDeclare = {@Queue(name = "Headers_ANY_QUEUE", arguments = {@Argument(name = "any-1", value = "value1")})})
    public void handlerAny2(Message message) {
        log.info("[Headers_ANY_QUEUE2]:{}", new String(message.getBody(), StandardCharsets.UTF_8));
    }

    @RabbitListener(queuesToDeclare = {@Queue(name = "Headers_ANY_QUEUE", arguments = {@Argument(name = "any-1", value = "value1"), @Argument(name = "any-3", value = "value3")})})
    public void handlerAny3(Message message) {
        log.info("[Headers_ANY_QUEUE3]:{}", new String(message.getBody(), StandardCharsets.UTF_8));
    }
}
2021-07-03 17:47:24.922  INFO 9544 --- [ntContainer#1-1] com.example.demo.config.MqCustomer : [Headers_ALL_QUEUE1]:ALL1
2021-07-03 17:47:24.923  INFO 9544 --- [ntContainer#3-1] com.example.demo.config.MqCustomer : [Headers_ANY_QUEUE1]:ANY1
2021-07-03 17:47:24.923  INFO 9544 --- [ntContainer#4-1] com.example.demo.config.MqCustomer : [Headers_ANY_QUEUE2]:ANY2
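  • Therefore, a headers exchange does not need a routingKey. Matching is done on the message header attributes, and the rule is either all or any, set through x-match on the binding.
  • Listeners on the same queue compete for messages: each message a producer sends is consumed by only one of them. That is, if I produce only two messages, then even with 100 matching consumers, at most two consumers get to consume them
  • When the matching mechanism is all, every header in the binding must match. When it is any, one matching header is enough. This is why ALL2, which carries only all-1, does not appear in the output above
  • Attributes that start with x- (such as x-match itself) are not counted as part of the match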
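The all / any rules listed above can be written down as a small matching function. The following is a simplified sketch in plain Java, not RabbitMQ's actual implementation: HeadersMatchSketch, matches and headers are hypothetical names, values are compared with plain equals, and a missing x-match is treated as all (an assumption of this sketch).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Simplified model of headers-exchange matching (illustration only). */
final class HeadersMatchSketch {

    /** Returns true if a message with these headers satisfies the binding arguments. */
    static boolean matches(Map<String, Object> bindingArgs, Map<String, Object> messageHeaders) {
        // Assumption of this sketch: anything other than "any" means "all".
        boolean any = "any".equals(bindingArgs.get("x-match"));
        for (Map.Entry<String, Object> arg : bindingArgs.entrySet()) {
            if (arg.getKey().startsWith("x-")) {
                continue; // x-* attributes are not part of the match
            }
            boolean hit = messageHeaders.containsKey(arg.getKey())
                    && Objects.equals(arg.getValue(), messageHeaders.get(arg.getKey()));
            if (any && hit) {
                return true;  // any: one matching header is enough
            }
            if (!any && !hit) {
                return false; // all: one mismatch is enough to reject
            }
        }
        // all: nothing mismatched; any: nothing matched
        return !any;
    }

    /** Small helper that builds a map from key/value pairs. */
    static Map<String, Object> headers(String... keyValues) {
        Map<String, Object> map = new HashMap<>();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            map.put(keyValues[i], keyValues[i + 1]);
        }
        return map;
    }

    public static void main(String[] args) {
        Map<String, Object> bindingAll = headers("all-1", "value1", "all-2", "value2", "x-match", "all");
        Map<String, Object> bindingAny = headers("any-1", "value1", "any-2", "value2", "x-match", "any");
        System.out.println("ALL1: " + matches(bindingAll, headers("all-1", "value1", "all-2", "value2")));
        System.out.println("ALL2: " + matches(bindingAll, headers("all-1", "value1")));
        System.out.println("ANY1: " + matches(bindingAny, headers("any-1", "value1", "any-2", "value2")));
        System.out.println("ANY2: " + matches(bindingAny, headers("any-1", "value1")));
    }
}
```

The four calls in main mirror the four test messages from the example: ALL2 is the only one whose headers do not satisfy its binding.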

topic

Topic exchanges route messages to one or more queues based on a match between the message routing key and the pattern used to bind the queue to the exchange. The topic exchange type is commonly used to implement various publish/subscribe pattern variants, and is typically used for multicast routing of messages.

Topic exchanges have a very wide range of use cases. They should be considered whenever a problem involves multiple consumers/applications that selectively choose the type of message they want to receive.

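import java.nio.charset.StandardCharsets;

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
// plus the @Test import of the JUnit version in use

/**
 * @author Kakki
 * @version 1.0
 * @create 2021-06-18 16:56
 */
@Configuration
public class QueueAutoConfig implements SmartInitializingSingleton {
    @Autowired
    private AmqpAdmin amqpAdmin;

    @Override
    public void afterSingletonsInstantiated() {
        Queue topicQueue = new Queue("topicQueue");
        amqpAdmin.declareQueue(topicQueue);
        CustomExchange topicExchange = new CustomExchange("topicExchange", "topic");
        amqpAdmin.declareExchange(topicExchange);
        Binding topicBinding = BindingBuilder
                .bind(topicQueue)
                .to(topicExchange)
                .with("topic.#") // pattern: "topic." followed by zero or more words
                .noargs();
        amqpAdmin.declareBinding(topicBinding);
    }

    /**
     * Unit test. rabbitTemplate is an injected RabbitTemplate whose declaration is not shown.
     */
    @Test
    public void sendMq() {
        // The first two routing keys match "topic.#"; the last two do not
        rabbitTemplate.convertAndSend("topicExchange", "topic.Hello1", "topic.Hello1");
        rabbitTemplate.convertAndSend("topicExchange", "topic.Hello2", "topic.Hello2");
        rabbitTemplate.convertAndSend("topicExchange", "Hellotopic1", "Hellotopic1");
        rabbitTemplate.convertAndSend("topicExchange", "Hello1topic.Hello1", "Hello1topic.Hello1");
        // Block so the listener has time to consume
        while (true);
    }

    // Listener. In the output below it is logged from MqCustomer, which carries @Slf4j
    @RabbitListener(queues = {"topicQueue"})
    public void handlerT(Message message) {
        log.info("[Message coming]:{}", new String(message.getBody(), StandardCharsets.UTF_8));
    }
}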
2021-07-03 18:15:30.076  INFO 9296 --- [ntContainer#0-1] com.example.demo.config.MqCustomer : [Message coming]:topic.Hello1
2021-07-03 18:15:30.077  INFO 9296 --- [ntContainer#0-1] com.example.demo.config.MqCustomer : [Message coming]:topic.Hello2
  • Topic is a multicast mode: the producer sends one message, and every queue whose binding pattern matches the routing key receives it. It is a bit like publish and subscribe: as long as I subscribe, I receive the message
  • In a binding pattern, words are separated by dots. # matches zero or more words and * matches exactly one word, so topic.# receives any routing key that starts with the word topic
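The wildcard rules can be tried out without a broker. Below is a minimal sketch of topic pattern matching in plain Java; it follows the documented meaning of # and * but is not RabbitMQ's own implementation, and TopicMatchSketch and matches are hypothetical names.

```java
/** Simplified model of topic-exchange pattern matching (illustration only). */
final class TopicMatchSketch {

    /** Returns true if the routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        // Words are separated by dots; -1 keeps trailing empty words.
        return matches(pattern.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean matches(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            return k == key.length; // pattern used up: the key must be used up too
        }
        if (pattern[p].equals("#")) {
            // '#' absorbs zero or more words: try every possible split point.
            for (int next = k; next <= key.length; next++) {
                if (matches(pattern, p + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (k == key.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        // '*' matches exactly one word; anything else must be equal (case-sensitive).
        if (pattern[p].equals("*") || pattern[p].equals(key[k])) {
            return matches(pattern, p + 1, key, k + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String[] keys = {"topic.Hello1", "topic.Hello2", "Hellotopic1", "Hello1topic.Hello1"};
        for (String key : keys) {
            System.out.println(key + " -> " + matches("topic.#", key));
        }
    }
}
```

The keys in main are the four routing keys from the unit test: only the two that begin with the word topic match topic.#, which agrees with the two lines in the log output.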

default

RabbitMq also has a default exchange. It is a special direct exchange whose name is the empty string, and every queue is automatically bound to it with a routing key equal to the queue name. The producer therefore does not need to name an exchange: it just uses the queue name as the routingKey and the message reaches that queue. It looks like the message is sent directly to the queue without going through an exchange, but it actually goes through this direct exchange
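As a rough illustration of why sending to a queue name works, here is an in-memory sketch in plain Java of the implicit binding. It is a model under my own assumptions, not the RabbitMQ client API; DefaultExchangeSketch, declareQueue and publish are hypothetical names, and helloQueue is an invented queue name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of the default exchange (illustration only). */
final class DefaultExchangeSketch {
    // Binding key -> queue contents. For the default exchange the key is the queue name.
    private final Map<String, List<String>> queuesByName = new LinkedHashMap<>();

    /** Declaring a queue implicitly binds it with a key equal to its own name. */
    void declareQueue(String queueName) {
        queuesByName.putIfAbsent(queueName, new ArrayList<>());
    }

    /**
     * Publishes to the default exchange (name ""). Routing is the direct rule K = R,
     * where K is always a queue name. Returns false if no queue has that name.
     */
    boolean publish(String exchange, String routingKey, String message) {
        if (!exchange.isEmpty()) {
            throw new IllegalArgumentException("this sketch only models the default exchange");
        }
        List<String> queue = queuesByName.get(routingKey);
        if (queue == null) {
            return false; // nothing bound under this key
        }
        queue.add(message);
        return true;
    }

    /** Returns the messages delivered so far to the given queue (empty if undeclared). */
    List<String> messagesIn(String queueName) {
        return queuesByName.getOrDefault(queueName, new ArrayList<>());
    }

    public static void main(String[] args) {
        DefaultExchangeSketch broker = new DefaultExchangeSketch();
        broker.declareQueue("helloQueue");
        System.out.println(broker.publish("", "helloQueue", "hi"));
        System.out.println(broker.publish("", "noSuchQueue", "hi"));
        System.out.println(broker.messagesIn("helloQueue"));
    }
}
```

From the producer's side only a queue name is supplied, yet the lookup is still a routing-key comparison, which is exactly what a direct exchange does.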

Summary

The four types of RabbitMq exchanges suit different business scenarios. No exchange type belongs to one particular service; which one to use depends on the specific business scenario, so use them flexibly. For me, the simplest ones are fanout and direct. RabbitMq has a lot of uses and a lot to offer: in the business scenarios I have encountered, MQ can be used for peak shaving and decoupling, and it is flexible. Of course, there is more to RabbitMq than the four exchange types, and I will continue to update this RabbitMq series.