RabbitMQ is an open source implementation of AMQP developed in the Erlang language.
AMQP: Advanced Message Queue. It is an open standard of application layer protocol, designed for message-oriented middleware. The client and message-oriented middleware based on this protocol can transfer messages, regardless of product and development language conditions.
1.RabbitMQ originated in the financial system and is used to store and forward messages in distributed systems. It is easy to use, scalable, and highly available. Specific features include:
RabbitMQ uses several mechanisms to ensure Reliability, such as persistence, transmission confirmation, and release confirmation.
Flexible Routing is used to route messages through Exchange before they are queued. For typical routing, RabbitMQ already provides some built-in Exchange. For more complex routing capabilities, you can bind multiple Exchanges together or implement your own Exchange through a plug-in mechanism.
More than one RabbitMQ server can form a cluster to form a logical Broker.
Queues can be mirrored on machines in a cluster, making them usable even if some nodes fail.
RabbitMQ supports multiple message queuing protocols, such as STOMP and MQTT.
RabbitMQ supports almost all common languages, such as Java,.NET, Ruby, and so on.
RabbitMQ provides an easy-to-use user interface that enables users to monitor and manage many aspects of a message Broker.
If a message is abnormal, RabbitMQ provides a message Tracing mechanism so that the user can find out what happened.
RabbitMQ provides many plug-ins to extend RabbitMQ in many ways, or you can write your own.
Several modes of RabbitMQ
Basic message model
In the model above, there are the following concepts:
P: the producer, that is, the program to send the message
C: Consumer: The recipient of the message, waiting for the message to arrive.
Queue: Message queue, shown in red. Messages can be cached; Producers post messages to them, and consumers retrieve messages from them.
Working message model
Compared with entry-level programs, the two consuming queues together consume messages in the same queue, but a message can only be retrieved by one consumer.
This message model is particularly useful in Web applications where complex tasks cannot be handled in a short HTTP request window.
Let’s simulate the process:
P: producer: publisher of a task
C1: Consumer 1: Pick up the task and complete it, assuming slow completion (simulation time)
C2: Consumer 2: Pick up the task and complete it, assuming that the completion speed is fast
C1 and C2 consume messages on average
Publish/subscribe (switch type: Fanout, also called broadcast)
Different from the previous two modes:
1) Declare Exchange instead of Queue
2) Send messages to Exchange instead of Queue
What is the difference between publish/subscribe and work queues?
The difference between:
Queues need not be defined while publish/subscribe needs to be defined.
2) Publish /subscribe producers send messages to switches, work Queues producers send messages to queues (the default switch is used at the bottom).
3) Publish /subscribe requires queues bound to switches, while work Queues do not need to be bound to default queues.
Similarities:
So both implement the same publish/subscribe effect, multiple consumers listen to the same queue and do not consume messages repeatedly.
Publish /subscribe is more powerful than work queue (it can also compete with queues), and it can specify its own dedicated switch.
Routing Routing model (switch type: Direct)
P: the producer sends a message to the Exchange. When sending a message, a routing key is specified.
X: Exchange, which receives the message from the producer and sends it to a queue that matches the routing key exactly
C1: consumer, whose queue specifies the message whose routing key is error
C2: consumer, whose queue specifies the message whose routing key is info, Error, and Warning
Topics Wildcard pattern (switch type: Topics)
Each consumer listens to its own queue and sets up a routingkey with a card. The producer sends the message to the broker, and the switch forwards the message to the specified queue based on the Routingkey.
A Routingkey typically consists of one or more words, separated by “. Split, for example, inform. SMS
Wildcard rules:
# : Matches one or more words
* : matches exactly 1 word
For example:
Audit.# : Can match audit.irs.corporate or audit.irs
Audit.* : Matches only audit.irs
The RabbitMQ management
Start the
#1. Service startup related
# start
systemctl start rabbitmq-server
# to restart
systemctl restart rabbitmq-server
# stop
systemctl stop rabbitmq-server
# check status
systemctl status rabbitmq-server
The management command line is used to operate RabbitMQ without using commands from the Web management interfaceRabbitmqctl help to view more commands#3. Plug-in management command line
rabbitmqplugins enable|list|disable
Copy the code
RabbitMQ passes objects
RabbitMQ is a message queue that sends and receives string/byte array messages
Message queues can send strings, byte arrays, serialized objects
Passing an object is just a matter of serializing the object or converting the JSON string
Boot Switch and queue creation
Create a Configuration class annotated with @Configuration
Declare a queue
/** * dureable: specifies whether to persist. The default value is false. The persistent queue is stored on disk and will remain exclusive when the message broker restarts. The default is false and can only be used by currently created connections, and the queue will be closed when the connection is closed. * autoDelete: Indicates whether the queue is automatically deleted when no producer or consumer is using it
public Queue newQueue(a){
return new Queue("Queue name", dureable, exclusive, autoDelete)}Copy the code
Declare a subscription switch
@Bean
public FanouExchange newFanouExchange(a){
return new FanouExchange("Switch Name")}Copy the code
Declare a routing mode switch
@Bean
public DirectExchange newDirectExchange(a){
return new DirectExchange("Switch Name")}Copy the code
Binding queues requires both queues and switches (if you need to bind multiple queues, just repeat the operation)
@Bean
public Binding bindingDirectExchange(Queue method name defined above Queue, switch method name defined above DirectExchange){
return BindingBuilder.bind(Queue).to(DirectExchange).with("Routing mode key");
}
Copy the code
Reliable delivery of messages
RabbitMQ transactions (not used)
When transactions are added to the message delivery process, processing efficiency is reduced by tens or even hundreds of times
Channel.txselect () starts the transaction
Channel.txcommit () commits the transaction
Channel.txrollback () transaction rollback
Message acknowledgement and return mechanisms
Message acknowledgement mechanism: Verifies that the message provider successfully sends a message to the switch
Return mechanism: verifies that a message was successfully sent from the switch to the queue
In Spring Boot
1. Yml adds an enabling mechanism
publisher-confirm-type: simple ## Enable message confirmation mechanism
publisher-returns: true # use return listener
Copy the code
2. Create a listener class
@Component
public class MsgConfirmAndReturn implements RabbitTemplate.ConfirmCallback.RabbitTemplate.ReturnsCallback {
Logger logger = LoggerFactory.getLogger(MsgConfirmAndReturn.class);
@Resource
public RabbitTemplate rabbitTemplate;
// Set the current class to rabbitTemplate
@PostConstruct
public void init(a) {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
// Message confirmation
if (b) {
logger.info("----- message successfully sent to switch -----");
} else {
logger.warn("----- Message failed to be sent to switch -----"); }}@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
//return listener (performed when the switch dispatches a message to a queue)
logger.warn("----- Switch failed to distribute message -----"); }}Copy the code
Message consumption is confirmed manually
1. Yml enables the manual confirmation mechanism
rabbitmq:
listener:
Enable manual confirmation mechanism
acknowledge-mode: manual
Copy the code
After this function is enabled, if a message is manually acknowledged but not acknowledged, the message stays in the queue
// The message is manually acknowledged
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
Copy the code
How to ensure message reliability
1. Message loss The message is sent but does not reach the server due to a network problem
-
Make a good try-catch method. The network may fail to send messages. After the failure, a retry mechanism should be established, which can be recorded in the database and periodically scanned and resend messages
-
Log whether or not each message status was received by the server
-
Periodically resend the message. If the message fails to be sent, periodically scan the database for the failed message to resend the message
The message arrives at the Broker, which writes the message to disk (persistence) for success. The Broker has not been persisted and is down.
- Publisher must also incorporate an acknowledgement callback mechanism to acknowledge success messages and modify the database message state.
Automatic ACK. Consumers receive messages, but fail to receive them and then go down
- Manual ACK must be enabled, the consumption is successful before removal, failure or no time to process noAck and re-join the queue
2. Repeated message consumption succeeds, the transaction has been committed, and the machine breaks down when ack occurs. As a result, the message from the Broker changes from unACK to ready and is sent to other consumers. The message consumption fails and the message is automatically sent again due to the retry mechanism
The ack fails, the message changes from unack to Ready, and the Broker resends
-
The consumer’s business consumption interface should be designed to be idempotent. For example, there is a status sign of the work order
-
Using anti-duplicate table (Redis /mysql), each message has a unique identification of the service. Once processed, no processing is required
-
RabbitMQ has a reDELIVERED field for every message that was redelivered, not the first
3. Backlog of messages
Customer outages backlog
Consumers are running short of spending power
The sender sends too much traffic. Procedure
- Online more consumers, for normal consumption
- On-line special queue consumption service, take out the messages in batches first, record the database, and slowly process them offline
Rabbitmq basicReject/basicNack/basicRecover Difference
channel.basicReject(deliveryTag, true);
The basicReject method rejects the message corresponding to deliveryTag. The second parameter is requeue, true re-enqueues, otherwise it discards or enters the dead-letter queue. After the reject method is applied, the consumer still consumes the reject message.
channel.basicNack(deliveryTag, false, true);
Basic. nack method does not confirm the message corresponding to deliveryTag, whether the second parameter is applied to multiple messages, and whether the third parameter is requeue. Different from Basic. reject, it supports multiple messages at the same time and can nack the consumer for receiving all unack messages previously. Nack messages are also consumed by themselves.
channel.basicRecover(true);
Basic. recover Whether to restore the message to the queue. The parameter is requeue or not. False: The message will be redelivered to itself.
Message consumption idempotent problem
Automatic message retry of RabbitMQ 1. Mq_ will automatically trigger retries if an exception is thrown while we are processing our business code, and by default RabbitMQ will retry an unlimited number of times. The retry limit problem needs to be manually specified
Configure the retry mechanism
rabbitmq:
listener:
simple:
retry:
## Start consumer (program problem) retry
enabled: true
## Number of retries (the unconsumed message will be discarded after retry)
max-attempts: 5
Retry interval
initial-interval: 3000
Copy the code
2. Under what circumstances does a consumer need to implement a retry policy?
A. After the consumer gets the message, it invokes the third-party interface, but fails to invoke the third-party interface? Do I need to retry? In this case, a retry policy needs to be implemented. The network delay is only temporary, and the call may be successful after repeated retries.
B. After the consumer gets the message, a data exception is thrown due to a code problem. Do I need to retry?
In this case, there is no need to implement a retry policy, even if the retry is repeated, it will fail. Logs can be stored for later use in the form of scheduled tasks or manual compensation. If the message is retried multiple times or fails, the consumer version needs to be republished to achieve consumption using dead letter queues
Mq can cause problems with repeated consumption by consumers during the retry process.
Mq consumers need to solve the problem of idempotency to ensure that data is unique
Dead-letter queue
A message becomes a dead-letter when one of the following three conditions occurs in the queue.
- The message was rejected (basic.reject/basic.nack) with Requeue = false
- Message TTL expired
- The queue length reaches the maximum. Procedure
After a message becomes a dead letter in a queue, if a dead letter queue is configured, it is republished to a dead letter switch, which posts the dead letter to a queue called a dead letter queue.
Create some properties of the queue
/** * Create dead letter queue DLX *@return* /
@Bean
public Queue orderDelayQueue(a){
HashMap<String, Object> map = new HashMap<String, Object>();
map.put("x-message-ttl".10000);
map.put("x-dead-letter-exchange",ORDER_DELAY_EXCHANGE);
map.put("x-dead-letter-routing-key"."k2");
return new Queue(ORDER_DELAY_QUEUE,true.false.false,map); } Some of the attributes of the queue (1X-message-ttl: indicates the expiration time of the message, in milliseconds. (2X-expires: specifies how long a queue will be deleted after it has not been accessed, in milliseconds. (3) x-max-length: indicates the maximum length of the queue. If the maximum length is exceeded, the message will be deleted from the queue head. (4) x-max-length-bytes: indicates the maximum space occupied by the message content in the queue. If the memory size exceeds this threshold, the message will be deleted from the header of the queue. (5X-overflow: Sets queue overflow behavior. This determines what happens to the message when the maximum length of the queue is reached. Valid values are drop-head, reject-publish, or reject-publish- DLX. The quorum queue type supports only drop-head. (6X-dead-letter-exchange: the name of the dead-letter exchange to which messages that have expired or been deleted (because the queue length is too long or the space has exceeded a threshold) can be specified to be sent; (7X-dead-letter-routing-key: dead-letter routing key used when the message is sent to the dead-letter exchange. If not set, the original routing key of the message is used.8X-single-active-consumer: indicates whether the queue is a single active consumer,true, there is only one consumer consumption message in the registered consumer group, and the others are ignored.falseThe message loop is distributed to all consumers (defaultfalse) (9X-max-priority: maximum priority to be supported by the queue; If not set, the queue does not support message priority. (10) X-queue-mode (Lazy mode) : Sets the queue to Lazy mode to keep as many messages on disk as possible to reduce RAM usage; If not, the queue will retain the memory cache to deliver messages as quickly as possible; (11X-queue-master-locator: Sets the master node information of the mirror queue in cluster mode.Copy the code
Delays in the queue
The AMQP protocol and RabbitMQ queues themselves do not support delay queuing, but they can be simulated using the Time To Live (TTL) feature
TTL is the lifetime of the message. RabbitMQ can set lifetime for queues and messages, respectively
When creating a queue, you can set the lifetime of the queue. When a message enters the queue and no consumer consumes it during the lifetime, the message is removed from the current queue.
A message queue is created without SETTING TTL, but a message is set to TTL. If the TTL is set, the message will be removed when its lifetime ends.
Implementing delay queuing
1. Create a routing switch
2, create two queues, one normal queue set to k1, one dead letter queue set to k2
-
X-dead-letter-exchange: indicates the switch to be forwarded after timeoutCopy the code
-
X-dead-letter-routing-key: indicates the key of the switch to be forwarded after timeoutCopy the code
-
X-message-ttl: indicates the timeout timeCopy the code
3. Bind the vm
4, send message to dead letter queue K2, and receive message K1
The rabbitmq springboot integration
Start the rabbitmq# start
systemctl start rabbitmq-server
# to restart
systemctl restart rabbitmq-server
# stop
systemctl stop rabbitmq-server
# check status
systemctl status rabbitmq-server
Copy the code
IP address +15672 access after successful startup
To create a queue, there must be a consumer. If there is no consumer, the queue will not send a message
Import dependence
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Copy the code
yml
spring:
rabbitmq:
host: 192.168141.124. # host IP
port: 5672 # port
username: root # username
password: root # your password
Copy the code
Message is sent
## Inject templates
@Autowired
private RabbitMQTemplate RabbitMQTemplate;
Send messages directly to the queue
AmqpTemplate. ConvertAndSend (" queue name ", message)
## Send messages to switches (subscribe switches)
AmqpTemplate. ConvertAndSend (" switch" , "", message)
## Send messages to switches (routing switches)
AmqpTemplate. ConvertAndSend (" switch" , routing key, message)
// Receive the message
@Component
public class HelloCustomer {
@RabbitListener(queuesToDeclare = @queue (value = "Queue name ")
public void a(String msg){
System.out.println("msg = " + msg);
}
}
Copy the code