This GitHub repository shares Java knowledge, covering Java basics, MySQL, Spring Boot, MyBatis, Redis, RabbitMQ, computer networks, data structures and algorithms, and more. PRs and stars are welcome!

GitHub address: github.com/Tyson0314/J…

If GitHub is not available, use the Gitee repository.

Gitee address: gitee.com/tysondai/Ja…

Introduction

RabbitMQ is a message queue written in Erlang. Message queues let applications collaborate asynchronously.

Basic concepts

Message: Consists of a message header and a message body. The body is opaque; the header is a set of optional properties such as routing-key, priority, and delivery-mode.

Publisher: Producer of messages.

Exchange: Receives messages and routes them to one or more queues. The default exchange is a direct exchange whose name is the empty string. Every new queue is automatically bound to the default exchange with a routing key equal to the queue name.

Binding: Links an exchange to a queue so that the exchange knows which queue to route a message to.

Queue: Stores messages in first-in, first-out order. A message can be routed to one or more queues.

Virtual Host: Each vhost is essentially a mini RabbitMQ server with its own queues, exchanges, bindings, and permissions. Vhosts are a fundamental AMQP concept, and a vhost must be specified when connecting. RabbitMQ's default vhost is /. When several users share one RabbitMQ server, multiple vhosts can be created so that each user creates exchanges and queues inside their own vhost.

Broker: Message queue server entity.

When to use MQ

Operations that do not need to take effect immediately can be split off and executed asynchronously through a message queue.

Take a typical order system as an example. After the user clicks the order button, the business logic may include deducting inventory, generating the corresponding documents, and sending an SMS notification. The SMS notification can be handed to MQ for asynchronous execution: once the main order flow (deducting inventory, generating documents) completes, it sends a message to MQ and returns quickly, while another thread consumes the message and sends the SMS.

Advantages and disadvantages

Disadvantages: It is implemented in Erlang, which makes customization and maintenance harder. Performance is lower than Kafka's: with persistent messages and acknowledgements enabled, a single machine produces and consumes roughly 10,000 to 20,000 messages per second, whereas Kafka's throughput is around 100,000.

Advantages: It has a management UI and is easy to use. It is highly reliable. It is feature-rich, supporting message persistence, message acknowledgement, and several message distribution mechanisms.

Exchange types

An exchange distributes messages according to its type. There are currently four types: direct, fanout, topic, and headers. A headers exchange routes on the message headers rather than the routing key. Apart from that it behaves just like a direct exchange, but its performance is much worse.

Exchange rules:

Type       Description
fanout     Routes every message sent to the exchange to all queues bound to it
direct     Routing key == binding key
topic      Wildcard (fuzzy) matching of the routing key against the binding key
headers    Ignores routing keys and binding keys; matches on the headers attribute of the message instead

direct

A direct exchange routes a message to the queues whose binding key exactly matches the message's routing key. It is an exact-match, unicast pattern.

fanout

Every message sent to a fanout exchange is routed to all queues bound to that exchange. Fanout is the fastest exchange type at forwarding messages.

topic

A topic exchange matches the routing key against binding keys using wildcards; if a binding matches, the message is sent to the corresponding queue. Routing keys and binding keys are both strings of words separated by periods (.). A binding key may contain two special characters: “*” matches exactly one word, and “#” matches zero or more words.
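The wildcard rules can be sketched in a few lines of plain Java. This is only an illustration of the matching semantics, not RabbitMQ's own implementation; the class name TopicMatcher and the example keys are made up for this sketch.

```java
/** Illustrative matcher for topic-exchange binding keys; not RabbitMQ's own code. */
final class TopicMatcher {

    private TopicMatcher() {}

    /** Returns true if routingKey matches bindingKey ("*" = one word, "#" = zero or more words). */
    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean match(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            // Pattern exhausted: it is a match only if every word was used up too.
            return w == words.length;
        }
        String token = pattern[p];
        if (token.equals("#")) {
            // "#" either absorbs nothing, or absorbs one word and stays active.
            return match(pattern, p + 1, words, w)
                    || (w < words.length && match(pattern, p, words, w + 1));
        }
        if (w == words.length) {
            return false; // pattern still expects a word, but none are left
        }
        boolean sameWord = token.equals("*") || token.equals(words[w]);
        return sameWord && match(pattern, p + 1, words, w + 1);
    }
}
```

For example, the binding key order.* matches order.created but not order.created.eu, while order.# matches both, as well as plain order.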

headers

A headers exchange routes on the headers attribute of the message. When binding a queue to the exchange, you specify a set of key-value pairs. When a message arrives at the exchange, RabbitMQ takes the message's headers (also key-value pairs) and compares them with the pairs specified in the binding. If they match, the message is routed to the queue; otherwise it is not.
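A minimal sketch of that comparison in plain Java follows. It assumes "all" semantics, where every pair in the binding must appear in the message headers with the same value; the matchAll flag set to false mirrors the x-match=any binding argument that RabbitMQ also supports. HeadersMatcher is a hypothetical name, and this is not the broker's actual code.

```java
import java.util.Map;
import java.util.Objects;

/** Illustrative headers-exchange matching; not RabbitMQ's own implementation. */
final class HeadersMatcher {

    private HeadersMatcher() {}

    /**
     * @param bindingPairs   key-value pairs supplied when the queue was bound
     * @param messageHeaders headers carried by the published message
     * @param matchAll       true: every binding pair must match; false: one match is enough
     */
    static boolean matches(Map<String, Object> bindingPairs,
                           Map<String, Object> messageHeaders,
                           boolean matchAll) {
        // Count the binding pairs that are present in the headers with an equal value.
        long hits = bindingPairs.entrySet().stream()
                .filter(e -> messageHeaders.containsKey(e.getKey())
                        && Objects.equals(e.getValue(), messageHeaders.get(e.getKey())))
                .count();
        return matchAll ? hits == bindingPairs.size() : hits > 0;
    }
}
```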

Message loss

Messages can be lost in three places: on the way from the producer to the RabbitMQ server, while stored on the RabbitMQ server, and on the way from the RabbitMQ server to the consumer.

Message loss is addressed in three ways: producer confirms, manual consumer acknowledgement, and persistence.

Producer confirmation mechanism

When a producer sends a message, it has no guarantee that the message actually reached the server.

Solutions:

  1. Transactions. After sending a message, the sender blocks and waits for RabbitMQ's response before sending the next one. Performance is poor.
  2. Producer confirms. With confirms enabled, RabbitMQ sends an ack to the producer once the message has reached the exchange (even if it has not been routed to a queue). If the message does not reach the exchange, RabbitMQ sends a nack indicating that the send failed.

In Spring Boot, confirm mode is enabled with the publisher-confirms property:

spring:
  rabbitmq:
    # Enable publisher confirms (Spring Boot 2.2+ uses publisher-confirm-type: correlated instead)
    publisher-confirms: true

The producer registers a callback. When the server confirms one or more messages, the callback is invoked and the producer can act on the result, for example by resending or logging the message.

// Whether the message was successfully sent to the exchange
final RabbitTemplate.ConfirmCallback confirmCallback = (CorrelationData correlationData, boolean ack, String cause) -> {
    log.info("correlationData: " + correlationData);
    log.info("ack: " + ack);
    if (!ack) {
        log.info("Exception Handling....");
    }
};
rabbitTemplate.setConfirmCallback(confirmCallback);
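To resend on a nack, the producer has to remember what it published until the broker confirms it. The sketch below shows one way to track that in plain Java, keyed by a publish sequence number (the native Java client exposes one through Channel.getNextPublishSeqNo(); with RabbitTemplate the CorrelationData ID would play the same role). The class OutstandingConfirms and its method names are assumptions made for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Tracks messages that were published but not yet confirmed by the broker (illustrative only). */
final class OutstandingConfirms {

    private final ConcurrentNavigableMap<Long, String> pending = new ConcurrentSkipListMap<>();

    /** Remember the message body under the sequence number it was published with. */
    void published(long sequenceNumber, String body) {
        pending.put(sequenceNumber, body);
    }

    /** Broker acked: forget this message, or all messages up to and including it if multiple is true. */
    void ack(long sequenceNumber, boolean multiple) {
        take(sequenceNumber, multiple);
    }

    /** Broker nacked: hand back the affected messages so the caller can resend or log them. */
    List<String> nack(long sequenceNumber, boolean multiple) {
        return take(sequenceNumber, multiple);
    }

    int pendingCount() {
        return pending.size();
    }

    private List<String> take(long sequenceNumber, boolean multiple) {
        List<String> taken = new ArrayList<>();
        if (multiple) {
            // headMap(..., true) is a live view of every entry with key <= sequenceNumber.
            ConcurrentNavigableMap<Long, String> upTo = pending.headMap(sequenceNumber, true);
            taken.addAll(upTo.values());
            upTo.clear();
        } else {
            String body = pending.remove(sequenceNumber);
            if (body != null) {
                taken.add(body);
            }
        }
        return taken;
    }
}
```

A confirm callback would call ack or nack on this tracker, and anything still pending after a timeout could be treated as unconfirmed.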

Unroutable messages

Producer confirms only guarantee that a message reached the exchange. A message that cannot be routed from the exchange to any queue is discarded, so it is lost.

There are two ways to handle unroutable messages: the return message mechanism and an alternate (backup) exchange.

Return message mechanism

The return message mechanism provides a callback, ReturnCallback, which is invoked when a message cannot be routed from the exchange to a queue. To receive these callbacks, mandatory must be set to true.

spring:
  rabbitmq:
    template:
      # mandatory must be true for ReturnCallback to fire; otherwise the exchange
      # discards messages it cannot route to a queue and ReturnCallback is never triggered
      mandatory: true

Listen for unreachable messages via ReturnCallback.

final RabbitTemplate.ReturnCallback returnCallback = (Message message, int replyCode, String replyText, String exchange, String routingKey) ->
        log.info("return exchange: " + exchange + ", routingKey: "
                + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
rabbitTemplate.setReturnCallback(returnCallback);

When a message cannot be routed from the exchange to a queue, the callback logs: return exchange: , routingKey: MAIL, replyCode: 312, replyText: NO_ROUTE.

Alternate exchange

An alternate (backup) exchange is an ordinary exchange. When a message is sent to the primary exchange and no queue matches, the message is automatically forwarded to the alternate exchange and on to the queues bound to it, so the message is not lost.
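An exchange is pointed at its alternate through the alternate-exchange declaration argument. The helper below only builds that argument map; the class name and the exchange name used in the usage note are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Builds declaration arguments for an exchange that has an alternate exchange (illustrative helper). */
final class AlternateExchangeArgs {

    private AlternateExchangeArgs() {}

    /** Unroutable messages on the declared exchange will be forwarded to backupExchange. */
    static Map<String, Object> forBackup(String backupExchange) {
        Map<String, Object> args = new HashMap<>();
        args.put("alternate-exchange", backupExchange);
        return args;
    }
}
```

With the native Java client, the map would be passed as the last parameter of channel.exchangeDeclare(name, type, durable, autoDelete, arguments), for example with AlternateExchangeArgs.forBackup("backup.exchange"). The alternate exchange itself still has to be declared and have at least one queue bound to it.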

Manual consumer acknowledgement

A consumer may receive a message and then go down before it finishes processing it, and the message is lost. This happens because messages are acknowledged automatically by default: as soon as the consumer receives a message, the MQ server is told that it has been handled and removes it.

Solution: switch the consumer to manual acknowledgement. After its business logic completes, the consumer sends an ack to the broker, indicating that the message was consumed successfully and can be deleted. If consumption fails, the consumer sends a nack, and depending on configuration the message is requeued, removed from the broker, or moved to a dead-letter queue. Until the broker receives an acknowledgement it keeps the message, but it does not requeue it or hand it to another consumer.

Enable manual ack on the consumer:

# Have the consumer acknowledge messages manually
spring.rabbitmq.listener.simple.acknowledge-mode=manual

After the message is processed, confirm manually:

    @RabbitListener(queues = RabbitMqConfig.MAIL_QUEUE)
    public void onMessage(Message message, Channel channel) throws IOException {

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        // Manual ack. The second parameter is multiple: true acknowledges every message up to and including this deliveryTag; false acknowledges only this message
        channel.basicAck(deliveryTag, true);
        System.out.println("mail listener receive: " + new String(message.getBody()));
    }

When consumption fails, the consumer sends a nack to the broker. If requeue is set to false, the broker either deletes the message or moves it to a dead-letter queue; otherwise the message is put back on the queue.
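The possible outcomes can be summarized as a small decision function. This is a model of the behavior described above, not a client API; the names Outcome and AckDecision are invented for the sketch, and it assumes the message is dead-lettered only when the queue has a dead-letter exchange configured.

```java
/** What happens to a message after the consumer responds (illustrative model). */
enum Outcome { REMOVED, REQUEUED, DEAD_LETTERED, DISCARDED }

final class AckDecision {

    private AckDecision() {}

    /**
     * @param acked                true for an ack, false for a nack
     * @param requeue              the requeue flag sent with the nack (ignored for an ack)
     * @param deadLetterConfigured whether the queue has a dead-letter exchange
     */
    static Outcome resolve(boolean acked, boolean requeue, boolean deadLetterConfigured) {
        if (acked) {
            return Outcome.REMOVED;       // consumed successfully, broker deletes it
        }
        if (requeue) {
            return Outcome.REQUEUED;      // goes back onto the queue for redelivery
        }
        return deadLetterConfigured ? Outcome.DEAD_LETTERED : Outcome.DISCARDED;
    }
}
```

In a listener, the nack itself is sent with channel.basicNack(deliveryTag, multiple, requeue).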

Persistence

If the RabbitMQ service restarts because of a failure, messages held in memory are lost. RabbitMQ's persistence mechanism writes in-memory messages to disk so that they survive a restart.

Message persistence requires all of the following:

  1. The message is persistent. Before publishing, set the delivery mode to 2 to mark the message as persistent.
  2. The queue is durable.
  3. The exchange is durable.

When a persistent message is published to an exchange, RabbitMQ writes it to the persistence log before sending a response to the producer. Once the message has been consumed from the queue and acknowledged, RabbitMQ removes it from the persistence log. If RabbitMQ restarts before the message is consumed, the server rebuilds the exchanges and queues and reloads the messages from the persistence log into the corresponding queues, so the messages are not lost.

Mirrored queue

If the MQ node fails, the service becomes unavailable. RabbitMQ can mirror queues to other nodes in the cluster; if one node fails, the queue automatically fails over to another node holding a mirror, which keeps the service available.

Typically, each mirrored queue has one master and several slaves, each on a different node. Messages published to a mirrored queue are sent to the master and all slaves. Every operation other than publish is sent only to the master, which then broadcasts the result to the slaves. Consuming from a mirrored queue therefore actually happens on the master.

Duplicate consumption

Messages can be duplicated at two points: 1. during production and 2. during consumption.

During production: the producer sends a message to MQ, and a network glitch occurs while MQ is sending its confirmation. The producer never receives the confirmation, so it resends the message and MQ ends up with a duplicate.

During consumption: after the consumer has processed a message, a network glitch occurs while it is sending the acknowledgement. MQ never receives the acknowledgement and, to make sure the message is not lost, delivers it again, so the consumer receives the same message twice. Because these duplicates are caused by the network, they cannot be avoided.

Solution: give every message a globally unique ID when it is sent, and check whether that ID has already been consumed before processing it, which makes the consumption logic idempotent. The process is:

  1. After receiving a message, the consumer looks up its ID in Redis/DB.
  2. If the ID does not exist, the message is consumed normally, and the ID is then written to Redis/DB.
  3. If the ID exists, the message is discarded.
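The steps above can be sketched with an in-memory set standing in for Redis/DB. Note one deliberate variation: the sketch records the ID with an atomic add before processing, rather than checking first and writing afterwards, so that two threads cannot both pass the check for the same ID; it removes the ID again if processing throws. IdempotentConsumer and its method names are assumptions for illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Skips messages whose ID has already been processed (in-memory stand-in for Redis/DB). */
final class IdempotentConsumer {

    // Assumption: in a real system this would be a Redis key or a unique DB column.
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private final Consumer<String> handler;

    IdempotentConsumer(Consumer<String> handler) {
        this.handler = handler;
    }

    /** Returns true if the message was handled, false if it was discarded as a duplicate. */
    boolean onMessage(String messageId, String body) {
        // add() is atomic: only the first caller for a given ID gets true.
        if (!processedIds.add(messageId)) {
            return false;
        }
        try {
            handler.accept(body);
            return true;
        } catch (RuntimeException e) {
            // Processing failed, so release the ID and let a redelivery try again.
            processedIds.remove(messageId);
            throw e;
        }
    }
}
```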

Consumer-side rate limiting

When a large backlog builds up on the RabbitMQ server, the queued messages can flood the consumer all at once and bring it down. In that case the consumer needs rate limiting.

Spring RabbitMQ provides the prefetch parameter, which sets how many messages a consumer is handed at a time. Once the consumer holds that many unacknowledged messages, it receives no new messages until some of them are acked.

Enable rate limiting on the consumer:

# Messages delivered per request, i.e. the maximum number of unacked messages
spring.rabbitmq.listener.simple.prefetch=2

The native RabbitMQ client also exposes prefetchSize and global. Spring RabbitMQ does not have these two parameters.

// prefetchSize: limit on message size; 0 means no limit
// global: whether the limit applies per channel or per consumer. false applies the limit to each consumer; true applies it to the whole channel.
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
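The effect of a prefetch limit can be seen in a toy model of the broker's side of one consumer: messages are delivered only while the number of unacked messages is below the limit. This is a simulation written for illustration, not a RabbitMQ API; PrefetchWindow and its methods are hypothetical, and the sketch assumes a positive prefetch count.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

/** Toy model of a broker applying a prefetch limit to a single consumer. */
final class PrefetchWindow {

    private final int prefetchCount;
    private final Queue<Long> ready = new ArrayDeque<>();
    private final Set<Long> unacked = new HashSet<>();

    PrefetchWindow(int prefetchCount) {
        if (prefetchCount <= 0) {
            throw new IllegalArgumentException("this sketch needs a positive prefetch count");
        }
        this.prefetchCount = prefetchCount;
    }

    /** A message (identified by its delivery tag) arrives in the queue. */
    void enqueue(long deliveryTag) {
        ready.add(deliveryTag);
    }

    /** Delivers the next message, or returns null while the window is full or the queue is empty. */
    Long deliverNext() {
        if (unacked.size() >= prefetchCount || ready.isEmpty()) {
            return null;
        }
        Long tag = ready.poll();
        unacked.add(tag);
        return tag;
    }

    /** The consumer acked a message, which frees one slot in the window. */
    void ack(long deliveryTag) {
        unacked.remove(deliveryTag);
    }
}
```

With a prefetch of 2 and three queued messages, the third is held back until one of the first two is acked.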

Dead-letter queue

A dead-letter queue stores messages that could not be consumed.

A message becomes a dead letter when:

  • It is rejected and not requeued (requeue=false)
  • It expires before being consumed
  • The queue reaches its maximum length

Declare the dead-letter exchange and queue, and bind them:

    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(RabbitMqConfig.DLX_EXCHANGE);
    }

    @Bean
    public Queue dlxQueue() {
        return new Queue(RabbitMqConfig.DLX_QUEUE, true);
    }

    @Bean
    public Binding bindingDeadExchange(Queue dlxQueue, DirectExchange dlxExchange) {
        // Parameter names match the bean names above so injection stays unambiguous
        return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMqConfig.DLX_QUEUE);
    }

Add two arguments to the normal queue to attach it to the dead-letter exchange. When message consumption fails, the message is routed to the dead-letter queue.

    @Bean
    public Queue sendSmsQueue() {
        Map<String, Object> arguments = new HashMap<>(2);
        // Attach the queue to the dead-letter exchange
        arguments.put("x-dead-letter-exchange", RabbitMqConfig.DLX_EXCHANGE);
        arguments.put("x-dead-letter-routing-key", RabbitMqConfig.DLX_QUEUE);
        // durable = true, exclusive = false, autoDelete = false
        return new Queue(RabbitMqConfig.MAIL_QUEUE, true, false, false, arguments);
    }

Producer complete code:

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
// Older Spring AMQP versions keep CorrelationData in org.springframework.amqp.rabbit.support
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MQProducer {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Autowired
    RandomUtil randomUtil;

    @Autowired
    UserService userService;

    // Called when the broker confirms (ack) or rejects (nack) a published message
    final RabbitTemplate.ConfirmCallback confirmCallback = (CorrelationData correlationData, boolean ack, String cause) -> {
        log.info("correlationData: " + correlationData);
        log.info("ack: " + ack);
        if (!ack) {
            log.info("Exception Handling....");
        }
    };

    // Called when a message reached the exchange but could not be routed to a queue
    final RabbitTemplate.ReturnCallback returnCallback = (Message message, int replyCode, String replyText, String exchange, String routingKey) ->
            log.info("return exchange: " + exchange + ", routingKey: "
                    + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);

    public void sendMail(String mail) {
        // Random code in the range 100000-999999 (this looks like it may not be thread-safe)
        Integer random = randomUtil.nextInt(100000, 999999);
        Map<String, String> map = new HashMap<>(2);
        String code = random.toString();
        map.put("mail", mail);
        map.put("code", code);

        MessageProperties mp = new MessageProperties();
        // In a production environment, instead of building a Message by hand, a tool like fastJson is used to convert objects to JSON and send them
        Message msg = new Message("tyson".getBytes(), mp);
        msg.getMessageProperties().setExpiration("3000");
        // Pass CorrelationData with a globally unique ID so the message can be identified uniquely
        CorrelationData correlationData = new CorrelationData("1234567890" + new Date());

        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        rabbitTemplate.convertAndSend(RabbitMqConfig.MAIL_QUEUE, msg, correlationData);

        // Record the send state in Redis
        userService.updateMailSendState(mail, code, MailConfig.MAIL_STATE_WAIT);
    }
}

Consumer complete code:

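import java.io.IOException;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class DeadListener {

    @RabbitListener(queues = RabbitMqConfig.DLX_QUEUE)
    public void onMessage(Message message, Channel channel) throws IOException {

        try {
            // Simulate slow processing
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        // Manual ack of this single message (multiple = false)
        channel.basicAck(deliveryTag, false);
        System.out.println("receive--1: " + new String(message.getBody()));
    }
}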

When a message in the normal queue becomes a dead letter, RabbitMQ automatically republishes it to the configured dead-letter exchange, which routes it to the dead-letter queue. You can listen on the dead-letter queue and handle those messages accordingly.

Other

Pull mode

Pull mode retrieves messages with the channel.basicGet method, as in the following code:

GetResponse response = channel.basicGet(QUEUE_NAME, false);
// basicGet returns null when the queue is empty
if (response != null) {
    System.out.println(new String(response.getBody()));
    channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
}

Message expiration time

You can set an expiration time for a message in milliseconds (ms) when it is sent on the production side.

Message msg = new Message("tyson".getBytes(), mp);
msg.getMessageProperties().setExpiration("3000");

You can also set a TTL on the queue when creating it. The TTL is counted from the moment a message enters the queue, and messages that stay longer are removed.
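A queue-level TTL is set through the x-message-ttl queue argument, in milliseconds. The helper below only builds that argument map; QueueTtlArgs is a hypothetical name used for this sketch.

```java
import java.util.HashMap;
import java.util.Map;

/** Builds queue declaration arguments carrying a per-queue message TTL (illustrative helper). */
final class QueueTtlArgs {

    private QueueTtlArgs() {}

    /** @param ttlMillis how long a message may stay in the queue, in milliseconds */
    static Map<String, Object> withMessageTtl(int ttlMillis) {
        if (ttlMillis < 0) {
            throw new IllegalArgumentException("TTL must not be negative");
        }
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", ttlMillis);
        return args;
    }
}
```

The resulting map can be passed as the arguments parameter of the Queue constructor, in the same way as the dead-letter arguments shown earlier, for example new Queue(name, true, false, false, QueueTtlArgs.withMessageTtl(3000)).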

Finally, here is a GitHub repository with more than 200 classic computer books covering C, C++, Java, Python, front-end development, databases, operating systems, computer networks, data structures and algorithms, machine learning, the programming life, and more. The repository is updated continuously.

GitHub address: github.com/Tyson0314/j…

If GitHub is not available, use the Gitee repository.

Gitee address: gitee.com/tysondai/ja…