RabbitMQ is used in basic mode
RabbitMQ basic usage: message queuing, AMQP introduction, and RabbitMQ support 7 patterns and code implementation.
Expiration time
Expiration Time TTL (Time To Live) Indicates that the message can be consumed within the expected Time. The message is automatically deleted when the time is exceeded.
RabbitMQ can set TTL for messages and queues:
-
By setting the queue property, all messages in the queue have the same expiration time
-
Messages can be set individually, and the TTL for each message can be different
-
If both methods are used together, the message’s expiration time is whichever is smaller in between
Once the TTL of a Message exceeds the set value, it is called a Dead Message and sent to the dead-letter queue. Consumers cannot receive the Message again.
Queue TTL
See RabbitMQ integration with SpringBoot for consumer and producer engineering codes
Add a testTTl queue to consumer project RabbitConfig to testTTl functionality
@Bean
public Queue topicTTL() {
return QueueBuilder.durable("testTTL").ttl(6000).build();
}
@Bean
public Binding topicBindingTTL(Exchange topicExchange, Queue topicTTL) {
return BindingBuilder.bind(topicTTL).to(topicExchange).with("test.#").noargs();
}
Copy the code
The TTL method passes in the millisecond value, and the bottom line is to add the X-message-TTL attribute to the queue. If the TTL is not set, the message will not expire. If the TTL is set to 0, the message is immediately discarded unless it can be delivered directly to the consumer at this point.
Initiate a request:
http://127.0.0.1:8081/sendMessage?exchange=testTopic&routingKey=test.halo&message=HelloWorld! ×=100Copy the code
Wait 6 seconds, as shown below, the testTTL queue has cleared messages, but the testQueue remains.
Message TTL
Added test TTL function interface in ProducerController of producer engineering
@GetMapping("/sendMessageTTL") public String sendMessage(String exchange, String routingKey, String message, Integer times, Long ttl) { if (times == null) { times = 1; } if (ttl == null) { ttl = 10000L; } int realTimes = 0; MessageProperties properties = new MessageProperties(); properties.setExpiration(String.valueOf(ttl)); for (int i = 0; i < times; i++) { Message msg = new Message((message + i).getBytes(), properties); rabbitTemplate.convertAndSend(exchange, routingKey, msg); realTimes++; } return "realTimes + realTimes + realTimes" ; }Copy the code
Initiate a request:
http://127.0.0.1:8081/sendMessageTTL?exchange=testTopic&routingKey=test.halo&message=HelloWorld! ×=100&ttl=1000Copy the code
Because the TTL in the request is 1 second and the page refreshes every 5 seconds, total is always displayed as 0.
You can see that the testQueue and testTTL messages are missing. Note: When both queue and message TTL values are specified, the smaller of the two takes effect.
Dead-letter queue
DLX, which stands for dead-letter-Exchange, can be called a dead-letter Exchange or a dead-letter mailbox. When a message becomes a dead message in one queue, it can be re-sent to another switch, the DLX. The queue bound to the DLX is called a dead letter queue.
Messages become dead-letter for the following reasons:
- Message rejected
- Message expiration
- The queue length reaches the maximum. Procedure
The DLX is also a normal switch, no different from a normal switch. It can be specified on any queue, in effect setting the attributes of a queue. When there is a dead letter in the queue, RabbitMQ will automatically re-publish the message to the set DLX and route it to the dead letter queue.
To use a dead-letter queue, you only need to specify the switch by setting the queue parameter X-dead-letter-exchange when defining the queue.
Added to consumer RabbitConfig
@ Bean public Exchange dlxExchange () {/ / return dead-letter switches ExchangeBuilder. FanoutExchange (" testDLX "). The durable (true). The build (); Return queueBuilder.durable ("testDlx").build();} @bean public Queue dlxQueue() {return queueBuilder.durable ("testDlx").build(); } @Bean public Binding dlxBinding(Exchange dlxExchange, Queue dlxQueue) { return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("").noargs(); } @bean public Queue topicTtlDlx1() {// Expire Queue return QueueBuilder.durable("testTtlDlx1").ttl(6000).deadLetterExchange("testDLX").build(); } @bean public Queue topicTtlDlx2() {return QueueBuilder.durable("testTtlDlx2").maxLength(10).deadLetterExchange("testDLX").build(); } @Bean public Binding dlxBinding1(Exchange topicExchange, Queue topicTtlDlx1) { return BindingBuilder.bind(topicTtlDlx1).to(topicExchange).with("test.#").noargs(); } @Bean public Binding dlxBinding2(Exchange topicExchange, Queue topicTtlDlx2) { return BindingBuilder.bind(topicTtlDlx2).to(topicExchange).with("test.#").noargs(); }Copy the code
-
Create a Fanout testDLX switch as a dead-letter switch.
-
Create a testDlx queue and bind it to the testDlx switch.
-
Create an expiration queue, testTtlDlx1, with an expiration time of 6 seconds, bound to the testTopic switch of type TEST.#, dead-letter switch testDLX.
-
Create a message limit queue testTtlDlx2, limit 10 messages, bind to the testTopic switch, route to test.#, dead-letter switch to testDLX.
Initiate a request:
http://127.0.0.1:8081/sendMessage?exchange=testTopic&routingKey=test.halo&message=HelloWorld! ×=100Copy the code
As you can see, testTtlDlx2 can only hold 10 messages, and the remaining 90 messages are immediately forwarded to the testDlx dead-letter queue (testQueue is 0 because the consumer is enabled and only consumes this queue).
After some time, all the messages in testTtlDlx1 expire and are forwarded to the testDlx dead-letter queue. At this point, there are exactly 190 messages in the dead-letter queue (the testTTL queue is not configured with a dead-letter queue, so the messages will not be sent to the dead-letter queue after expiration).
Delays in the queue
The object stored in the delay queue is the corresponding delayed message. The so-called “delayed message” means that after the message is sent, consumers do not want to get the message immediately, but wait for a specific time before they can get the message for consumption.
Delayed queuing in RabbitMQ can be implemented with an expiration time + a dead letter queue.
For example, set expiration queue A with A TTL of 5 seconds and dead-letter queue B. So if you send A message to A, and no consumer consumes A’s message, 5 seconds later, the message is sent to B, and the consumer in the dead letter queue B can get the message. This enables a 5 second delay in consuming messages.
Application Scenarios:
-
Any task that needs to be executed after a specified time can be processed through the delay queue
-
Payment scenario: If the payment is not successful within 15 minutes after the user places the order, the order will be cancelled, and some business processes of the cancellation will be performed (such as updating the order status and adding back the inventory).
Message acknowledgement mechanism
There are two ways to ensure that messages are delivered: publish acknowledgements and transactions. Both cannot be used at the same time. When a channel is a transaction, the confirmation mode cannot be introduced. When channel is in confirm mode, transactions cannot be used.
Release confirmation
There are two methods: message sending success acknowledgement and message sending failure callback.
Added an interface to producer ProducerController
@GetMapping("/sendMessagePC") public String sendMessagePC(String exchange, String routingKey, String message, Integer times) throws InterruptedException, ExecutionException, TimeoutException { if (times == null) { times = 1; } / / message is sent successfully confirmed rabbitTemplate setConfirmCallback ((correlationData, b, s) - > {System. Out. Println (" correlationData: " + correlationData); System.out.println("ack: " + b); System.out.println("cause: " + s); }); / / message sending failed callback rabbitTemplate setReturnsCallback (returned - > {System. Out. Println (" returnedMessage: "+ returned); }); int realTimes = 0; for (int i = 0; i < times; i++) { CorrelationData correlationData = new CorrelationData("Message " + i); rabbitTemplate.convertAndSend(exchange, routingKey, message + i, correlationData); CorrelationData.Confirm confirm = correlationData.getFuture().get(10, TimeUnit.SECONDS); System.out.println("Confirm received, ack = " + confirm.isAck()); realTimes++; } return "realTimes + realTimes + realTimes" ; }Copy the code
Add the following configuration to the producer application.yml configuration file
Server: port: 8081 Spring: rabbitMQ: host: 192.168.2.100 port: 5672 username: dev Password: 123456 virtual-host: /dev # configure publisher-confirm-type () for forwarding failures. Message loss mandatory: trueCopy the code
Send a request:
http://127.0.0.1:8081/sendMessagePC?exchange=testTopic&routingKey=test.halo&message=HelloWorld! ×=1Copy the code
When the message is sent normally, the following logs are printed. You can find that everything is normal (the call is confirmed to be successful, and the callback is not called to be failed).
Confirm received, ack = true correlationData: CorrelationData [id=Message 0] ack: true cause: Null http://127.0.0.1:8081/sendMessagePC? exchange=testTopic1&routingKey=test.halo&message=HelloWorld! ×=1Copy the code
When sending a message to a non-existent switch, the log is as follows: failed to send the message and the reason is no exchange ‘testTopic1’ in vhost ‘/dev’.
Confirm received, ack = false correlationData: CorrelationData [id=Message 0] ack: false cause: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'testTopic1' in vhost '/dev', class-id=60, Method - id = 40) http://127.0.0.1:8081/sendMessagePC? exchange=testTopic&routingKey=test2.halo&message=HelloWorld! ×=1Copy the code
When a message is sent to a route that does not exist, the log is as follows: sent successfully (call confirmation of success, and call failure callback)
Confirm received, ack = true returnedMessage: ReturnedMessage [message=(Body:'HelloWorld! 0' MessageProperties [headers={spring_returned_message_correlation=Message 0}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=testTopic, routingKey=test2.halo] correlationData: CorrelationData [id=Message 0] ack: true cause: nullCopy the code
Conclusion:
- The success confirmation callback is called regardless of whether the message was sent successfully;
- Success is defined as whether a message is correctly sent to RabbitMQ. If no queue route matches the message and the message is lost, this is also called a success.
- The failure callback is invoked only if the sent message is discarded because there is no corresponding queue to receive it.
Transaction support
Scenario: After a service process fails (the transaction is rolled back), the message is not sent. RabbitMQ uses the caller’s external transaction, which is usually preferred because it is non-invasive (low coupling).
Add an interface to the producer ProducerController
@GetMapping("/sendMessageTransacted") @Transactional public String sendMessageTransacted(String exchange, String routingKey, String message) { rabbitTemplate.setChannelTransacted(true); . / / routing key and queue name rabbitTemplate convertAndSend (exchange, routingKey, message + "-- 01"); System.out.println(1/0); rabbitTemplate.convertAndSend(exchange, routingKey, message + "--02"); return "success"; }Copy the code
The producer application.yml configuration file is unconfigured with the following confirmation
Server: port: 8081 Spring: rabbitMQ: host: 192.168.2.100 port: 5672 username: dev Password: 123456 virtual-host: # publisher-confirm-type: correlated # publisher-returns-returns: true # template: # Mandatory: trueCopy the code
Inject the RabbitMQ transaction manager
@Bean
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
Copy the code
Initiate a request:
http://127.0.0.1:8081/sendMessageTransacted?exchange=testTopic&routingKey=test.halo&message=HelloWorld!
Copy the code
You can see that none of the messages were sent (you can also comment out line 1/0 to see if two messages were sent; Or annotate the @Transactional annotation line to see if a message is sent).
Message tracking
Message center message tracing is implemented using Trace, which is used by RabbitMQ to record every message sent, for easy debugging and troubleshooting by RabbitMQ developers. Visual interfaces can be provided in the form of plug-ins. After Trace is enabled, system Exchange: amq.rabbitmq. Trace will be automatically created. Each queue will be bound to this Exchange, and all messages sent to the queue will be recorded in Trace logs.
Message tracking enable and view commands
Enable the plug-in before using it.
Related commands:
-
Rabbitmq-plugins list: View the list of plug-ins
-
Rabbitmq-plugins enable rabbitmq_tracing: enable the trace plug-in
-
Rabbitmqctl trace_on: Enable trace
-
Rabbitmqctl trace_on -p /dev: Enable the trace function (/dev is the vhost that requires the log trace).
-
Rabbitmqctl trace_off: Disables trace
-
Rabbitmq-plugins disable Rabbitmq_tracing: Disable the trace plug-in
-
Rabbitmqctl set_user_tags dev administrator: Set dev to administrator (only administrator can view logs)
Once the log tracking plug-in is enabled and trace_on is turned on, a Topic switch is added: amq.rabbitmq.trace.
Create a new Trace as shown in the figure above.
Initiate a request:
http://127.0.0.1:8081/sendMessage?exchange=testTopic&routingKey=test.halo&message=HelloWorld! ×=1Copy the code
Click dev-tracing. Log to pop up the page, and each record is captured after formatting. The content is as follows:
-
Production log
{“channel”: 1, “connection”: “192.168.2.1:10738 -> 192.168.2.100:5672”, “exchange”: “testTopic”, “node”: “rabbit@docker100”, “payload”: “SGVsbG9Xb3JsZCEw”, “properties”: { “content_encoding”: “UTF-8”, “content_type”: “text/plain”, “delivery_mode”: 2, “headers”: {
}, "priority": 0 Copy the code
}, “queue”: “none”, “routed_queues”: [ “testQueue”, “testTTL”, “testTtlDlx1”, “testTtlDlx2” ], “routing_keys”: [ “test.halo” ], “timestamp”: “2020-12-31 6:23:49:122”, “type”: “published”, “user”: “dev”, “vhost”: “/dev” }
-
Consumer log
{“channel”: 1, “connection”: “192.168.2.1:10728 -> 192.168.2.100:5672”, “exchange”: “testTopic”, “node”: “rabbit@docker100”, “payload”: “SGVsbG9Xb3JsZCEw”, “properties”: { “content_encoding”: “UTF-8”, “content_type”: “text/plain”, “delivery_mode”: 2, “headers”: {
}, "priority": 0 Copy the code
}, “queue”: “testQueue”, “routed_queues”: “none”, “routing_keys”: [ “test.halo” ], “timestamp”: “2020-12-31 6:23:49:123”, “type”: “received”, “user”: “dev”, “vhost”: “/dev” }
Payload is the base64-encrypted message content
Messages are stacked
When the rate of production exceeds the rate of consumption, messages pile up.
Effects of accumulation:
- New messages could not be queued
- Old messages cannot be discarded
- Message wait consumption is too long, which is beyond the business tolerance
Stacking generation scenario:
- There was a sudden deluge of announcements from producers
- The consumer is dead
- Poor consumer performance
- Consumer failure
Stacking solutions:
-
Message queues have the function of peak clipping. As long as they do not exceed the service tolerance, there is no major problem. You only need to check whether the producers are normal. If you do, follow these steps
-
Deploying multiple consumers
-
Identify consumer performance bottlenecks; Consumer multithreading, configure concurrency and Prefetch
-
Identify the cause of consumption failure
Message loss
Message lost at producer
The producer sent successfully, but MQ did not receive the message, and the message was lost during transmission from the producer to MQ, usually due to network instability.
Solution: Use the RabbitMQ publishing confirmation mechanism. When a message is successfully received by MQ, an acknowledgement message is sent to the producer indicating that the message has been received successfully. RabbitMQ sends confirmation messages in three modes: common confirmation, batch confirmation, and asynchronous listening confirmation. Spring uses only asynchronous listening confirmation mode with RabbitMQ, but can block synchronization.
CorrelationData.Confirm confirm = correlationData.getFuture().get(10, TimeUnit.SECONDS);
Copy the code
GetFuture () will block the thread, get(10, timeUnit.seconds) will return within 10 SECONDS, otherwise an exception will be thrown.
Messages are lost at RabbitMQ
Messages are successfully sent to MQ, but are lost in MQ before being consumed, as can be the case when the MQ server is down or restarted.
Solution: Persist switches, queues, and messages to ensure that the corresponding switches, queues, and messages can be recovered from disk when the MQ server restarts. Spring integration enables switch, queue, and message persistence by default, so you can ensure that messages are not lost to RabbitMQ without changing any Settings.
Message lost at consumer
Message consumer spending, if set to auto reply MQ message after receiving the message will be automatically reply MQ servers, MQ can delete this message, if the message has been removed at MQ but consumer business processing abnormal or consumer service outage, so will lead to the news did not handle successfully resulting in lost this news.
Solution: set to manual reply MQ server, when consumer is abnormal or service outage, MQ server does not delete the message, but will send messages to bind the queue of consumers, if the queue only bind to a customer, then the message will be kept in the MQ server, until the news person can normal consumption.
Ordered consumption message
Multiple consumers consume a queue
When RabbitMQ is in Work Queue mode, there is only one Queue but there are multiple consumers competing with each other and MQ messages will be out of order.
Solution: Using topic mode, bind multiple queues, and the producer calculates a hash value based on a field, mod the number of queues, and then sends the message to the corresponding queue. A certain type of message can be placed in the same queue and consumed by the same consumer.
Consumer adoption of multithreading
When RabbitMQ uses a simple queue mode, messages can also be out of order if consumers use multithreading to speed up the processing of messages.
Solution: The consumer retrieves the message, calculates a hash value based on a field, modulates it to the number of memory queues, and then sends the message to the corresponding memory queue for the same thread to process. Order can be guaranteed.
Repeat purchases
To avoid the loss of news on the consumer side, will adopt the way of manual reply MQ to solve, also raises a question at the same time, the consumer process the message success, manual due to network instability when reply MQ, disconnects, lead to MQ did not get consumers to reply message, so this news will save in MQ message queue, Because of MQ’s message retransmission mechanism, the message is resent to the messager bound to the queue for processing, resulting in repeated message consumption. And some operations are not allowed to repeat consumption, such as ordering, inventory reduction, deduction and other operations.
Solution: If the business of consuming messages is idempotent, repeated consumption is fine and can be left alone. If idempotent operations are not supported, such as placing an order, reducing inventory, deducting money, etc., then the message ID can be saved to the database after each successful consumption at the consumer end, and the message ID can be queried before each consumption. If the message ID already exists, it means that it has been consumed and will not be consumed; otherwise, it will be consumed. You can use Redis to store message ids and setnx to store message ids.
Setnx (key,value) : If the key does not exist, the insert succeeds and returns 1. If the key exists, nothing is done and 0 is returnedCopy the code