preface

This blog has been indexed at GitHub: HTTPS://zhouwenxing.github.io/
Source code has also been included at GitHub: HTTPS://github.com/zhouwenxing/lonely-wolf-note
Copy the code

Using message queues must ensure that messages sent by producers can be received by consumers. How do producers receive messages? The following is a working model of RabbitMQ:

In the figure above, producers send messages to Exchange, which in turn sends messages to different queues, which store message queues. If there are multiple producers, then after messages are sent to Exchange, How do you bind a Queue?

How do I send messages using RabbitMQ

RabbitMQ provides three ways to route messages.

Direct connection mode

The Exchange and Queue are bound by specifying an exact binding key. That is, when a directly connected switch is created, the producer carries a routing key when sending a message. Only when a binding key matches exactly is the message routed from the switch to the message queue that satisfies the routing relationship. Consumers can then retrieve the message according to the queue they are listening to (Queue1 is bound to order, as shown below). The routing key must be order to allocate the message to Queue1:

Topic pattern

The Direct mode has some limitations, sometimes we need to divide by type, such as order classes to one queue and product classes to another queue, so subject mode is provided for fuzzy matching in RabbitMQ. Two wildcards are supported using the topic type connection:

Direct matches can only be exact, sometimes fuzzy matches are required and subject connections are required. RabbitMQ supports two wildcards:

  • # :0One or more words
  • * said:1A word

PS: When wildcard characters are used, words are separated by decimal points. For example, ABC. Def indicates that there are two words ABC and def.

As shown in the following figure, since Queue1 is bound to order.#, sending messages with a routing key of order or order.xxx can cause messages to be allocated to Queue1:

Broadcast Fanout mode

When we define a type of radio switches do not need to specify the binding key, and the producer sends a message to the switch, also do not need to carry the routing key, at this time when messages arrive switches, all with the binding will receive message queue, message sending of this pattern is suitable for the notification class requirements.

Queue1, Queue2, and Queue3 are all bound to a Fanout switch, so when the Fanout Exchange receives a message, it sends the message to all three queues at the same time:

You can also query the information about created switches and queues in the RabbitMQ background management system. You can directly create queues and switches in the background management system:

Message sending

The following uses a SpringBoot example to explore three ways to send messages.

  • 1.application.ymlAdd the following configuration to the file:
spring:
  rabbitmq:
    host: ip
    port: 5672
    username: admin
    password: 123456
Copy the code
  • 2. Add oneRabbitConfigThe configuration class (omit package name and import to save space) declares three switches and three queues and binds them separately:
@Configuration
public class RabbitConfig {
    // Directly connect to the switch
    @Bean("directExchange")
    public DirectExchange directExchange(a){
        return new DirectExchange("LONGLY_WOLF_DIRECT_EXCHANGE");
    }

    // Topic switch
    @Bean("topicExchange")
    public TopicExchange topicExchange(a){
        return new TopicExchange("LONGLY_WOLF_TOPIC_EXCHANGE");
    }

    // Broadcast switch
    @Bean("fanoutExchange")
    public FanoutExchange fanoutExchange(a){
        return new FanoutExchange("LONGLY_WOLF_FANOUT_EXCHANGE");
    }


    @Bean("orderQueue")
    public Queue orderQueue(a){
        return new Queue("LONGLY_WOLF_ORDER_QUEUE");
    }

    @Bean("userQueue")
    public Queue userQueue(a){
        return new Queue("LONGLY_WOLF_USER_QUEUE");
    }

    @Bean("productQueue")
    public Queue productQueue(a){
        return new Queue("LONGLY_WOLF_PRODUCT_QUEUE");
    }

    //Direct the switch is bound to orderQueue with the key order.detail
    @Bean
    public Binding bindDirectExchange(@Qualifier("orderQueue") Queue queue, @Qualifier("directExchange") DirectExchange directExchange){
        return BindingBuilder.bind(queue).to(directExchange).with("order.detail");
    }

    //Topic switches are bound to userQueue with the binding key user.#
    @Bean
    public Binding bindTopicExchange(@Qualifier("userQueue") Queue queue, @Qualifier("topicExchange") TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with("user.#");
    }

    // The Fanout switch is bound to productQueue
    @Bean
    public Binding bindFanoutExchange(@Qualifier("productQueue") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange){
        returnBindingBuilder.bind(queue).to(fanoutExchange); }}Copy the code
  • 3. Create a new consumerExchangeConsumerClass, and different methods are implemented to listen on different queues:
@Component
public class ExchangeConsumer {

    /** * listen on the message queue bound to the direct switch */
    @RabbitHandler
    @RabbitListener(queues = "LONGLY_WOLF_ORDER_QUEUE")
    public void directConsumer(String msg){
        System.out.println("Direct The switch received the message:" + msg);
    }

    /** * listen to the message queue */ that is bound to the topic switch
    @RabbitHandler
    @RabbitListener(queues = "LONGLY_WOLF_USER_QUEUE")
    public void topicConsumer(String msg){
        System.out.println("Topic switch receives message:" + msg);
    }

    /** * listen on the message queue bound to the FANout switch */
    @RabbitHandler
    @RabbitListener(queues = "LONGLY_WOLF_PRODUCT_QUEUE")
    public void fanoutConsumer(String msg){
        System.out.println("Fanout switch receives message:"+ msg); }}Copy the code
  • 4. Add oneRabbitExchangeControllerClass to act as a producer for sending messages:
@RestController
@RequestMapping("/exchange")
public class RabbitExchangeController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping(value="/send/direct")
    public String sendDirect(String routingKey,@RequestParam(value = "msg",defaultValue = "no direct message") String msg){
        rabbitTemplate.convertAndSend("LONGLY_WOLF_DIRECT_EXCHANGE",routingKey,msg);
        return "succ";
    }
    @GetMapping(value="/send/topic")
    public String sendTopic(String routingKey,@RequestParam(value = "msg",defaultValue = "no topic message") String msg){
        rabbitTemplate.convertAndSend("LONGLY_WOLF_TOPIC_EXCHANGE",routingKey,msg);
        return "succ";
    }
    @GetMapping(value="/send/fanout")
    public String sendFaout(String routingKey,@RequestParam(value = "msg",defaultValue = "no faout message") String msg){
        rabbitTemplate.convertAndSend("LONGLY_WOLF_FANOUT_EXCHANGE",routingKey,msg);
        return "succ"; }}Copy the code
  • 5. Start the service. When we call the first interface, the routing key and the binding keyorder.detailWhen it’s an exact match,directConsumerI get a message, and again, when I call the second interface, the routing key satisfiesuser.#When,topicConsumerThe message is received, and as long as the third interface is called, whether the routing key is specified or not,fanoutConsumerI get messages.

What if the message is out of date

We’ve learned to simply send a message, but does that stop us there? Obviously not, and to play it you need to play it better, so let’s spice up the news.

TTL (Time To Live)

TTL is the maximum lifetime of a message in a queue. A message that exceeds the configured TTL in a queue is said to be dead. Note, however, that dead messages are not guaranteed to be immediately removed from the queue, but they are guaranteed not to be delivered.

There are two ways to set the TTL:

  • 1. Set x-message-TTL to the queue. In this case, all messages sent to the queue will become dead messages when they reach TTL.

    This situation occurs when a message is simultaneously routed to N queues with TTL time. Since the TTL of each queue is not necessarily the same, the same message in different queues may die at different times or may not die (TTL is not set). So the death of messages in one queue does not affect messages in other queues.

  • 2. Set an expiration time for a message.

    At this time, it should be noted that when the message reaches the TTL, it may not be immediately discarded, because only the message in the queue header will be discarded after it expires. If the message in the queue header is not TTL set, but the second message is TTL set, then even if the second message becomes dead, Messages must wait until the header of the queue is consumed before they are discarded, and dead messages are counted in statistics (such as the total number of messages in the queue) before they are discarded. Therefore, to make better use of the TTL feature, it is recommended that consumers consume messages online to ensure that messages are discarded faster and avoid message accumulation.

PS: There may be natural competition conditions between message expiration and consumer delivery. For example, messages may expire in transit (before reaching the consumer).

Queue survival

Unlike TTL for messages, we can deal with queues by setting the ‘X-expires’ attribute, where the server guarantees that a queue will be deleted if it is not used within a specified expiration time (but there is no guarantee of how quickly it will be deleted after the specified expiration time).

TTL and expiration time actual combat

  • 1. Defined aboveRabbitConfigClass, add one moreTTLQueue and bind it todirectOn the switch:
@Bean("ttlQueue")
public Queue ttlQueue(a){
    Map<String, Object> map = new HashMap<String, Object>();
    map.put("x-message-ttl".5000);// All messages in the queue expire after 5 seconds
    map.put("x-expires".100000);// The queue is idle for 10 seconds and then deleted
    // Parameter 1-name: queue name
    // Parameter 2-durable: Whether to be durable
    // Parameter 3-EXCLUSIVE: specifies whether to exclude. When set to true, this queue is available only to connections that declare the current queue. Once the Connection is disconnected, the queue is automatically deleted
    // Parameter 4 -autodelete: indicates whether to delete the file automatically. The premise is that at least one consumer must be connected to the current queue, and when all consumers are disconnected, the queue is automatically deleted
    return new Queue("LONGLY_WOLF_TTL_QUEUE".false.false.false,map);
    }

// TTL queues bind to direct switches (switches and queues can be many-to-many)
@Bean
public Binding ttlBindFanoutExchange(@Qualifier("ttlQueue") Queue queue, @Qualifier("directExchange") DirectExchange directExchange){
    return BindingBuilder.bind(queue).to(directExchange).with("test.ttl");
}
Copy the code
  • 2, inExchangeConsumerListen on the consumer classTTLQueue (different from other consumers, this is changed to pass in order to print out the queue attributesMessageObject to receive messages) :
/** * listen to the TTL message queue */
@RabbitHandler
@RabbitListener(queues = "LONGLY_WOLF_TTL_QUEUE")
public void ttlConsumer(Message message){
    System.out.println("TTL queue received message:" + new String(message.getBody()));
    System.out.println("TTL queue received message:" + JSONObject.toJSONString(message.getMessageProperties()));
}
Copy the code
  • 3, in the producer classRabbitExchangeControllerA new interface is added to test sending expired messagesMessagePropertiesSet up theexpirationProperty is equivalent to setting one for a single messageTTL
@GetMapping(value="/send/ttl")
public String sendTtl(String routingKey,@RequestParam(value = "msg",defaultValue = "no ttl message") String msg){
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setExpiration("5000");// Delete after 5 seconds, i.e. TTL attribute (for single message)
    Message message = new Message(msg.getBytes(), messageProperties);
    rabbitTemplate.convertAndSend("LONGLY_WOLF_DIRECT_EXCHANGE",routingKey,message);
    return "succ";
}
Copy the code
  • 4, at this time, if we remove the consumer’s monitoring and then send a message, it can be seen in the management background5The message will be deleted after a second,10The queue will be deleted after seconds.

PS: If the TTL is set for both the queue and the single message, the shortest TTL is used.

Other attributes

There are a few other attributes that can be set in a queue, but we won’t use them here:

  • X-message-ttl: indicates the lifetime (in milliseconds) of messages in the queue. Messages that reach the TTL may be deleted.
  • X-expires: How long a queue will be deleted after it has not been accessed in milliseconds.
  • X-max-length: indicates the maximum number of messages in the queue.
  • X-max-length-bytes: indicates the maximum size of a queue (bytes).
  • Overflow: Policies after queues overflow. You can set the following parameters:reject-publish– Discard recently published messages, if enabled publisher confirm(Publisher confirms), the publisher will send by sendingbasic.nackMessage notification rejection, if the current queue binding has more than one consumer, the message is being receivedbasic.nackIf a notification is rejected, it will still be published to another queue.drop-head– Discard queue header messages (this policy is only supported in cluster mode)reject-publish-dlx– Recently posted messages are put into a dead letter queue.
  • X-dead-letter-exchange: queued dead-letter exchange.
  • X-dead-letter-routing-key: indicates the routing key of the dead-letter switch.
  • X – single – active – consumer: true/false. If multiple consumers are bound at the same time, only the first one will be activated. Unless the first consumer is canceled or dies, it will automatically transfer to the next consumer.
  • X-max-priority: indicates the maximum priority of messages in the queue. The priority of messages cannot exceed this value.
  • X – the queue – mode:3.6.0This version was introduced primarily to enable lazy loading. Queues persist incoming messages to disk as quickly as possible and then load them only when the user requests themRAMMemory. This parameter supports two values:defaultlazy. If the parameter is not set, the default value isdefaultDo not make any changes; When set tolazyIt’s lazy loading.
  • X-queue-master-locator: indicates that packets are destined to be sentFIFOIn ha cluster mode, you need to select a node as the primary node. There are three main modes for this parameter:min-masters– Hosts the minimum number of nodes bound to the host;client-local– Select the declared queue that is already connected to the client node;random– Select a node at random.

The magic of Dead Letters

What is new about the dead letter queue mentioned in the parameter description above? In fact, the name is very understandable, it refers to the news of death, or homeless news. There are three conditions for a message to enter a dead-letter queue:

  • 1. The message was rejected by the consumer and not requeued.

  • 2. The message expires (that is, the TTL is set).

  • 3. If the queue length exceeds Max Length or Max Length bytes, the message at the head of the queue will be sent to the dead letter queue.

Dead letter queue actual combat

  • 1. Defined aboveRabbitConfigClass, define a dead-letter switch and place the previousttlAdd a new attribute to the queuex-dead-letter-exchange, and finally bind the dead letter queue to the dead letter switch:
// Dead-letter switch (topic or FANout can also be used)
@Bean("deatLetterExchange")
public DirectExchange deatLetterExchange(a){
    return new DirectExchange("LONGLY_WOLF_DEAD_LETTER_DIRECT_EXCHANGE");
}
@Bean("ttlQueue")
public Queue ttlQueue(a){
    Map<String, Object> map = new HashMap<String, Object>();
    map.put("x-message-ttl".5000);// All messages in the queue expire after 5 seconds
    map.put("x-dead-letter-exchange"."LONGLY_WOLF_DEAD_LETTER_DIRECT_EXCHANGE");// Dead messages go to the dead-letter switch
    return new Queue("LONGLY_WOLF_TTL_QUEUE".false.false.false,map);
}
// Dead letter queue
@Bean("deadLetterQueue")
public Queue deadLetterQueue(a){
    return new Queue("LONGLY_WOLF_DEAD_LETTER_QUEUE");
}
Copy the code
  • 2, inExchangeConsumerThe consumer class will listenTTLUncomment listener on queue:
	/** * listen to the TTL message queue */
    @RabbitHandler
// @RabbitListener(queues = "LONGLY_WOLF_TTL_QUEUE")
    public void ttlConsumer(Message message){
        System.out.println("TTL queue received message:" + new String(message.getBody()));
        System.out.println("TTL queue received message:" + JSONObject.toJSONString(message.getMessageProperties()));
    }
Copy the code
  • 3, at this timeTTLQueues have no consumers and are set for messagesTTL5Seconds, so5It’s gonna be dead letter in seconds.
  • 5. Access interface:http://localhost:8080/exchange/send/ttl?routingKey=test&msg= test dead-letter queueAfter sending the message, wait5View the message in seconds and enter the dead-letter queue:

Did the message actually go through

Now that you know the basics of sending messages, can you rest easy? After the message is sent, does the consumer actually receive the message? How do I know if a message was successfully sent after it was sent? What if the sent message is incorrectly routed and cannot be routed to the queue? Do you all have these questions? Don’t worry, let’s break it down one by one.

A message can be divided into the following four stages from the time when the producer sends the message to the time when the consumer finishes consuming the message:

  • 1. The producer sends the message toBroker(i.e. :RabbitMQSwitch).
  • 2. The switch routes messages to queues.
  • 3. The queue stores the message after receiving it.
  • 4. The consumer retrieves the message from the queue for consumption.

Let’s take a step-by-step look at how RabbitMQ ensures the reliability of message delivery through these four steps.

Does the message actually reach the switch

When we send a message, how do we know that the other party has received it? It’s the same as writing letters. When we write a letter, how do we know that the other party receives the letter we send? The easiest way is for the recipient to write back to us. When we receive their reply, we can know that our letter has been successfully sent.

The server also provides two ways to tell a client (producer) if a message has been received in RabbitMQ: Transaction and Confirm.

Transaction pattern

To enable transactions in Java API programming, simply add the following code:

 try {
     channel.txSelect();// Start the transaction
     channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
     channel.txCommit();// Commit the transaction
 }catch (Exception e){
     channel.txRollback();// Message rollback
 }
Copy the code

The RabbitTemplate transaction setup is required in Spring Boot:

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
    RabbitTemplate rabbitTemplate = new RabbitTemplate();
    rabbitTemplate.setConnectionFactory(connectionFactory);
    rabbitTemplate.setChannelTransacted(true);// Start the transaction
    return rabbitTemplate;
}
Copy the code

To learn about the transaction mechanism in RabbitMQ, enter ip.addr==192.168.1.1 in the Wireshark to capture packets from the local IP address. After a packet is sent, the following packets are captured:

Through data packets, it can be concluded that after the transaction is started, in addition to the original message sending, there is more communication for the transaction opening and transaction submission:

Once a transaction is started, a fatal disadvantage is that the message sending process is blocked. That is, one message must be sent successfully before another message is allowed to be sent. Because of this shortcoming of the transaction pattern, it is generally not recommended to start a transaction in a production environment, so is there a better way to implement delivery confirmation of messages? So let’s look at the Confirm mode.

Confirm mode

There are three message confirmation modes (transaction mode and confirmation mode cannot be enabled at the same time) :

  • Single confirmation mode: Sends a message to confirm a message. This confirmation mode is also inefficient.
  • Batch confirmation mode: Send a batch of messages and confirm them simultaneously. A disadvantage of batch sending is that once a message fails to be sent in the same batch, the system receives a failure notification and needs to resend all the messages.
  • Asynchronous confirmation mode: Messages are acknowledged while being sent. Messages can be acknowledged individually or in batches.

The Java API implements validation patterns

  • Single message confirmation mode
channel.confirmSelect();// Enable confirm mode
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
if (channel.waitForConfirms()){// Wait. ForConfirms(long time) methods can specify waiting times
    System.out.println("Message confirmation sent successfully");
}
Copy the code
  • Batch confirmation mode
channel.confirmSelect();// Enable confirm mode
// Batch send
for (int i=0; i<10; i++){ channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
}
try{
    channel.waitForConfirmsOrDie();
}catch (IOException e){// If only one message is unacknowledged, an exception will be thrown
    System.out.println("Message delivery failed.");
}
Copy the code
  • Asynchronous confirmation mode
channel.addConfirmListener(new ConfirmListener() {
    /** * The message was sent successfully, that is, the callback *@paramDeliveryTag - Unique identifier ID (that is, nextPublishSeqNo retrieved when the message is sent) *@paramMultiple - Whether to confirm in batches. Multiple =true indicates that <=deliveryTag messages are confirmed in batches. Multiple =false indicates that only a single */ is confirmed
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {// Successful callback
        System.out.println("I got confirmation.");
        //TODO can do whatever it wants
    }

    /** * Callback after sending failure message *@paramDeliveryTag - Unique identifier ID (that is, nextPublishSeqNo retrieved when the message is sent) *@paramMultiple - Whether to confirm in batches. Multiple =true indicates that <=deliveryTag messages are confirmed in batches. Multiple =false indicates that only a single */ is confirmed
    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {// Failed callback
        if (multiple) {// Batch confirm, 
            //TODO message resends?
        } else {// Non-batch, = Message delivery for deliveryTag failed
            //TODO message resends?}}}); channel.confirmSelect();// Enable confirm mode
for (int i=0; i<10; i++){// Batch send
    long nextSeqNo = channel.getNextPublishSeqNo();// Get the unique identifier for sending the message (incrementing from 1)
    //TODO can consider storing message ids
    channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
}
Copy the code

SpringBoot implements confirmation mode

Through the spring configuration file. The rabbitmq. Publisher – confirm -type parameters configured to confirm (old version is spring. The rabbitmq. Publisher – confirms parameters).

  • 1. Added configuration file properties
spring:
  rabbitmq:
    publisher-confirm-type: correlated # none- disables callback (default) simple- see RabbitExchangeController#sendWithSimpleConfirm()
Copy the code
  • 2,RabbitConfigModify the configuration file as follows:
 @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
// rabbitTemplate.setChannelTransacted(true); // Start the transaction
        // Whether the message was successfully sent to Exchange
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if(! ack){// Message sending failed
                    System.out.println("Failed to send message because:" + cause);
                    return;
                }
                // Message sent successfully
                System.out.println("Message sent successfully"); }});return rabbitTemplate;
    }
Copy the code

So when we send the message successfully, we will receive the callback.

  • 3. When the preceding parameter configuration is changed tosimpleIs used when sending messagesinvokecallwaitForConfirmsorwaitForConfirmsOrDieMethod to confirm whether the message was sent successfully:
 @GetMapping(value="/send/confirm")
 public String sendWithSimpleConfirm(String routingKey,@RequestParam(value = "msg",defaultValue = "no direct message") String msg){
       // Confirm by waitForConfirms method
        boolean sendFlag = rabbitTemplate.invoke(operations -> {
            rabbitTemplate.convertAndSend(
                    "LONGLY_WOLF_DIRECT_EXCHANGE"."routingKey",
                    msg
            );
            return rabbitTemplate.waitForConfirms(5000);
        });
        // This can also be confirmed using the waitForConfirmsOrDie method
        boolean sendFlag2 = rabbitTemplate.invoke(operations -> {
            rabbitTemplate.convertAndSend(
                    "LONGLY_WOLF_DIRECT_EXCHANGE"."routingKey",
                    msg
            );
            try {
                rabbitTemplate.waitForConfirmsOrDie(5000);
            }catch (Exception e){
                return false;
            }
            return true;
        });
        System.out.println(sendFlag);
        System.out.println(sendFlag2);
        return "succ";
    }
Copy the code

What if messages cannot be routed from the switch to the correct queue

If the above transaction or acknowledgement mechanism ensures that the message is successfully sent to the switch, then the switch is responsible for routing the message to the queue. In this case, if the queue does not exist or the route is wrong, the message will fail to be routed. How to guarantee this?

RabbitMQ also provides two ways to ensure that messages are routed correctly to queues: enable listening mode or add backup switch mode to back up data.

Listen to the callback

The above is the callback of whether a message is sent to the switch, and routing from the switch to the queue can also be turned on in confirmation mode.

Enable the listening mode in Java API mode

The following is the main code to enable listening. In order to save space, other irrelevant code has been omitted (the completed code has been uploaded to GitHub).

channel.addReturnListener(new ReturnListener() {
     @Override
     public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
         System.out.println("Received callback message not routed to queue:" + newString(body)); }});// Note that the third parameter, MANDATORY, needs to be set to true (sending an error route can receive a callback)
channel.basicPublish(EXCHANGE_NAME,"ERROR_ROUTING_KEY".true.null,msg.getBytes());
Copy the code

Spring Boot Enables the listening mode

Add the following configuration to the RabitConfig class:

 @Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
    RabbitTemplate rabbitTemplate = new RabbitTemplate();
    rabbitTemplate.setConnectionFactory(connectionFactory);

    rabbitTemplate.setMandatory(true);// Enable the listening callback
    // If the message is successfully routed to the queue, it will receive a callback if it is not routed to the queue.
    rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
        @Override
        public void returnedMessage(ReturnedMessage returnedMessage) {
            System.out.println("Received callback message not routed to queue:" + newString(returnedMessage.getMessage().getBody())); }});return rabbitTemplate;
}
Copy the code

Backup switch

In addition to enabling listening, you can also define backup switches. If the original switch cannot correctly route to a queue, the switch switches to the backup switch and then routes to the correct queue. (Note the difference between a backup switch and a dead-letter switch.)

Java API to implement backup switch

The following is an example to implement a backup switch, because the backup switch is defined as a Topic type, all routes must meet the defined routes, in practice, usually set Fanout, because it is impossible to predict the number of wrong routes:

 // Declare a switch and specify a backup switch
Map<String,Object> argMap = new HashMap<String,Object>();
argMap.put("alternate-exchange"."TEST_ALTERNATE_EXCHANGE");
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,false.false,argMap);
// The queue is bound to the switch
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTEING_KEY);

// Declare the backup switch and backup queue and bind them. (To prevent receiving messages, it is recommended to set the backup switch type to Fanout.)
channel.queueDeclare("BAK_QUEUE".false.false.false.null);
channel.exchangeDeclare("TEST_ALTERNATE_EXCHANGE", BuiltinExchangeType.TOPIC);
channel.queueBind("BAK_QUEUE"."TEST_ALTERNATE_EXCHANGE"."ERROR.#");

String msg = "I'm a bak exchange msg";
channel.basicPublish(EXCHANGE_NAME,"ERROR.ROUTING_KEY".null,msg.getBytes());
Copy the code

Spring Boot implements backup switches

Spring Boot implements backup switches in the same way as Java API implementations:

  • 1, first inRabbiConfigTwo switches are added, one is the original switch and the other is the backup switch, and a backup queue is bound to the backup switch. In this case, the backup switch is oneFanoutNote that since this is mainly a demonstration of the backup switch, the original switch here is not bound to any queue, and therefore cannot be routed to a queue so that messages go to the backup switch:
// Used to test the original direct switch of the backup switch
@Bean("bakDirectEchange")
public DirectExchange bakDirectEchange(a){
    Map argMap = new HashMap<>();
    argMap.put("alternate-exchange"."LONGLY_WOLF_BAK_FANOUT_EXCHANGE");
    return new DirectExchange("LONGLY_WOLF_BAK_ORIGIN_DIRECT_EXCHANGE".false.false,argMap);
}

// Back up the broadcast switch
@Bean("bakFanoutExchange")
public FanoutExchange bakFanoutExchange(a){
    return new FanoutExchange("LONGLY_WOLF_BAK_FANOUT_EXCHANGE");
}
// Backup the queue
@Bean("bakQueue")
public Queue bakQueue(a){
    return new Queue("LONELY_WOLF_BAK_QUEUE");
}
// Bind the backup switch to the backup queue
@Bean
public Binding BindExchange(@Qualifier("bakQueue") Queue queue, @Qualifier("bakFanoutExchange") FanoutExchange fanoutExchange){
    return BindingBuilder.bind(queue).to(fanoutExchange);
}
Copy the code

Exchange econsumer ();

 /** * listen for backup message queue */
@RabbitHandler
@RabbitListener(queues = "LONELY_WOLF_BAK_QUEUE")
public void bakQueueConsumer(Message message){
    System.out.println("Backup queue received message:" + new String(message.getBody()));
}
Copy the code
  • 3. Finally, in the producer classRabbitExchangeControllerAdd a message sending method to send a message:
@GetMapping(value="/send/bak")
public String sendBak(String routingKey,@RequestParam(value = "msg",defaultValue = "no bak message") String msg){
    rabbitTemplate.convertAndSend("LONGLY_WOLF_BAK_ORIGIN_DIRECT_EXCHANGE",routingKey,msg);
    return "succ";
}
Copy the code

After the call, you can see that the backup queue receives the message, indicating that the message goes to the backup queue when it cannot be routed to the queue.

What if an exception occurs after a message is stored in the queue

After ensuring the reliability of the first two phases, when the message finally reaches the queue safely, is it absolutely safe?

When our consumers can’t consume as fast as our producers can produce, messages pile up in queues. By default, messages are not persisted and live in memory, so if a server goes down or something like that happens, the data in the queue will be lost.

The solution here is simply to persist messages, and there are three types of persistence in RabbitMQ: switch persistence, queue persistence and message persistence.

Although say persistence to a certain extent to ensure the reliability of the news, however, when the server’s disk is damaged, still message loss may occur, so in order to more perfect, the RabbitMQ cluster may be necessary, of course, this article does not involves the knowledge of the cluster, the cluster of knowledge and the analysis of the building will be on again.

Switch persistence

Parameter Durable is set to true when declaring switches.

Queue persistence

The durable parameter is set to true when declaring a queue.

Message persistence

Messages can be set to persist when they are sent.

Java API message persistence

In the Java API, message persistence can be set up as follows:

//deliveryMode=2 indicates message persistence
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).build();
channel.basicPublish("exchangeName"."routingKey",properties,msg.getBytes());
Copy the code

Spring Boot message persistence

Messages can be set to persist in Spring Boot as follows:

MessageProperties messageProperties = new MessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);// Message persistence
Message message = new Message(msg.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("exchangeName"."routingKey",message);
Copy the code

What if the consumer fails to consume the message

After a long journey and three layers of hell mode, consumers finally get the message, but the tragedy happened again. When consumers consume the message, it may be due to their own problems or other accidents that consumers fail to consume the message, and at this time the message is still not processed correctly. Is this the time when you’re looking at the last minute and you can’t do anything?

No, it’s impossible for such a good message queue not to have this scenario in mind. Remember the confirmation mode we mentioned above, in fact, both of the above confirmation modes are server-side confirmation, which is also provided for consumers in RabbitMQ. This is consumer confirmation.

Consumer Confirmation (ACK)

A message will be deleted from the queue only if it is consumed by the consumer, but how does the server know that the message was consumed? This is the need to be confirmed by the consumer before deleting, and we did not see the consumer confirmation process in the introduction of message sending, this is because the consumer will give a reply to the server by default after receiving the message, the server will delete the message after receiving the reply from the consumer.

The Java API implements consumer responses

In the Java API, there are two reply modes: automatic reply and manual reply. When automatic reply, as long as the consumer receives the message, it will confirm to the server, regardless of whether the message is consumed successfully.

  • 1. Create a new consumerAckConsumerClass (omitting the package name and import), here we use the producer’s header tag to determine which response policy to use:
public class AckConsumer {
    private static String QUEUE_NAME = "ACK_QUEUE";
    public static void main(String[] args) throws Exception{
        //1. Declare connections
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://username:password@ip:port");

        //2. Establish a connection
        Connection conn = factory.newConnection();
        Create a message channel
        Channel channel = conn.createChannel();
        //4. Declare queue (default switch AMQP default, Direct)
        channel.queueDeclare(QUEUE_NAME, false.false.false.null);
        System.out.println("Waiting to receive a message...");

        // Create a consumer
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                System.out.println("Received a message:" + new String(body, "UTF-8"));
                Map<String,Object> map = properties.getHeaders();// Get the header message
                String ackType = map.get("ackType").toString();
                if (ackType.equals("ack")) {// Reply manually
                    channel.basicAck(envelope.getDeliveryTag(),true);
                }else if(ackType.equals("reject-single")) {// Reject a single message
                    // Reject the message. The Requeue parameter indicates whether a message is re-enqueued
                    channel.basicReject(envelope.getDeliveryTag(),false);
                    // channel.basicNack(envelope.getDeliveryTag(),false,false);
                }else if (ackType.equals("reject-multiple")) {// Reject multiple messages
                    // Reject the message. The multiple parameter indicates whether to reject in batches, and true indicates that all 
                    channel.basicNack(envelope.getDeliveryTag(),true.false); }}};The second parameter, autoAck, indicates whether automatic answer is enabled
        channel.basicConsume(QUEUE_NAME, false, consumer); }}Copy the code
  • Create a new producerAckProducerClass (omits package name and import) :
public class AckProducter {
    private static String QUEUE_NAME = "ACK_QUEUE";/ / the queue
    private static String EXCHANGE_NAME = "ACK_EXCHANGE";/ / switches
    private static String ROUTEING_KEY = "test";
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("It: / / admin: 123456 @47.107.155.197:5672");
        // Establish a connection
        Connection conn = factory.newConnection();
        // Create a message channel
        Channel channel = conn.createChannel();
        Map<String, Object> headers = new HashMap<String, Object>(1);
        headers.put("ackType"."ack");/ / please reply
// headers.put("ackType", "reject-single"); // Please reject it individually
// headers.put("ackType", "reject-multiple"); // Please reject multiple times

        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .contentEncoding("UTF-8")  / / code
                .headers(headers) // Custom attributes
                .messageId(String.valueOf(UUID.randomUUID()))
                .build();

        String msg = "I'm a ack message";
        // Declare a queue
        channel.queueDeclare(QUEUE_NAME, false.false.false.null);
        // Declare a switch
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,false);
        // The queue is bound to the switch
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTEING_KEY);
        // Send a messagechannel.basicPublish(EXCHANGE_NAME, ROUTEING_KEY, properties, msg.getBytes()); channel.close(); conn.close(); }}Copy the code

Spring Boot implements consumer response

In Spring Boot, consumers can confirm the server in three ways:

  • NONE: Automatic ack.

  • “MANUAL” : indicates MANUAL ack. If manual reply is set, and the consumer does not reply to the server, messages will always be queued, which can lead to message accumulation and repeated consumption.

  • AUTO: Indicates an automatic ack when no exception is thrown. In addition, when an exception occurs, it can be divided into the following three situations:

    • 1. When thrownAmqpRejectAndDontRequeueExceptionWhen an exception occurs, the message is rejected and will not be rejoined.
    • 2. When thrownImmediateAcknowledgeAmqpExceptionWhen an exception occurs, the consumer automatically sends a reply to the server.
    • 3. When other exceptions are thrown, the message is rejected and re-enqueued. When this happens and there is only one consumer, it is very easy to create a loop, so it should be avoided at all costs.
  • 1, Spring Boot can control the reply type by parameter:

spring:
  rabbitmq:
    listener:
      type: simple The # direct type came after 2.0
      simple:
        acknowledge-mode: manual
Copy the code
  • 2. In the consumer categoryExchangeConsumer> create a new method to listen to the queue, where the first method is commented out of the existing method, the second method is new, mainly added several parameters, noticeChannelcom.rabbitmq.client.ChannelUnder the package:
/** * listen on the message queue bound to the direct switch */
// @RabbitHandler
// @RabbitListener(queues = "LONGLY_WOLF_ORDER_QUEUE")
// public void directConsumer(String msg){
// system.out. println("direct switch receives message: "+ MSG);
/ /}

/** * listen on the message queue bound to the direct switch and reply manually */
@RabbitHandler
@RabbitListener(queues = "LONGLY_WOLF_ORDER_QUEUE")
public void manualDirectConsumer(String msg, Channel channel,Message message) throws IOException {
    System.out.println("Direct The switch received the message:" + msg + ". This message needs to be answered manually.");
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);// Reply manually
}
Copy the code
  • 3. Or you can passSimpleMessageListenerContainerClass to implement listening, create a new oneRabbitAckConfigClass (omits package name and import) :
@Configuration
public class RabbitAckConfig {
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("LONGLY_WOLF_ORDER_QUEUE");// Set the listener queue name
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);// Confirm manually
        container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {// Message processing
            System.out.println("Received a message:" + new String(message.getBody()) + ". This message needs to be answered manually.");
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        });
        returncontainer; }}Copy the code

PS: It is important to note that the two methods should not be used at the same time, otherwise there is no guarantee that the message will be monitored.

Will RabbitMQ provide reliable performance on its own

The two validation modes described above are server validation and consumer validation. Including confirmation of a service is a callback to the producer, so producers can know whether the message has already reached the server and is properly routed to the queue, however, for consumers, producers don’t know, this is because one of the role of the message queue is for producers and consumers of decoupling, in other words, consumers know that the message was sent to the queue, But there is no way to know whether the message is consumed by consumers.

So in order to know whether a message has been successfully consumed, there are two main approaches:

  • 1. After a successful consumption, the consumer needs to call back to the API provided by the producer to inform him that the message has been consumed
  • 2. The server sends a return receipt notification to the producer upon receipt of the consumer’s confirmation

However, if the producer does not receive the information of whether the consumer has consumed successfully or not, it may need compensation. For example, wechat Payment has a compensation mechanism, and the message will be re-sent at a certain interval.

The compensation mechanism also presents a problem. If the consumer succeeds in consuming but fails to tell the producer, then the message will be re-consumed if it is compensated again, so the consumer needs to support idempotent (that is, no matter how many times a message is consumed, it will not change the result). Of course, there are other scenarios to consider, such as dependencies between messages, which need to be dealt with in a specific business scenario.

conclusion

This article mainly describes the RabbitMQ message sending method, introduces the three different switch methods, and finally analyzes the four main steps of sending messages to ensure the reliability of each step, and provides examples through Java API and Spring Boot respectively. A dead letter queue is also a queue, but it stores a special message. RabbitMQ will be better understood in this article.