Message queues (MQ) are now used by many companies, and there are many MQ frameworks to choose from. Popular ones include RabbitMq, ActiveMq, ZeroMq, Kafka, and Alibaba's RocketMQ. This article introduces RabbitMq through the following points:

1. Install RabbitMq
2. Use RabbitMq in a SpringBoot project (five modes)
3. Delay queue

Writing original content is not easy, so if this article helps you, please give it a thumbs-up!

Install RabbitMq on Mac (for Windows installation, click here)


Install Homebrew

1. Go to the Homebrew official website and copy the install command:

2. Open the terminal, paste the command, press Enter to run it, and wait for the installation to complete:

Notes

1. If pasting the command and pressing Enter produces the error curl: (7) Failed to connect to raw.githubusercontent.com port 443: Connection refused, try again. If that still does not work, click here.

Install RabbitMq

1. Open the terminal, enter the command brew install rabbitmq, press Enter, and wait for the installation. If the following screen is displayed, the installation succeeded:

Start RabbitMq

1. After the installation completes, enter the following commands in the terminal to start RabbitMq. Reaching the step shown in Figure 4 means it started successfully (do not close the terminal after startup, otherwise the management page will not be reachable from the browser):

cd /usr/local/Cellar/rabbitmq/
cd 3.8.3    # the version number depends on your installation
sbin/rabbitmq-server

2. Open http://localhost:15672 in the browser to view the management page. The default username and password are guest/guest

Add users and authorization

Add user

(1) Super administrator

Can log in to the management console, view all information, and manage users and policies.

(2) Monitoring

Can log in to the management console and view RabbitMQ node information (number of processes, memory usage, disk usage, etc.).

(3) Policymaker

Can log in to the management console and manage policies, but cannot view node information (marked by the red box in the figure above).

(4) General management

Can only log in to the management console; cannot view node information or manage policies.

Authorization

RabbitMq's default virtual host is /. If you want to customize your virtual host, click here.

Use of RabbitMq in SpringBoot projects


Configuration items

Add the dependencies

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

Modify the configuration file

spring:
  rabbitmq:
    host: <your RabbitMQ host>
    port: <your RabbitMQ port>
    virtual-host: /
    username: <your RabbitMQ username>
    password: <your RabbitMQ password>
    publisher-confirms: true

Create a new rabbit package under the project. The contents below can be copied and pasted directly.

1. Add an entity class to obtain configuration file information

import lombok.Data;
import lombok.ToString;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

@Data
@ToString
@Configuration
public class RabbitProperties {

    /** RabbitMQ server address */
    @Value("${spring.rabbitmq.host}")
    private String host;

    /** RabbitMQ server port */
    @Value("${spring.rabbitmq.port}")
    private int port;

    /** RabbitMQ account */
    @Value("${spring.rabbitmq.username}")
    private String username;

    /** RabbitMQ password */
    @Value("${spring.rabbitmq.password}")
    private String password;
}

2. Add a configuration class

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

@Configuration
public class RabbitConfiguration {

    @Autowired
    private RabbitProperties rabbitProperties;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitProperties.getHost(), rabbitProperties.getPort());
        connectionFactory.setUsername(rabbitProperties.getUsername());
        connectionFactory.setPassword(rabbitProperties.getPassword());
        connectionFactory.setVirtualHost("/");
        // Enable publisher confirms so that confirm callbacks are triggered
        connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }

    /**
     * Scope options:
     * ConfigurableBeanFactory.SCOPE_PROTOTYPE: a new bean instance is created on every injection
     * ConfigurableBeanFactory.SCOPE_SINGLETON: singleton, only one instance in the entire application
     * WebApplicationContext.SCOPE_GLOBAL_SESSION: global session, rarely used
     * WebApplicationContext.SCOPE_APPLICATION: one instance per web application
     * WebApplicationContext.SCOPE_REQUEST: one instance per request
     * WebApplicationContext.SCOPE_SESSION: one instance per session
     *
     * Proxy modes:
     * proxyMode = ScopedProxyMode.INTERFACES: JDK dynamic proxy
     * proxyMode = ScopedProxyMode.TARGET_CLASS: class-based proxy
     * proxyMode = ScopedProxyMode.NO (default): no proxy
     */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        // To have messages that fail to be routed returned to the sender,
        // configure publisher-returns: true in the yml and enable the line below
        // template.setMandatory(true);
        return template;
    }
}

3. Define the names of queues and exchanges (the order-related queue and exchange are added here)

public class RabbitMqKey {

    /** Order queue */
    public static final String TRADE_ORDER_QUEUE = "trade-order-queue";

    /** Order exchange */
    public static final String TRADE_ORDER_EXCHANGE = "trade-order-exchange";

}

4. Initialize the queues and exchanges, and bind them

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class TradeOrderQueueConfig {

    private final static Logger logger = LoggerFactory.getLogger(TradeOrderQueueConfig.class);

    /**
     * Create the queue. Queue accepts the following parameters:
     * String name: queue name
     * boolean durable: whether the queue is persisted, so that it survives a RabbitMQ restart without being recreated. Default: true
     * boolean exclusive: whether the queue is valid only for the current connection. Default: false
     * boolean autoDelete: whether the queue is deleted automatically when no longer in use. Default: false
     * Map<String, Object> arguments: additional queue arguments
     */
    @Bean(name = "queue")
    public Queue queue() {
        logger.info("queue : {}", RabbitMqKey.TRADE_ORDER_QUEUE);
        // Queue persistence
        return new Queue(RabbitMqKey.TRADE_ORDER_QUEUE, true);
    }

    /**
     * Create a fanout exchange.
     * <p>
     * RabbitMQ has 4 types of exchange: Direct, Topic, Fanout, Headers
     * Direct Exchange: compares the routing key in the message with the routing keys of all bindings associated with the exchange; if they are equal, the message is sent to the queue of that binding.
     * Topic Exchange: matches the routing key in the message against the routing key patterns of all bindings associated with the exchange; on a match, the message is sent to the queue of that binding.
     * Fanout Exchange: forwards messages directly to all bound queues, ignoring routing keys.
     * Headers Exchange: matches the headers in the message against the arguments of all bindings associated with the exchange; on a match, the message is sent to the queue of that binding.
     */
    @Bean(name = "fanoutExchange")
    public FanoutExchange fanoutExchange() {
        logger.info("exchange : {}", RabbitMqKey.TRADE_ORDER_EXCHANGE);
        return new FanoutExchange(RabbitMqKey.TRADE_ORDER_EXCHANGE);
    }

    /**
     * Bind the queue to the fanout exchange (a fanout binding needs no routing key)
     */
    @Bean
    Binding fanoutBinding(@Qualifier("queue") Queue queue,
                    @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
}

5. Message sending class

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

@Component
public class Sender {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * If the scope of the rabbitTemplate bean is set to ConfigurableBeanFactory.SCOPE_PROTOTYPE,
     * it cannot be autowired and must be injected manually
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Order information (sent to the exchange)
     *
     * @param payload message payload
     * @return message ID
     */
    public String orderSendExchange(Object payload) {
        return baseSend(RabbitMqKey.TRADE_ORDER_EXCHANGE, "", payload, null, null);
    }

    /**
     * Order information (sent to the queue)
     *
     * @param payload message payload
     * @return message ID
     */
    public String orderSendQueue(Object payload) {
        return baseSend("", RabbitMqKey.TRADE_ORDER_QUEUE, payload, null, null);
    }

    /**
     * Base method for sending data to MQ
     *
     * @param exchange              exchange name
     * @param routingKey            routing key (queue name)
     * @param payload               message payload
     * @param uniqueMessageId       message ID, generated automatically if not provided
     * @param messageExpirationTime message expiration time, may be null
     * @return message ID
     */
    public String baseSend(String exchange, String routingKey, Object payload, String uniqueMessageId, Long messageExpirationTime) {
        // Generate the message ID if the caller did not provide one
        if (StringUtils.isBlank(uniqueMessageId)) {
            uniqueMessageId = UUID.randomUUID().toString();
        }
        // Copy taken after generation, so the ID used below is never blank
        final String finalUniqueMessageId = uniqueMessageId;
        logger.info("SEND --- unique message id:{}", finalUniqueMessageId);

        // Message properties
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // Write the message ID into the message properties
                message.getMessageProperties().setMessageId(finalUniqueMessageId);
                // Message expiration time; the null check avoids a NullPointerException when none is given
                if (messageExpirationTime != null) {
                    logger.info("Set message expiration: {}", messageExpirationTime);
                    message.getMessageProperties().setExpiration(Long.toString(messageExpirationTime));
                }
                // Set the persistent delivery mode
                message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                return message;
            }
        };
        logger.info("SEND --- messagePostProcessor: {}", messagePostProcessor);

        // Build the message
        Message message;
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            String json = objectMapper.writeValueAsString(payload);
            logger.info("Send message: {}", payload);
            // Convert the data format (application/json is a content type, not an encoding)
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
            message = new Message(json.getBytes(StandardCharsets.UTF_8), messageProperties);
        } catch (JsonProcessingException e) {
            // Fail fast instead of sending a null message
            logger.error("SEND --- failed to serialize payload", e);
            throw new IllegalArgumentException("Payload could not be serialized to JSON", e);
        }

        // correlationData
        CorrelationData correlationData = new CorrelationData(finalUniqueMessageId);

        /*
         * convertAndSend(String exchange, String routingKey, Object message,
         *                MessagePostProcessor messagePostProcessor, CorrelationData correlationData)
         * exchange: exchange name
         * routingKey: routing key
         * message: the message to send
         * messagePostProcessor: message post-processor
         * correlationData: the object has only one id attribute, which uniquely identifies the current message
         */
        rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor, correlationData);

        return finalUniqueMessageId;
    }
}

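6. Confirm that the message was sent

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
public class RabbitAck implements RabbitTemplate.ConfirmCallback {

    private final static Logger logger = LoggerFactory.getLogger(RabbitAck.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        // Specify the ConfirmCallback.
        // A RabbitTemplate supports only one ConfirmCallback, so with a singleton
        // template this callback handles the confirms of every send
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * @param correlationData correlation data passed when the message was sent
     * @param ack             true if the broker confirmed the message
     * @param cause           reason for the failure, if any
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // correlationData is null if the message was sent without one
        String messageId = correlationData != null ? correlationData.getId() : null;
        logger.info("ACK --- MQ message id: {}", messageId);
        if (ack) {
            logger.info("ACK --- Message sent confirmation success!");
        } else {
            logger.info("ACK --- MQ confirmation: {}", ack);
            logger.info("ACK --- Message sending confirmation failed, reason for failure: {}", cause);
        }
    }
}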

Practice

1. Simple queues

P: message producer (the sender of the letter); C: message consumer (the receiver of the letter); red box: queue (the postman, the messenger)

A message sent to a queue will only be retrieved and consumed by one consumer. For multiple consumers to receive and consume at the same time, see below!

1. Send a message

(1) Add the message sending interface

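import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class ProducersController {

    @Resource
    private Sender sender;

    @PostMapping("/producers")
    public void producers() {
        sender.orderSendQueue("Hello World");
    }
}

(2) Call the interface with Postman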

(3) View the project console

(4) Check rabbitMQ

2. Receive messages

(1) Add a listener for receiving order queue information

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class OrderQueueListener {

    private static final Logger logger = LoggerFactory.getLogger(OrderQueueListener.class);

    /**
     * Receive message
     *
     * @param message the received message
     */
    @RabbitListener(queues = RabbitMqKey.TRADE_ORDER_QUEUE)
    public void process(Message message) {
        try {
            String msg = new String(message.getBody());
            if (StringUtils.isBlank(msg)) {
                logger.warn("Received data is empty");
                return;
            }
            System.out.println(msg);
        } catch (Exception e) {
            logger.warn("Processing received data, exception: {}", e.getMessage());
            e.printStackTrace();
        }
    }
}

(2) Since the message was already sent a moment ago, starting the project is enough to receive it

2. Work mode

P: message producer (sender); C1, C2: message consumers (receivers); red box: queue (postman, messenger)

(1) To send messages, add an interface to the controller from before that sends 100 messages in a loop

@PostMapping("/batch/producers")
public void batchProducers() {
    for (int i = 0; i < 100; i++) {
        sender.orderSendQueue("Hello World" + i);
    }
}

(2) Start two services that receive messages from the same queue

@RabbitListener(queues = RabbitMqKey.TRADE_ORDER_QUEUE)
public void process(Message message) {
    try {
        String msg = new String(message.getBody());
        if (StringUtils.isBlank(msg)) {
            logger.warn("Received data is empty");
            return;
        }
        System.out.println("Data received by service 1:" + msg);
    } catch (Exception e) {
        logger.warn("Processing received data, exception: {}", e.getMessage());
        e.printStackTrace();
    }
}

@RabbitListener(queues = RabbitMqKey.TRADE_ORDER_QUEUE)
public void process(Message message) {
    try {
        String msg = new String(message.getBody());
        if (StringUtils.isBlank(msg)) {
            logger.warn("Received data is empty");
            return;
        }
        System.out.println("Data received by service 2:" + msg);
    } catch (Exception e) {
        logger.warn("Processing received data, exception: {}", e.getMessage());
        e.printStackTrace();
    }
}

(3) View the results of the two services

Result: when multiple services receive messages from the same queue, MQ dispatches the messages round-robin by default. If you want fair dispatch instead, read on.
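The round-robin result can be pictured with a small plain-Java model. This is only a sketch of the dispatch order, not RabbitMQ client code: RoundRobinDispatch is a hypothetical class, and it assumes the broker simply hands message i to consumer i mod N.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only; RoundRobinDispatch is a hypothetical class, not a RabbitMQ API.
final class RoundRobinDispatch {

    /** Hands message i to consumer (i mod consumerCount), one after another. */
    static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> received = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            received.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            received.get(i % consumerCount).add(messages.get(i));
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            messages.add("Hello World" + i);
        }
        List<List<String>> received = dispatch(messages, 2);
        System.out.println("Service 1 received " + received.get(0).size() + " messages");
        System.out.println("Service 2 received " + received.get(1).size() + " messages");
    }
}
```

In this model each of the two services ends up with 50 of the 100 messages, regardless of how fast it processes them.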

(4) Modify the service 1 configuration file

(5) Modify the service 2 configuration file

(6) Re-test and check the results (the following figure shows that the messages received by service 2 are significantly more than those received by service 1)
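Why the faster service ends up with more messages can be sketched in plain Java. The configuration changes above are only shown as figures, so this model rests on an assumption: fair dispatch means each consumer holds at most one unacknowledged message at a time and receives the next one only when it is free. FairDispatch and the processing costs (3 and 1 time units) are hypothetical.

```java
// Illustrative model only; FairDispatch is a hypothetical class, not a RabbitMQ API.
// Assumption: each consumer takes one message at a time and asks for the next when it is done.
final class FairDispatch {

    /**
     * Gives each message to the consumer that becomes free first
     * (ties go to the lower index) and returns how many messages each consumer got.
     */
    static int[] dispatch(int messageCount, int[] processingCost) {
        int[] freeAt = new int[processingCost.length];   // virtual time at which each consumer is idle
        int[] received = new int[processingCost.length];
        for (int m = 0; m < messageCount; m++) {
            int next = 0;
            for (int c = 1; c < freeAt.length; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c;
                }
            }
            received[next]++;
            freeAt[next] += processingCost[next];
        }
        return received;
    }

    public static void main(String[] args) {
        // Service 1 is slow (3 time units per message), service 2 is fast (1 time unit)
        int[] received = dispatch(100, new int[] {3, 1});
        System.out.println("Service 1 received " + received[0] + " messages");
        System.out.println("Service 2 received " + received[1] + " messages");
    }
}
```

With these costs the model gives 25 messages to service 1 and 75 to service 2, which matches the direction of the result in the figure: the faster consumer does more of the work.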

3. Subscription

P: message producer (the sender of the letter); X: exchange (the post office); C1, C2: message consumers (the receivers of the letter); red box: queue (the postman, the messenger)

When a letter is sent to two people through the post office, the post office sends two postmen to deliver the letter to each of them. Here, two different queues are bound to the same exchange, which ensures that a message can be received by both services at the same time.

RabbitMQ has four types of exchange: Direct, Topic, Fanout, and Headers.

Direct Exchange: compares the routing key in the message with the routing keys of all bindings associated with the exchange. If they are equal, the message is sent to the queue of that binding.
Topic Exchange: matches the routing key in the message against the routing key patterns of all bindings associated with the exchange. If a match is found, the message is sent to the queue of that binding.
Fanout Exchange: forwards messages directly to all bound queues, ignoring routing keys.
Headers Exchange: matches the headers in the message against the arguments of all bindings associated with the exchange. If a match is found, the message is sent to the queue of that binding.

Routing keys must be specified for Direct Exchange and Topic Exchange bindings

For Fanout Exchange and Headers Exchange bindings, you do not need to specify the routing key

1. Bind the queues to the exchange

As shown earlier, the order queue is already bound to the order exchange (which is of type Fanout Exchange)

Now bind the other service as well

2. Check the binding relationship

3. Start both services and send messages to the order exchange

(1) Add an interface that sends to the exchange

(2) Call the interface to send a message

(3) View the results of the two services

If no queue is bound to the exchange, messages sent to the exchange are lost, because an exchange cannot store messages; messages can only be stored in queues.
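To make this concrete, here is a minimal plain-Java model of a fanout exchange. It is a sketch, not RabbitMQ client code: FanoutExchangeModel is a hypothetical class, and trade-order-queue-v2 is an assumed name for the second service's queue.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative model only; FanoutExchangeModel is a hypothetical class, not a RabbitMQ API.
final class FanoutExchangeModel {

    // Queue name -> messages stored in that queue. The exchange itself stores nothing.
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    /** Copies the message into every bound queue and returns the number of copies made. */
    int publish(String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
        return boundQueues.size();
    }

    List<String> messagesIn(String queueName) {
        return boundQueues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutExchangeModel exchange = new FanoutExchangeModel();
        // No queue is bound yet, so this message has nowhere to go and is lost
        System.out.println("Copies with no queue bound: " + exchange.publish("lost"));

        exchange.bind("trade-order-queue");
        exchange.bind("trade-order-queue-v2"); // assumed name for the second service's queue
        System.out.println("Copies with two queues bound: " + exchange.publish("Hello World"));
        System.out.println(exchange.messagesIn("trade-order-queue"));
    }
}
```

The first publish reaches zero queues, while the second is copied into both, which is why each service receives its own copy in subscription mode.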

4. Routing mode

Routing mode builds on the publish-subscribe mode by adding a routing key. In subscription mode, a message is distributed to every queue bound to the exchange. In routing mode, it is distributed only to the queues whose binding on the exchange specifies the matching routing key. In other words, rules decide which queues receive the content sent to the exchange, and not every queue bound to the exchange receives it.

P: message producer (sender); X: direct-type exchange (post office); C1, C2: message consumers (receivers); red box: queue (postman, messenger)

In routing mode, messages are routed to the queues whose binding key exactly matches the routing key. This mode is also known as the direct exchange mode.

For example, if we send a message to the exchange with routingKey="error", the message is routed to Queue1 (amq.gen-S9b…, a queue name generated automatically by RabbitMQ) and Queue2 (amq.gen-Agl…). If we send a message with routingKey="info" or routingKey="warning", the message is routed only to Queue2. If we send a message with any other routing key, it is not routed to either queue.
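The example above can be modeled in a few lines of plain Java. This is a sketch of the exact-match rule only, not RabbitMQ client code: DirectExchangeModel is a hypothetical class, and the bindings (Queue1 bound with error; Queue2 bound with error, info, and warning) are inferred from the routing results described above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative model only; DirectExchangeModel is a hypothetical class, not a RabbitMQ API.
final class DirectExchangeModel {

    // Queue name -> binding keys that the queue was bound with
    private final Map<String, Set<String>> bindingKeysByQueue = new LinkedHashMap<>();

    void bind(String queueName, String bindingKey) {
        bindingKeysByQueue.computeIfAbsent(queueName, k -> new LinkedHashSet<>()).add(bindingKey);
    }

    /** Returns the queues that have a binding key equal to the routing key. */
    List<String> route(String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, Set<String>> entry : bindingKeysByQueue.entrySet()) {
            if (entry.getValue().contains(routingKey)) {
                targets.add(entry.getKey());
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        DirectExchangeModel exchange = new DirectExchangeModel();
        exchange.bind("Queue1", "error");
        exchange.bind("Queue2", "error");
        exchange.bind("Queue2", "info");
        exchange.bind("Queue2", "warning");

        System.out.println("error   -> " + exchange.route("error"));
        System.out.println("info    -> " + exchange.route("info"));
        System.out.println("debug   -> " + exchange.route("debug"));
    }
}
```

A key such as debug, which no binding uses, routes to an empty list, so the message reaches neither queue.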

1. Create the queues, exchanges, and routing keys, and bind them

(1) Service 1:

public class RabbitMqKey {

    /** Order queue */
    public static final String TRADE_ORDER_QUEUE = "trade-order-queue";

    /** Order exchange */
    public static final String TRADE_ORDER_EXCHANGE = "trade-order-exchange";

    /**
     * Routing test queue
     */
    public static final String TRADE_DIRECT_TEST_QUEUE = "trade-direct-test-queue";

    /**
     * Routing test exchange.
     * The name must differ from TRADE_ORDER_EXCHANGE: that exchange is already declared as a
     * fanout exchange, and an existing exchange cannot be redeclared with a different type.
     */
    public static final String TRADE_DIRECT_TEST_EXCHANGE = "trade-direct-test-exchange";

    /** Routing key */
    public static final String ROUTING_KEY = "ERROR";

}

Binding relationships in the TradeOrderQueueConfig class:

(2) Service 2

    /**
     * Routing test queue
     */
    public static final String TRADE_DIRECT_TEST_QUEUE_V2 = "trade-direct-test-queue-v2";

    /**
     * Routing test exchange (same exchange as in service 1)
     */
    public static final String TRADE_DIRECT_TEST_EXCHANGE = "trade-direct-test-exchange";

    /** Routing key */
    public static final String ROUTING_KEY = "INFO";

2. Check the binding relationship

3. Create a DirectListener class for the two services to receive queue information

(1) Service 1:

@Component
public class DirectListener {

    private static final Logger logger = LoggerFactory.getLogger(DirectListener.class);

    /**
     * Receive message
     *
     * @param message the received message
     */
    @RabbitListener(queues = RabbitMqKey.TRADE_DIRECT_TEST_QUEUE)
    public void process(Message message) {
        try {
            String msg = new String(message.getBody());
            if (StringUtils.isEmpty(msg)) {
                logger.warn("Received data is empty");
                return;
            }
            System.out.println("Data received by service 1:" + msg);
        } catch (Exception e) {
            logger.warn("Processing received data, exception: {}", e.getMessage());
            e.printStackTrace();
        }
    }
}

(2) Service 2

@Component
public class DirectListener {

    private static final Logger logger = LoggerFactory.getLogger(DirectListener.class);

    /**
     * Receive message
     *
     * @param message the received message
     */
    @RabbitListener(queues = RabbitMqKey.TRADE_DIRECT_TEST_QUEUE_V2)
    public void process(Message message) {
        try {
            String msg = new String(message.getBody());
            if (StringUtils.isEmpty(msg)) {
                logger.warn("Received data is empty");
                return;
            }
            System.out.println("Data received by service 2:" + msg);
        } catch (Exception e) {
            logger.warn("Processing received data, exception: {}", e.getMessage());
            e.printStackTrace();
        }
    }
}

4. Add Sender methods that send messages with a specific routing key, and add interfaces that call them

    /**
     * Send a message to the exchange with the ERROR routing key
     *
     * @param payload message payload
     * @return message ID
     */
    public String errorSendQueue(Object payload) {
        return baseSend(RabbitMqKey.TRADE_DIRECT_TEST_EXCHANGE, RabbitMqKey.ROUTING_KEY, payload, null, null);
    }

    /**
     * Send a message to the exchange with the INFO routing key
     *
     * @param payload message payload
     * @return message ID
     */
    public String infoSendQueue(Object payload) {
        return baseSend(RabbitMqKey.TRADE_DIRECT_TEST_EXCHANGE, "INFO", payload, null, null);
    }

Add two interfaces to the ProducersController class

    /** info */
    @PostMapping("/send/info")
    public void sendInfo() {
        sender.infoSendQueue("I'm an INFO log, you can leave me alone.");
    }

    /** error */
    @PostMapping("/send/error")
    public void sendError() {
        sender.errorSendQueue("I'm an error log, and you don't have to worry about me, as long as you're not afraid to die.");
    }

Then restart both services

(5) Call the /send/info interface to view the result

Service 1 did not receive a message:

Service 2 received the message:

(6) Call the /send/error interface to check the result

Service 1:

Service 2:

5. Topic mode (wildcard mode)

P: message producer (sender); X: topic-type exchange (post office); C1, C2: message consumers (receivers); red box: queue (postman, messenger)

1. Create the queues, exchanges, and bindings

(1) Service 1:

    /**
     * Routing test queue
     */
    public static final String TRADE_TOPIC_TEST_QUEUE = "trade-topic-test-queue";

    /**
     * Routing test exchange
     */
    public static final String TRADE_TOPIC_TEST_EXCHANGE = "trade-topic-test-exchange";

    /** Routing key */
    public static final String TOPIC_ROUTING_KEY = "JAVA.#";

(2) Service 2

    /**
     * Routing test queue
     */
    public static final String TRADE_TOPIC_TEST_QUEUE_V2 = "trade-topic-test-queue-v2";

    /**
     * Routing test exchange
     */
    public static final String TRADE_TOPIC_TEST_EXCHANGE = "trade-topic-test-exchange";

    /** Routing key */
    public static final String TOPIC_ROUTING_KEY = "JAVA.*";

2. Check the binding relationship

3. Create a TopicListener class for the two services to receive queue information

(1) Service 1:

@Component
public class TopicListener {

    private static final Logger logger = LoggerFactory.getLogger(TopicListener.class);

    /**
     * Receive message
     *
     * @param message the received message
     */
    @RabbitListener(queues = RabbitMqKey.TRADE_TOPIC_TEST_QUEUE)
    public void process(Message message) {
        try {
            String msg = new String(message.getBody());
            if (StringUtils.isEmpty(msg)) {
                logger.warn("Received data is empty");
                return;
            }
            System.out.println("Data received by service 1:" + msg);
        } catch (Exception e) {
            logger.warn("Processing received data, exception: {}", e.getMessage());
            e.printStackTrace();
        }
    }
}

(2) Service 2

@Component
public class TopicListener {

    private static final Logger logger = LoggerFactory.getLogger(TopicListener.class);

    /**
     * Receive message
     *
     * @param message the received message
     */
    @RabbitListener(queues = RabbitMqKey.TRADE_TOPIC_TEST_QUEUE_V2)
    public void process(Message message) {
        try {
            String msg = new String(message.getBody());
            if (StringUtils.isEmpty(msg)) {
                logger.warn("Received data is empty");
                return;
            }
            System.out.println("Data received by service 2:" + msg);
        } catch (Exception e) {
            logger.warn("Processing received data, exception: {}", e.getMessage());
            e.printStackTrace();
        }
    }
}

4. Add Sender methods that send messages with a specific routing key, and add interfaces that call them

    /**
     * Send a message to the topic exchange with the routing key JAVA.LOG
     *
     * @param payload message payload
     * @return message ID
     */
    public String topicErrorSendQueue(Object payload) {
        return baseSend(RabbitMqKey.TRADE_TOPIC_TEST_EXCHANGE, "JAVA.LOG", payload, null, null);
    }

    /**
     * Send a message to the topic exchange with the routing key JAVA.LOG.ERROR
     *
     * @param payload message payload
     * @return message ID
     */
    public String topicInfoSendQueue(Object payload) {
        return baseSend(RabbitMqKey.TRADE_TOPIC_TEST_EXCHANGE, "JAVA.LOG.ERROR", payload, null, null);
    }

Add two interfaces to the ProducersController class

    /**
     * Send message
     */
    @PostMapping("/send/java")
    public void sendJava() {
        sender.topicErrorSendQueue("JAVA.*: matches exactly one word.");
    }

    /**
     * Send message
     */
    @PostMapping("/send/java/error")
    public void sendJavaError() {
        sender.topicInfoSendQueue("JAVA.#: matches zero or more words.");
    }

5. Call the /send/java interface and check the result (the routing key is JAVA.LOG, so both services match)

(1) Service 1:

(2) Service 2

6. Call the /send/java/error interface and check the result (the routing key is JAVA.LOG.ERROR)

(1) Service 1:

(2) Service 2

In wildcard mode, the words of a routing key are separated by dots, as in *.*. Note that the separator in the middle is a dot.
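The two results above follow from the wildcard rules of topic exchanges: * stands for exactly one word and # stands for zero or more words. The following plain-Java sketch implements just that matching rule so the behavior can be checked by hand. TopicMatcher is a hypothetical helper written for this article, not part of RabbitMQ or Spring AMQP.

```java
// Illustrative helper only; TopicMatcher is hypothetical and not a RabbitMQ or Spring AMQP API.
final class TopicMatcher {

    /** Returns true if the routing key matches the binding key pattern. */
    static boolean matches(String bindingKey, String routingKey) {
        return matches(bindingKey.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean matches(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            // Pattern used up: it is a match only if every word was consumed too
            return j == words.length;
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point
            for (int k = j; k <= words.length; k++) {
                if (matches(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            // Pattern still expects a word, but none is left
            return false;
        }
        if (pattern[i].equals("*") || pattern[i].equals(words[j])) {
            // '*' or a literal word consumes exactly one word
            return matches(pattern, i + 1, words, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("JAVA.# vs JAVA.LOG       : " + matches("JAVA.#", "JAVA.LOG"));
        System.out.println("JAVA.* vs JAVA.LOG       : " + matches("JAVA.*", "JAVA.LOG"));
        System.out.println("JAVA.# vs JAVA.LOG.ERROR : " + matches("JAVA.#", "JAVA.LOG.ERROR"));
        System.out.println("JAVA.* vs JAVA.LOG.ERROR : " + matches("JAVA.*", "JAVA.LOG.ERROR"));
    }
}
```

With the keys used in this section, JAVA.LOG matches both JAVA.# and JAVA.*, while JAVA.LOG.ERROR matches only JAVA.#, so only service 1 receives the second message.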

Delay queue


What is a delay queue

A delay queue is, first of all, a queue. A queue means that its elements are ordered and that enqueueing and dequeueing are directional: elements enter at one end and leave from the other.

Second, the defining feature of a delay queue is the delay. In an ordinary queue, elements want to be taken out and processed as early as possible. In a delay queue, elements are meant to be taken out and processed at a specified time, so each element carries a time attribute. An element is usually a message or task that needs to be processed.

Simply put, a delay queue is a queue that holds elements that need to be processed at a specified time.
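The concept itself can be tried out with the JDK's own java.util.concurrent.DelayQueue, independent of RabbitMQ. The sketch below is only meant to illustrate the idea of elements that carry a time attribute. DelayQueueDemo, DelayedTask, the task names, and the delays of 200 ms and 50 ms are made up for illustration.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Illustration of the concept with the JDK's DelayQueue; this is unrelated to RabbitMQ's implementation.
final class DelayQueueDemo {

    /** An element that only becomes available once its delay has elapsed. */
    static final class DelayedTask implements Delayed {
        final String name;
        private final long readyAtNanos;

        DelayedTask(String name, long delayMillis) {
            this.name = name;
            this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Enqueues two tasks out of order and returns their names in the order they become ready. */
    static String[] takeInReadyOrder() {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.put(new DelayedTask("cancel-order", 200));
        queue.put(new DelayedTask("send-reminder", 50));
        try {
            // take() blocks until the element with the shortest remaining delay is ready
            return new String[] {queue.take().name, queue.take().name};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for delayed tasks", e);
        }
    }

    public static void main(String[] args) {
        for (String name : takeInReadyOrder()) {
            System.out.println(name);
        }
    }
}
```

Although cancel-order is enqueued first, send-reminder comes out first because its delay is shorter: the order of processing is decided by time, not by insertion.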

Application scenarios of delay queues

(1) An order is cancelled automatically if it is not paid within ten minutes.
(2) If a newly created store has not uploaded any goods within ten days, a reminder message is sent automatically.
(3) If a bill is not paid within one week, it is settled automatically.
(4) If a user does not log in within three days after registering, the user is reminded by SMS.
(5) When a user initiates a refund that is not processed within three days, the relevant operations staff are notified.
(6) After a meeting is scheduled, all participants are notified ten minutes before the scheduled time.

Implementation

RabbitMQ itself has no delay queue and no attribute for configuring one. The delay queue is implemented here by combining a dead-letter exchange (DLX) with an expiration time (TTL).

TTL

TTL is short for Time To Live. RabbitMq supports TTL settings on both messages and queues. A per-message TTL is specified when the message is sent. A queue-level TTL is counted from the moment the message enters the queue, and the message is removed automatically once it has stayed longer than the configured time. If both are set, the smaller of the two values applies: with a message TTL of 5s and a queue TTL of 10s, the 5s takes effect. By default no TTL is set, which means the message never expires. If the TTL is set to 0, the message is discarded unless it can be delivered to a consumer immediately. The message TTL of a queue is set with the x-message-ttl argument, in milliseconds. The expiration time of the queue itself is set with the x-expires argument, also in milliseconds; note that x-expires cannot be set to 0.
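The rule that the smaller value wins can be written down as a tiny function. This is a sketch of the rule as described above, not broker code. EffectiveTtl is a hypothetical helper, and null is used here to mean that no TTL was set.

```java
// Illustrative helper only; EffectiveTtl is hypothetical and not a RabbitMQ API.
final class EffectiveTtl {

    /**
     * Returns the TTL that actually applies to a message, in milliseconds.
     * A null argument means "not set"; a null result means the message never expires.
     */
    static Long effectiveTtlMillis(Long messageTtlMillis, Long queueTtlMillis) {
        if (messageTtlMillis == null) {
            return queueTtlMillis;
        }
        if (queueTtlMillis == null) {
            return messageTtlMillis;
        }
        return Math.min(messageTtlMillis, queueTtlMillis);
    }

    public static void main(String[] args) {
        // Message TTL 5s, queue TTL 10s: the 5s takes effect
        System.out.println(effectiveTtlMillis(5_000L, 10_000L));
        // Only the queue TTL is set
        System.out.println(effectiveTtlMillis(null, 10_000L));
        // Neither is set: no expiration
        System.out.println(effectiveTtlMillis(null, null));
    }
}
```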

DLX

A DLX, or dead-letter exchange, is an ordinary exchange that can be specified on any queue. When a message in a (normal) queue becomes a dead letter, for example because it expired, it can be re-sent to another exchange (the DLX) and then routed to a queue bound to it. That queue is the dead-letter queue. A message becomes a dead letter in the following situations:

(1) The message is rejected with basic.reject or basic.nack and requeue=false.
(2) The TTL of the message has expired.
(3) The queue length limit is exceeded (the queue is full).

Note 1: If there are dead letters in a queue, RabbitMq sends them to the specified DLX. Declare the DLX by setting the x-dead-letter-exchange argument on the queue. If the DLX is of direct type, also declare the x-dead-letter-routing-key argument to specify the routing key. If no routing key is specified, the message's original routing key is used.
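The choice of routing key described in Note 1 can be sketched as a small lookup on the queue arguments. DeadLetterRouting is a hypothetical helper written for illustration. Only the argument name x-dead-letter-routing-key and the key values order-delay and dead-letter come from this article.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative helper only; DeadLetterRouting is hypothetical and not a RabbitMQ API.
final class DeadLetterRouting {

    /**
     * Returns the routing key a dead letter is forwarded with: the queue's
     * x-dead-letter-routing-key if configured, otherwise the original routing key.
     */
    static String deadLetterRoutingKey(Map<String, Object> queueArguments, String originalRoutingKey) {
        Object configured = queueArguments.get("x-dead-letter-routing-key");
        return configured != null ? configured.toString() : originalRoutingKey;
    }

    public static void main(String[] args) {
        Map<String, Object> withKey = new HashMap<>();
        withKey.put("x-dead-letter-routing-key", "dead-letter");
        System.out.println(deadLetterRoutingKey(withKey, "order-delay"));

        Map<String, Object> withoutKey = new HashMap<>();
        System.out.println(deadLetterRoutingKey(withoutKey, "order-delay"));
    }
}
```

The first call uses the configured key dead-letter, and the second falls back to the original key order-delay.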

The DLX and TTL together simulate a delay queue. Exchange a is bound to queue a1, and a message sent to exchange a is stored in a1, which sets an expiration time on its messages. A message that has not been consumed by the time it expires is sent to the dead-letter exchange b, which is bound to the dead-letter queue b1. We then only need to consume the messages in b1.
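The flow from a1 to b1 can be simulated in plain Java with a virtual clock. This is a simplified model under stated assumptions, not RabbitMQ code: DelayViaDeadLetter is a hypothetical class, nobody consumes from a1, every message gets the same queue-level TTL, and time is passed in explicitly instead of being read from the system clock.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative model only; DelayViaDeadLetter is a hypothetical class, not a RabbitMQ API.
final class DelayViaDeadLetter {

    private static final class Pending {
        final String body;
        final long expiresAtMillis;

        Pending(String body, long expiresAtMillis) {
            this.body = body;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final long ttlMillis;
    private final Deque<Pending> a1 = new ArrayDeque<>(); // queue with a TTL and no consumer
    private final List<String> b1 = new ArrayList<>();    // dead-letter queue that is actually consumed

    DelayViaDeadLetter(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Stores the message in a1 and remembers when it expires. */
    void publish(String body, long nowMillis) {
        a1.addLast(new Pending(body, nowMillis + ttlMillis));
    }

    /** Moves every message whose TTL has elapsed by nowMillis from a1 to b1. */
    void advanceTo(long nowMillis) {
        while (!a1.isEmpty() && a1.peekFirst().expiresAtMillis <= nowMillis) {
            b1.add(a1.pollFirst().body);
        }
    }

    List<String> deadLetterQueue() {
        return b1;
    }

    public static void main(String[] args) {
        DelayViaDeadLetter flow = new DelayViaDeadLetter(100_000); // same value as x-message-ttl in this article
        flow.publish("order-1", 0);
        flow.advanceTo(99_999);
        System.out.println("Before the TTL elapses: " + flow.deadLetterQueue());
        flow.advanceTo(100_000);
        System.out.println("After the TTL elapses: " + flow.deadLetterQueue());
    }
}
```

The consumer of b1 sees order-1 only after the TTL has elapsed, which is exactly the delay effect the combination is meant to achieve.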

1. Define the exchange and queue names

    /** Queue that receives the delayed messages */
    public static final String TRADE_ORDER_DELAY_QUEUE = "trade-order-delay-queue";
    /** Exchange that receives the delayed messages */
    public static final String TRADE_ORDER_DELAY_EXCHANGE = "trade-order-delay-exchange";
    /** Routing key name: the routing key carried when the message is sent */
    public static final String ORDER_DELAY_ROUTING_KEY = "order-delay";

    /** Dead-letter queue */
    public static final String DEAD_LETTER_QUEUE = "dead-letter-queue";

    /** Dead-letter exchange (DLX), the exchange to which dead letters are sent */
    public static final String DEAD_LETTER_EXCHANGE = "dead-letter-exchange";

    /** Dead-letter routing key name */
    public static final String DEAD_LETTER_ROUTING_KEY = "dead-letter";

2. The TradeOrderQueueConfig class initializes the exchange and queue and binds them

    /**
     * The queue that receives the delayed messages. It specifies the expiration time,
     * the dead-letter exchange to send to after expiration, and the routing key to use.
     *
     * @return the delay queue
     */
    @Bean(name = "delayOrderQueue")
    public Queue delayOrderQueue() {
        Map<String, Object> params = new HashMap<>(2);
        // x-dead-letter-exchange declares the dead-letter exchange to which the current queue is bound
        params.put("x-dead-letter-exchange", RabbitMqKey.DEAD_LETTER_EXCHANGE);
        // x-dead-letter-routing-key specifies the dead-letter name to be carried when forwarding.
        params.put("x-dead-letter-routing-key", RabbitMqKey.DEAD_LETTER_ROUTING_KEY);
        // x-message-ttl sets the expiration time (in milliseconds) of messages in this queue
        params.put("x-message-ttl", 100000);
        return QueueBuilder.durable(RabbitMqKey.TRADE_ORDER_DELAY_QUEUE).withArguments(params).build();
    }

    /**
     * The exchange that receives the delayed messages
     *
     * @return the delay exchange
     */
    @Bean(name = "orderDelayExchange")
    public DirectExchange orderDelayExchange() {
        return new DirectExchange(RabbitMqKey.TRADE_ORDER_DELAY_EXCHANGE);
    }

    @Bean
    Binding orderDelayBinding(@Qualifier("delayOrderQueue") Queue delayOrderQueue,
                         @Qualifier("orderDelayExchange") DirectExchange orderDelayExchange) {
        return BindingBuilder.bind(delayOrderQueue).to(orderDelayExchange).with(RabbitMqKey.ORDER_DELAY_ROUTING_KEY);
    }

    /**
     * The dead-letter queue, from which the delayed messages are finally consumed
     *
     * @return the dead-letter queue
     */
    @Bean(name = "orderQueue")
    public Queue orderQueue() {
        return new Queue(RabbitMqKey.DEAD_LETTER_QUEUE, true);
    }

    /**
     * Topic exchange: routes a message by matching its routing key against a binding pattern.
     * "#" matches zero or more words and "*" matches exactly one word, so "audit.#" matches
     * "audit.irs.corporate", but "audit.*" only matches keys such as "audit.irs".
     */
    @Bean(name = "orderTopicExchange")
    public TopicExchange orderTopicExchange() {
        return new TopicExchange(RabbitMqKey.DEAD_LETTER_EXCHANGE);
    }

    @Bean
    Binding orderTopicBinding(@Qualifier("orderQueue") Queue orderQueue,
                              @Qualifier("orderTopicExchange") TopicExchange orderTopicExchange) {
        return BindingBuilder.bind(orderQueue).to(orderTopicExchange).with(RabbitMqKey.DEAD_LETTER_ROUTING_KEY);
    }
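The topic exchange comment in the configuration above mentions the "#" and "*" wildcards. In RabbitMQ, "*" stands for exactly one word and "#" for zero or more words, with words separated by dots. A minimal sketch of that matching rule follows; TopicMatcher is a hypothetical class written for illustration and is not how the broker implements routing.

```java
// Hypothetical illustration of topic binding patterns:
// "*" matches exactly one word, "#" matches zero or more words.
class TopicMatcher {

    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.", -1);
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern used up: the key must be used up as well.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // "#" absorbs zero words, or one word and then tries again.
            return match(p, i + 1, k, j) || (j < k.length && match(p, i, k, j + 1));
        }
        if (j == k.length) {
            return false;
        }
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("audit.#", "audit.irs.corporate")); // true
        System.out.println(matches("audit.*", "audit.irs"));           // true
        System.out.println(matches("audit.*", "audit.irs.corporate")); // false
    }
}
```

In this article the dead-letter binding uses the literal key "dead-letter" without wildcards, so the topic exchange behaves like a direct exchange here.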

3. View binding information

4. Add the DelayListener class to receive delayed messages

@Component
public class DelayListener {

    private static final Logger logger = LoggerFactory.getLogger(DelayListener.class);

    /**
     * Receive delayed messages
     *
     * @param message the message taken from the dead-letter queue
     */
    @RabbitListener(queues = RabbitMqKey.DEAD_LETTER_QUEUE)
    public void process(Message message) {
        try {
            String msg = new String(message.getBody());
            if (StringUtils.isEmpty(msg)) {
                logger.warn("Received data is empty");
                return;
            }
            logger.info("Delayed message received: {}", msg);
        } catch (Exception e) {
            logger.warn("Processing received data, exception: {}", e.getMessage());
            e.printStackTrace();
        }
    }
}

5. Add a method and an endpoint for sending delayed messages

Add a send method to the Sender class:

    /**
     * Send a message to the delay queue
     *
     * @param payload the message body
     * @return the value returned by baseSend
     */
    public String delaySend(Object payload) {
        return baseSend(RabbitMqKey.TRADE_ORDER_DELAY_EXCHANGE, RabbitMqKey.ORDER_DELAY_ROUTING_KEY, payload, null, null);
    }

Add an endpoint to the ProducersController class:

    /**
     * Send a delayed message
     */
    @PostMapping("/send/delay/message")
    public void sendDelayMessage() {
        sender.delaySend("Order x has expired, please return stock");
    }

6. Call the interface and view the result

Note: The expiration time here is set on the queue (100 seconds), so every message sent to this queue expires after 100 seconds. If some of your business messages need to expire after 10 seconds and others after 1 minute, this approach does not work. In that case, use a RabbitMQ plugin such as rabbitmq_delayed_message_exchange.

Problems and solutions in using RabbitMq


Click to view the Spring Boot + RabbitMQ configuration parameters

Message retry mechanism

If a consumer fails while executing its business logic for a message, the message needs to be delivered and consumed again, and the RabbitMQ retry mechanism can be used for this. A retry generally needs a maximum retry count and a retry interval; otherwise the failing message keeps being retried and the messages behind it cannot be consumed. The retry count is usually tracked in one of the following ways:

1. Use third-party storage such as Redis or Mongo to store the current retry count.

2. Put the retry count in a message header and increase it by 1 each time the message is resent with channel.basicPublish().

3. Use the retry feature that ships with spring-rabbit.

If messages are acknowledged manually, the configuration of the third approach has no effect. The call channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); negatively acknowledges the message, and its third parameter controls whether the message is put back on the queue. If it is true, the message is requeued and redelivered again and again, but not according to the retry settings in the configuration file, and it blocks the messages behind it. With manual acknowledgment, therefore, use the first or second approach: track the retry count yourself, forward the message to another queue for logging or similar handling once the limit is reached, and then manually acknowledge it as successful.

The first:

1. Test the above:

Parameters such as the maximum number of retries, the maximum retry interval, and the retry interval have no effect.

2. If you must acknowledge manually, see the following picture: a dead-letter queue is bound, so after a failed acknowledgment the message is sent to the dead-letter queue, where it can be logged and handled by manual intervention:

3. If you want to acknowledge manually and also limit the number of retries, store the retry count in a cache (this blocks subsequent messages, meaning that the next message is not delivered until the 5 retries are completed):

(1) Change the configuration file to manual confirmation

(2) Change the method in the receiving class to manual acknowledgment. The cache key, correlation, is the unique ID that we attach to the message when sending it

correlation:

(3) Check the results: after more than 5 attempts the message enters the dead-letter queue (you can then take the data from the dead-letter queue to write logs, prompt manual intervention, and so on)
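The counting logic of this first approach can be sketched without a broker or a cache server. In the sketch below a ConcurrentHashMap stands in for Redis, and RetryCounter and Decision are hypothetical names; the comments mark where the real listener would call channel.basicNack.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: ConcurrentHashMap stands in for Redis, and the key is the
// correlation ID attached to the message when it was sent.
class RetryCounter {

    enum Decision { REQUEUE, DEAD_LETTER }

    private final Map<String, Integer> attempts = new ConcurrentHashMap<>();
    private final int maxRetries;

    RetryCounter(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    // Call when processing of the message with this correlation ID failed.
    Decision onFailure(String correlationId) {
        int count = attempts.merge(correlationId, 1, Integer::sum);
        if (count > maxRetries) {
            attempts.remove(correlationId);
            // Real listener: basicNack(deliveryTag, false, false) -> dead-letter queue
            return Decision.DEAD_LETTER;
        }
        // Real listener: basicNack(deliveryTag, false, true) -> back on the queue
        return Decision.REQUEUE;
    }

    // Call after successful processing so the counter does not leak.
    void onSuccess(String correlationId) {
        attempts.remove(correlationId);
    }
}
```

With maxRetries set to 5, the first five failures are requeued and the sixth sends the message to the dead-letter queue. With a real cache, the key should also get an expiry so that abandoned counters are cleaned up.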

4. If you do not want to block other messages and instead want to push failed messages to the back of the queue, following the first-in, first-out principle, refer to the second approach

The second:

Please click here. The general idea is to put the retry count in the RabbitMQ message headers; the next time the message is consumed, read the count from the headers, decide whether the maximum number of retries has been exceeded, and act accordingly. I have not tried whether that code works.
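As a rough sketch of this header-based idea, assuming the retry count lives under an application-defined header: the header name x-retry-count and the class HeaderRetry are made up for illustration, and the headers are modelled as a plain Map rather than real AMQP message properties.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Sketch only: decides whether a failed message may be republished and,
// if so, produces the headers for the republished copy.
class HeaderRetry {

    // Hypothetical header name; any application-defined key would do.
    static final String RETRY_HEADER = "x-retry-count";

    static Optional<Map<String, Object>> nextAttemptHeaders(Map<String, Object> headers, int maxRetries) {
        Object raw = headers.get(RETRY_HEADER);
        int retries = raw instanceof Number ? ((Number) raw).intValue() : 0;
        if (retries >= maxRetries) {
            // Limit reached: do not republish, log or dead-letter instead.
            return Optional.empty();
        }
        Map<String, Object> next = new HashMap<>(headers);
        next.put(RETRY_HEADER, retries + 1);
        return Optional.of(next);
    }
}
```

In a real consumer, a present result would be republished to the tail of the queue with the new headers and the original delivery acknowledged, so failed messages do not block the ones behind them.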

The third:

1. Add exchange and queue names

    /** Test queue */
    public static final String TEST_QUEUE = "test-queue";

    /** Test exchange */
    public static final String TEST_EXCHANGE = "test-exchange";

    /** Dead-letter queue that receives messages after the maximum number of retries */
    public static final String DELAY_QUEUE = "delay-queue";
    /** Exchange that receives messages after the maximum number of retries */
    public static final String DELAY_EXCHANGE = "delay-exchange";
    /** Routing key name */
    public static final String DELAY_ROUTING_KEY = "delay-routing-key";

2. Binding relationship

@Bean(name = "testExchange")
    public FanoutExchange testExchange() {
        logger.info("exchange : {}", RabbitMqKey.TEST_EXCHANGE);
        return new FanoutExchange(RabbitMqKey.TEST_EXCHANGE);
    }

    @Bean(name = "delayTestQueue")
    public Queue delayTestQueue() {
        logger.info("queue : {}", RabbitMqKey.TEST_QUEUE);
        // Queue persistence
        return new Queue(RabbitMqKey.TEST_QUEUE, true);
    }

    @Bean
    Binding delayTestBinding(@Qualifier("delayTestQueue") Queue delayTestQueue,
                          @Qualifier("testExchange") FanoutExchange testExchange) {
        return BindingBuilder.bind(delayTestQueue).to(testExchange);
    }

    @Bean(name = "delayQueue")
    public Queue delayQueue() {
        logger.info("queue : {}", RabbitMqKey.DELAY_QUEUE);
        // Queue persistence
        return new Queue(RabbitMqKey.DELAY_QUEUE, true);
    }

    @Bean(name = "delayExchange")
    public DirectExchange delayExchange() {
        logger.info("exchange : {}", RabbitMqKey.DELAY_EXCHANGE);
        return new DirectExchange(RabbitMqKey.DELAY_EXCHANGE);
    }

    @Bean
    Binding delayBinding(@Qualifier("delayQueue") Queue delayQueue,
                          @Qualifier("delayExchange") DirectExchange delayExchange) {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with(RabbitMqKey.DELAY_ROUTING_KEY);
    }

3. Modify the configuration file

spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /
    username: admin
    password: admin
    listener:
      simple:
        # Number of messages prefetched per consumer
        prefetch: 1
        # Acknowledge mode: auto or manual (default: auto)
        acknowledge-mode: auto
        retry:
          enabled: true
          # Maximum number of attempts
          max-attempts: 5
          # Maximum interval between attempts, in milliseconds
          max-interval: 20000
          # Initial interval, in milliseconds. With multiplier 2 the waits are 3s, 6s, 12s, then 24s > 20s is capped at 20s
          initial-interval: 3000
          multiplier: 2
        # Whether rejected messages are requeued once the retry count above is exceeded (false: do not requeue)
        default-requeue-rejected: false
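How max-attempts, initial-interval, multiplier and max-interval interact is easiest to see by computing the waits. The sketch below is plain arithmetic, not Spring code: RetryBackoff is a hypothetical helper, and the initial interval of 3000 ms used in the example is an assumed value.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: computes the waits between attempts for an exponential
// backoff that is capped at a maximum interval.
class RetryBackoff {

    static List<Long> waits(int maxAttempts, long initialMs, double multiplier, long maxMs) {
        List<Long> result = new ArrayList<>();
        double next = initialMs;
        // maxAttempts attempts means maxAttempts - 1 waits in between.
        for (int i = 1; i < maxAttempts; i++) {
            result.add((long) Math.min(next, maxMs));
            next *= multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed values: 5 attempts, 3000 ms initial, multiplier 2, 20000 ms cap.
        System.out.println(waits(5, 3000, 2.0, 20000)); // [3000, 6000, 12000, 20000]
    }
}
```

The last wait would be 24000 ms, but the cap reduces it to 20000 ms, so a failing message is given up on roughly 41 seconds after the first attempt.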

4. Add methods to the TestListener class to process the queue messages

@Component
public class TestListener {

    private static final Logger logger = LoggerFactory.getLogger(TestListener.class);

    /**
     * Receive a message
     *
     * @param message the received message
     */
    @RabbitListener(queues = RabbitMqKey.TEST_QUEUE)
    public void process(Message message) throws UnsupportedEncodingException {
        String msg = new String(message.getBody());
        if (StringUtils.isBlank(msg)) {
            logger.warn("Received data is empty");
            return;
        }
        System.out.println(LocalDateTime.now() + ":Subscriber:" + new String(message.getBody(), "UTF-8"));
        // An exception occurs
        int a = 0;
        int b = 1 / a;
    }

    /**
     * Receive messages that exceeded the maximum number of retries
     *
     * @param message the received message
     */
    @RabbitListener(queues = RabbitMqKey.DELAY_QUEUE)
    public void process1(Message message){
        try {
            String msg = new String(message.getBody());
            if (StringUtils.isBlank(msg)) {
                logger.warn("Received data is empty");
                return;
            }
            System.out.println("Dead letter message received:" + msg);
        } catch (Exception e) {
            logger.warn("Processing received data, exception: {}", e.getMessage());
        }
    }
}

5. Add a Sender method to send the message

    /**
     * Send a test message
     *
     * @param payload the message body
     * @return the value returned by baseSend
     */
    public String testSendExchange(Object payload) {
        return baseSend(RabbitMqKey.TEST_EXCHANGE, "", payload, null, null);
    }

6. Add a test endpoint to ProducersController

    /**
     * Send a test message
     */
    @PostMapping("/send/test")
    public void sendTest() {
        sender.testSendExchange("Test message retry mechanism");
    }

7. Most importantly, add a MessageRecoverer bean to the RabbitAck class so that messages exceeding the retry count are forwarded to the exchange

@Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, RabbitMqKey.DELAY_EXCHANGE, RabbitMqKey.DELAY_ROUTING_KEY);
    }

If you do not configure a MessageRecoverer yourself, the default RejectAndDontRequeueRecoverer is used, which simply logs the exception and rejects the message. Its source code is as follows:

public class RejectAndDontRequeueRecoverer implements MessageRecoverer {

    protected Log logger = LogFactory.getLog(RejectAndDontRequeueRecoverer.class);

    @Override
    public void recover(Message message, Throwable cause) {
    	if (this.logger.isWarnEnabled()) {
            this.logger.warn("Retries exhausted for message " + message, cause);
    	}
        throw new ListenerExecutionFailedException("Retry Policy Exhausted",
                new AmqpRejectAndDontRequeueException(cause), message);
    }
}
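The relationship between the retry attempts and the recoverer can be modelled in a few lines. This is a simplified sketch, not Spring's implementation: RetryThenRecover and deliver are hypothetical names, the waits between attempts are left out, and the recoverer callback plays the role of MessageRecoverer.recover.

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Sketch only: the handler is tried up to maxAttempts times; when every
// attempt fails, the message and the last exception go to the recoverer.
class RetryThenRecover {

    // Returns the number of attempts that were made.
    static <T> int deliver(T message, int maxAttempts,
                           Consumer<T> handler, BiConsumer<T, Throwable> recoverer) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(message);
                return attempt; // success, nothing to recover
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        // Retries exhausted: republish, log, or reject, depending on the recoverer.
        recoverer.accept(message, last);
        return maxAttempts;
    }
}
```

A recoverer that adds the message to another collection corresponds to RepublishMessageRecoverer forwarding it to another exchange; a recoverer that only logs and throws corresponds to the default behaviour.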

8. Test and view the results

(1) MessageRecoverer is configured:

(2) MessageRecoverer not configured:

If your messages do not have to be especially reliable and some data loss is acceptable, you can skip the automatic retry mechanism entirely. After all, network fluctuations are rare, and if the business logic itself is broken (a null pointer, an array index out of bounds, and so on), it will throw the same exception even after 100 retries; only a programmer can fix that.

Repeated consumption

Here I borrow a picture from Aobin (a very good author) to explain

A user places an order. The points service fails to consume the message and requests a retry, while the activity system, the coupon service, and other services have already consumed it successfully. A retry means sending the message again, so won't the services that already consumed it successfully consume it a second time? That is bad: at best you get scolded by the boss, at worst you are sent packing. So how can we avoid consuming a message repeatedly?

The general approach to this problem is called interface idempotence

Idempotence is a concept from mathematics and computer science, commonly found in abstract algebra. In programming, an operation is idempotent if executing it any number of times has the same effect as executing it once. An idempotent function or method can be called repeatedly with the same parameters and yields the same result, so there is no need to worry about repeated execution changing the system. For example, a "setTrue()" function is idempotent: it yields the same result no matter how many times it is executed. Idempotence for more complex operations is usually guaranteed with a unique transaction number (serial number).

In plain terms: when you receive an MQ message, check whether you have already consumed it. If you have, do not consume it again; if not, consume it. (In my current project, on receiving an MQ message we check whether the order number it carries already exists. If it exists, we print an exception log; if not, we consume the message.)
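A minimal sketch of such an idempotent consumer, assuming the order number is the unique business key. IdempotentOrderConsumer is a hypothetical class, and the in-memory set stands in for whatever shared store a real system would use (for example a unique index in the order table).

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: a message is processed at most once per order number.
class IdempotentOrderConsumer {

    // In-memory stand-in for a persistent, shared store of handled order numbers.
    private final Set<String> seenOrderNos = ConcurrentHashMap.newKeySet();
    private final AtomicInteger handled = new AtomicInteger();

    // Returns true if the message was consumed, false if it was a duplicate.
    boolean onMessage(String orderNo) {
        // add() returns false when the order number is already present,
        // so check and insert happen in one atomic step.
        if (!seenOrderNos.add(orderNo)) {
            return false; // duplicate: log it and skip the business logic
        }
        handled.incrementAndGet(); // placeholder for the real business logic
        return true;
    }

    int handledCount() {
        return handled.get();
    }
}
```

Doing the check and the insert in one atomic operation matters: with a separate "exists?" query followed by an insert, two concurrent deliveries of the same message could both pass the check.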

Message loss

There are four ways to resolve message loss:

1. Message persistence (see the code above to see how queues and exchanges persist)

2. ACK confirmation mechanism (also described above)

3. Set the cluster mirroring mode

4. Message compensation mechanism

Click here for details

The specific solution depends on the company. At my company, for example, when deducting a handling fee we first record the fee after receiving the MQ message for a successful order, and then transfer the money in one batch in the evening. We do not transfer in real time mainly because of decimal precision: the amount is split among several accounts in fixed proportions, so one transfer is better than many. When the scheduled task runs the transfer, it checks all of that day's orders against the order table to see whether any order succeeded but its MQ message was lost; if so, the handling fee is collected automatically. Of course, this is just how our company handles it and it may not be very mature; how you handle message loss depends on your own company's situation.
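The nightly check described above boils down to a set difference between the successful orders and the orders for which a fee was recorded. A small sketch under that reading; FeeReconciliation and missingFees are hypothetical names, and in a real system both sets would come from database queries.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Sketch only: finds successful orders whose MQ message apparently never
// arrived, because no handling fee was recorded for them.
class FeeReconciliation {

    static Set<String> missingFees(Set<String> successfulOrderNos, Set<String> recordedFeeOrderNos) {
        Set<String> missing = new LinkedHashSet<>(successfulOrderNos);
        missing.removeAll(recordedFeeOrderNos);
        return missing;
    }
}
```

Every order number in the result is one for which the scheduled task would collect the fee itself before running the batch transfer.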

Ordered consumption

Ordered consumption means that consumers process messages in order. A RabbitMQ queue only preserves the order in which the publisher's messages arrive; whether consumers process them in order is another matter

As shown above, the two results are not exactly the same

Solution:

Set prefetchCount=1 so that one message is consumed at a time, perform a manual ack after processing, and only then receive the next message. This works only when the queue is handled by a single consumer.
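The effect of a single consumer that takes one message at a time can be imitated with a single-threaded executor. This is only an analogy in plain Java, not RabbitMQ code: SingleConsumerOrder is a hypothetical class, and finishing one task before starting the next plays the role of prefetchCount=1 with a manual ack.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Analogy only: one worker thread handles the messages strictly one after
// another, so the handling order equals the publishing order.
class SingleConsumerOrder {

    static List<String> consumeInOrder(List<String> messages) {
        List<String> handled = Collections.synchronizedList(new ArrayList<>());
        ExecutorService consumer = Executors.newSingleThreadExecutor();
        for (String message : messages) {
            // The next task starts only after the previous one has finished.
            consumer.execute(() -> handled.add(message));
        }
        consumer.shutdown();
        try {
            consumer.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(handled);
    }

    public static void main(String[] args) {
        System.out.println(consumeInOrder(Arrays.asList("add", "modify", "delete")));
    }
}
```

Replacing the single-threaded executor with a pool of several threads removes the guarantee, which mirrors what happens when several consumers read from the same queue.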

If there are multiple consumers, manual ack alone does not help. Each consumer immediately receives one message (if there are more messages than consumers), and the remaining messages are delivered as acks come in. Ordered consumption is therefore still not guaranteed, as shown in the figure below

My reasoning is as follows: the queue is first in, first out. Producer A pushes three messages (add, modify, delete) into the queue, so consumer B should receive them in the order add, modify, delete. Because acknowledgment is manual, B receives the modify message only after it has processed the add message successfully and acknowledged it, and then receives the delete message, which preserves the order. (I am not sure whether my reasoning has flaws; if you spot one, please tell me in the comments and I will correct it. I also searched Baidu, and the ideas I found were basically the same.)

Here is an article about ordered consumption; read it if you need to!


This project was published a few days ago. It only requires a local RabbitMQ installation to run, and I will keep adding Spring Cloud components and other middleware to it!


I will see you in the next article. Remember to give a thumbs-up if you haven’t already!