01 preface
Today we take a closer look at RabbitMQ and its underlying principles.
02 introduction
RabbitMQ is a message queue written in Erlang and based on the Advanced Message Queuing Protocol (AMQP).
Why RabbitMQ?
- Simple to use, yet powerful.
- Based on the AMQP protocol.
- Active community and good documentation.
- Excellent concurrency, thanks to the Erlang language.
- Integrated with Spring Boot out of the box.
03 A closer look at the protocol
3.1 Introduction to AMQP
AMQP (Advanced Message Queuing Protocol) is an application-layer standard that provides unified messaging services. It is an open application-layer protocol designed for message-oriented middleware. Clients and messaging middleware that implement this protocol can exchange messages regardless of vendor, development language, or other such conditions.
Implementations of AMQP include RabbitMQ, OpenAMQ, Apache Qpid, Red Hat Enterprise MRG, AMQP Infrastructure, ØMQ, Zyre, and others.
The latest versions of RabbitMQ support AMQP 0-9-1 by default, which consists of three layers:
Module Layer: at the highest level of the protocol, it defines the commands that clients can invoke to implement their own business logic. For example, a client can declare a queue with the queue.declare command or consume messages from a queue with the basic.consume subscription.
Session Layer: in the middle, it sends commands from the client to the server and replies from the server back to the client, providing a reliable synchronization mechanism and error handling for the communication between client and server.
Transport Layer: at the bottom, it transmits binary data streams and provides frame handling, channel multiplexing, error checking, and data representation.
3.2 AMQP producer transfer process
When a client establishes a connection with the Broker, it sends a Protocol Header 0-9-1 to inform the Broker that the AMQP 0-9-1 Protocol is used in this interaction.
The Broker then returns connection.start to begin establishing the connection. The commands connection.start/.start-ok, connection.tune/.tune-ok, and connection.open/.open-ok are all involved in the connection negotiation.
After the connection is established, the channel.open and channel.open-ok commands are used to open a channel. The exchange.declare and exchange.declare-ok commands declare exchanges; similarly, the corresponding commands declare queues and bind queues to exchanges.
The basic.publish command is used to send messages and carries a content-header and a content-body: the content-header holds the properties of the message body, and the content-body holds the message body itself.
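To make the flow concrete, here is a minimal sketch using the RabbitMQ Java client; the host, exchange, queue, and routing key names are illustrative assumptions, not taken from the original text:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class AmqpProducerSketch {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");    // assumed broker address
        // newConnection(): protocol header, connection.start/.start-ok, .tune/.tune-ok, .open/.open-ok
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {               // channel.open / channel.open-ok
            channel.exchangeDeclare("demo.exchange", "direct", true);      // exchange.declare / .declare-ok
            channel.queueDeclare("demo.queue", true, false, false, null);  // queue.declare / .declare-ok
            channel.queueBind("demo.queue", "demo.exchange", "demo.key");  // queue.bind / .bind-ok
            // basic.publish: the content-header carries the properties, the content-body carries the payload
            channel.basicPublish("demo.exchange", "demo.key",
                    MessageProperties.PERSISTENT_TEXT_PLAIN, "hello".getBytes());
        }
    }
}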
3.3 AMQP consumer transfer process
When a consumer consumes messages, most of the commands involved are the same as on the producer side; in addition there are basic.qos/.qos-ok and basic.consume/.consume-ok.
The basic.qos/.qos-ok commands set the maximum number of unacknowledged messages a consumer may hold. The basic.consume and basic.consume-ok commands are used to start and confirm a consumer subscription on a queue.
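A matching consumer-side sketch (the queue name and prefetch value are assumptions), showing where basic.qos and basic.consume come into play:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class AmqpConsumerSketch {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");      // assumed broker address
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.basicQos(10);              // basic.qos / .qos-ok: at most 10 unacknowledged messages

        DeliverCallback onDeliver = (consumerTag, delivery) -> {
            System.out.println("received: " + new String(delivery.getBody()));
            // basic.ack: acknowledge this single delivery
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        // basic.consume / .consume-ok: start a subscription with manual acknowledgement (autoAck = false)
        channel.basicConsume("demo.queue", false, onDeliver, consumerTag -> { });
    }
}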
04 RabbitMQ features
RabbitMQ is written in Erlang and uses the Mnesia database to store metadata. Its main features:
- Reliability: mechanisms such as persistence, transport confirmation, and publisher confirms.
- Flexible routing: messages are routed through an Exchange before they reach a queue. For typical routing RabbitMQ ships with built-in exchanges; for more complex routing you can bind several exchanges together or implement your own exchange through the plugin mechanism.
- Clustering: multiple RabbitMQ servers can form a cluster that acts as a single logical Broker.
- Highly available queues: queues can be mirrored across machines in a cluster, so they remain usable even if some nodes fail.
- Multiple protocols: RabbitMQ supports several messaging protocols, such as AMQP, STOMP, and MQTT.
- Many clients: almost all common languages are supported, such as Java, .NET, Ruby, PHP, C#, and JavaScript.
- Management UI: an easy-to-use interface lets users monitor and manage messages and the nodes in a cluster.
- Plugin system: many plugins extend RabbitMQ in various ways, and you can also write your own.
05 Working Model
- Broker: a RabbitMQ server, the transport service that maintains the path from producer to consumer and ensures messages are delivered in the specified way.
- Exchange: the message exchange. It defines the rules by which messages are routed to queues.
- Queue: the message queue, the carrier of messages; each message is delivered to one or more queues.
- Binding: binds an Exchange and a Queue according to a routing rule.
- Routing Key: the routing key. The Exchange routes messages based on the Routing Key; the key specified when defining a Binding is called the Binding Key.
- Vhost: a virtual host. A Broker can have multiple virtual hosts, used to separate permissions for different users. A virtual host holds its own set of Exchanges, Queues, and Bindings.
- Producer: the message producer, which delivers messages to the corresponding Exchange; usually an independent program.
- Consumer: the receiver of messages; usually an independent program.
- Connection: a long-lived TCP connection between a producer or consumer and the Broker.
- Channel: a message channel. A client can open multiple channels on each connection, and each channel represents a session. In the RabbitMQ Java client API, most programming interfaces are defined on the Channel.
5.1 Exchange types
Direct Exchange
Definition: when a direct exchange is bound to a queue, an explicit binding key must be specified. Routing rule: when a message is sent to a direct exchange, a bound queue receives it only when the routing key exactly matches the binding key.
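For illustration, a small sketch in the same snippet style as the topic example below, assuming channel and msg already exist; the exchange, queue, and key names are made up:

// the binding key "order.create" must be matched exactly by the routing key
channel.exchangeDeclare("MY_DIRECT_EXCHANGE", "direct", true);
channel.queueDeclare("ORDER_QUEUE", true, false, false, null);
channel.queueBind("ORDER_QUEUE", "MY_DIRECT_EXCHANGE", "order.create");

// routed to ORDER_QUEUE: routing key equals the binding key
channel.basicPublish("MY_DIRECT_EXCHANGE", "order.create", null, msg.getBytes());
// not routed anywhere: no binding uses the key "order.delete"
channel.basicPublish("MY_DIRECT_EXCHANGE", "order.delete", null, msg.getBytes());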
Topic Exchange. Definition: when a topic exchange is bound to a queue, a pattern-matching routing key can be specified. There are two wildcards: * matches exactly one word, and # matches zero or more words; words are separated by a dot (.). Routing rule: when a message is sent to a topic exchange, a bound queue receives it only when the routing key matches the binding-key pattern.
channel.basicPublish("MY_TOPIC_EXCHANGE", "sh.abc", null, msg.getBytes());
channel.basicPublish("MY_TOPIC_EXCHANGE", "bj.book", null, msg.getBytes());
channel.basicPublish("MY_TOPIC_EXCHANGE", "abc.def.food", null, msg.getBytes());
Fanout Exchange (broadcast)
Definition: when a fanout exchange is bound to a queue, no binding key is needed. Routing rule: when a message is sent to a fanout exchange, no routing key needs to be specified; every queue bound to the exchange receives the message.
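And a fanout sketch under the same assumptions (channel and msg exist, queue names are made up); the routing key is simply ignored:

channel.exchangeDeclare("MY_FANOUT_EXCHANGE", "fanout", true);
channel.queueDeclare("SMS_QUEUE", true, false, false, null);
channel.queueDeclare("EMAIL_QUEUE", true, false, false, null);
// no binding key needed: every bound queue gets a copy
channel.queueBind("SMS_QUEUE", "MY_FANOUT_EXCHANGE", "");
channel.queueBind("EMAIL_QUEUE", "MY_FANOUT_EXCHANGE", "");
// the routing key ("" here) is ignored by a fanout exchange
channel.basicPublish("MY_FANOUT_EXCHANGE", "", null, msg.getBytes());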
06 RabbitMQ installation
Pull the image
docker pull rabbitmq
Create and start the container
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq
- -d Run the container in the background;
- --name Specify the container name;
- -p Map the service ports (5672: application access port; 15672: web console port);
- -v Map a directory or file;
- --hostname Specify the host name (one important thing about RabbitMQ is that it stores data under a "node name", which defaults to the host name);
- -e Set environment variables (RABBITMQ_DEFAULT_VHOST: default virtual host name; RABBITMQ_DEFAULT_USER: default user name; RABBITMQ_DEFAULT_PASS: password of the default user).
Enable the RabbitMQ management plugin
docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_management
Visit the management page:
http://127.0.0.1:15672 (default username/password: admin/admin)
07 RabbitMQ Quick Start
Maven dependencies
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <version>2.3.0.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>2.3.0.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>2.3.0.RELEASE</version>
    </dependency>
</dependencies>
The RabbitMQ configuration class
/**
 * @author Original
 * @date 2020/12/22
 * @since 1.0
 **/
@Configuration
public class RabbitConfig {

    public static final String EXCHANGE_TOPICS_INFORM = "exchange_topic_inform";
    public static final String QUEUE_SMS = "queue_sms";
    public static final String QUEUE_EMAIL = "queue_email";

    @Bean
    public Exchange getExchange() {
        // durable(true): the exchange still exists after the message queue restarts
        return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
    }

    @Bean("queue_sms")
    public Queue getQueueSms() {
        return new Queue(QUEUE_SMS);
    }

    @Bean("queue_email")
    public Queue getQueueEmail() {
        return new Queue(QUEUE_EMAIL);
    }

    @Bean
    public Binding bindingSms(@Qualifier("queue_sms") Queue queue, Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("demo.#.sms").noargs();
    }

    @Bean
    public Binding bindingEmail(@Qualifier("queue_email") Queue queue, Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("demo.#.email").noargs();
    }
}
Producer
@Service
public class RabbitmqProviderService {

    @Autowired
    RabbitTemplate rabbitTemplate;

    public void sendMessageSms(String message) {
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TOPICS_INFORM, "demo.one.sms", message);
    }

    public void sendMessageEmail(String message) {
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TOPICS_INFORM, "demo.one.email", message);
    }
}
Consumer
@Component
public class RabbitMqConsumer {

    @RabbitListener(queues = {RabbitConfig.QUEUE_EMAIL})
    public void listenerEmail(String message, Message msg, Channel channel) {
        System.out.println("EMAIL:" + message);
        System.out.println(msg);
        System.out.println(channel);
    }

    @RabbitListener(queues = {RabbitConfig.QUEUE_SMS})
    public void listenerSms(String message) {
        System.out.println("SMS:" + message);
    }
}
Startup class
/**
 * @author original
 * @date 2020/12/22
 * @since 1.0
 **/
@SpringBootApplication
@EnableRabbit
public class RabbitMqApplicaiton {

    public static void main(String[] args) {
        ResourceLoader resourceLoader = new DefaultResourceLoader(RabbitMqApplicaiton.class.getClassLoader());
        try {
            String path = resourceLoader.getResource("classpath:").getURL().getPath();
            System.out.println(path);
        } catch (IOException e) {
            e.printStackTrace();
        }
        SpringApplication.run(RabbitMqApplicaiton.class, args);
    }
}
Web controller
@RestController
public class DemoController {

    @Autowired
    RabbitmqProviderService rabbitmqProviderService;

    @RequestMapping("/sms")
    public void sendMsgSms(String msg) {
        rabbitmqProviderService.sendMessageSms(msg);
    }

    @RequestMapping("/email")
    public void sendMsgEmail(String msg) {
        rabbitmqProviderService.sendMessageEmail(msg);
    }
}
Send a message from the browser:
http://localhost:44000/sms?msg=1111
http://localhost:44000/email?msg=1111
08 Advanced usage of RabbitMQ
8.1 TTL
Setting an expiration time on a message
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("30000");
Message msg = new Message("message content".getBytes(), messageProperties);
// If the message is not consumed within 30 seconds it becomes a dead letter and RabbitMQ discards it
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TOPICS_INFORM, "demo.one.sms", msg);
Setting an expiration time on a queue
Queue queue = QueueBuilder.durable(QUEUE_SMS).ttl(30000).build();
Dead-letter queue
When a message becomes a dead letter, MQ deletes it by default. If we specify a dead-letter exchange (DLX) for the queue, the message is instead forwarded to that exchange and consumed by the queue bound to it (the dead-letter queue). This is also how delayed message delivery can be implemented. A message is sent to the dead-letter exchange (DLX) in three cases:
1. The message is rejected (basic.nack or basic.reject) with requeue == false.
2. The message expires.
3. The queue reaches its maximum length, which can be set with the x-max-length argument; if it is not set, the queue length is effectively unbounded (when the limit is reached, the message at the head of the queue is sent to the DLX).
1. Declare the dead-letter exchange, the dead-letter queue, and the binding between them
@Bean(name = "dlx.exchange")
public Exchange dlxExchange() {
    return ExchangeBuilder.directExchange("dlx.exchange").durable(true).build();
}

@Bean(name = "dlx.queue")
public Queue dlxQueue() {
    return QueueBuilder.durable("dlx.queue").build();
}

// Bind the dead-letter queue to the dead-letter exchange
@Bean
public Binding dlxQueueBindToDlxExchange(@Qualifier(value = "dlx.exchange") Exchange exchange,
                                         @Qualifier(value = "dlx.queue") Queue queue) {
    return BindingBuilder.bind(queue).to(exchange).with("delete").noargs();
}
2. Set the dead-letter exchange as an argument on the normal queue
@bean (name = "direct.queue_02") public Queue commonQueue02() {QueueBuilder QueueBuilder = QueueBuilder.durable("direct.queue_02"); queueBuilder.deadLetterExchange("dlx.exchange") ; / / the dead-letter switches For the common attributes of the queue set the past queueBuilder. DeadLetterRoutingKey (" delete "); Queuebuilder.ttl (30000); queueBuilder.ttl(30000); Queuebuilder.maxlength (2); queueBuilder.maxlength (2); Return queueBuilder.build(); return queueBuilder.build(); }Copy the code
3. On the consumer side, listen on the dead-letter queue as usual
@Component
public class RabbitmqDlxQueueConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitmqDlxQueueConsumer.class);

    @RabbitListener(queues = "dlx.queue")
    public void dlxQueueConsumer(String msg) {
        LOGGER.info("dlx queue msg is : {}", msg);
    }
}
Priority queue
Higher-priority messages can be consumed first, but only when messages back up in the queue (i.e. messages are produced faster than consumers can consume them).
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
channel.queueDeclare("ORIGIN_QUEUE", false, false, false, args);
Delayed queue
RabbitMQ itself does not support delayed queues. Delayed delivery can be implemented with TTL plus DLX: bind a DLX to a queue so that when a message expires after the specified time it is routed through the DLX into that queue, from which consumers then take it. An alternative is the rabbitmq-delayed-message-exchange plugin. Of course, it is also possible to save the messages to be sent in a database and have a task scheduler scan and send them.
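If the rabbitmq-delayed-message-exchange plugin is installed, a Spring AMQP sketch could look like the following; the exchange/queue names, routing key, and 30-second delay are assumptions, and the beans would live in a @Configuration class like the one shown earlier:

@Bean
public CustomExchange delayedExchange() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-delayed-type", "direct");      // routing behaviour once the delay has elapsed
    // "x-delayed-message" is the exchange type provided by the plugin
    return new CustomExchange("delayed.exchange", "x-delayed-message", true, false, args);
}

@Bean
public Queue delayedQueue() {
    return new Queue("delayed.queue");
}

@Bean
public Binding delayedBinding(Queue delayedQueue, CustomExchange delayedExchange) {
    return BindingBuilder.bind(delayedQueue).to(delayedExchange).with("delayed.key").noargs();
}

// Sending: the x-delay header tells the plugin how long to hold the message (in milliseconds)
public void sendDelayed(RabbitTemplate rabbitTemplate, String body) {
    rabbitTemplate.convertAndSend("delayed.exchange", "delayed.key", body, message -> {
        message.getMessageProperties().setHeader("x-delay", 30000);
        return message;
    });
}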
Flow control on the consumer side
With autoAck set to false, RabbitMQ stops pushing new messages to the consumer until a certain number of outstanding messages have been acknowledged (the QoS value can be set per consumer or per channel).
channel.basicQos(2);
// If more than two messages have not been acknowledged, the consumer stops receiving new messages from the queue
channel.basicConsume(QUEUE_NAME, false, consumer);
09 How to Ensure RabbitMQ Reliability
First of all, it should be made clear that efficiency and reliability cannot be achieved at the same time. To ensure the success of every link, the efficiency of sending and receiving messages will be affected. If the business implementation consistency requirement is not particularly high, some reliability can be sacrificed for efficiency.
① represents the message being sent from the producer to the Exchange. ② represents the message being routed from the Exchange to the Queue. ③ represents the message being stored in the Queue. ④ represents the consumer subscribing to the Queue and consuming the message.
9.1 Ensuring that Messages are Sent to the RabbitMQ Server
① may fail because of a network or Broker problem, and the producer has no way of knowing whether the message was correctly delivered to the Broker.
There are two solutions, Transaction mode and Confirm mode.
After enabling transactions with channel.txSelect, we can publish a message to RabbitMQ; if the transaction commits successfully, the message has definitely reached RabbitMQ. If RabbitMQ crashes or an exception is thrown before the transaction is committed, we can catch it and roll back by calling channel.txRollback. Transactions drag down RabbitMQ performance considerably and are not recommended.
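A minimal sketch of the transaction pattern, assuming channel already exists; the exchange and routing key names are illustrative:

try {
    channel.txSelect();    // put the channel into transaction mode
    channel.basicPublish("demo.exchange", "demo.key", null, "tx message".getBytes());
    channel.txCommit();    // once this returns, the message is guaranteed to have reached RabbitMQ
} catch (Exception e) {
    channel.txRollback();  // roll back on any failure before the commit
}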
The producer switches the channel into confirm mode by calling channel.confirmSelect (the confirm.select command). Once the message has been delivered to all matching queues, RabbitMQ sends an acknowledgement (basic.ack, containing the message's unique ID) to the producer, which lets the producer know the message arrived at its destination correctly.
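A minimal sketch of the synchronous confirm pattern, again assuming channel exists; the 5-second timeout is an arbitrary choice:

channel.confirmSelect();    // confirm.select: switch the channel into confirm mode
channel.basicPublish("demo.exchange", "demo.key", null, "confirm message".getBytes());
// waitForConfirms blocks until the broker acks or nacks the outstanding messages (throws on timeout)
if (channel.waitForConfirms(5000)) {
    System.out.println("message confirmed by the broker");
} else {
    System.out.println("message nacked: resend it or log the failure");
}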
9.2 Ensuring that messages are routed to the correct queue
② may fail because the routing key is wrong, the queue does not exist, or the queue name is incorrect. The mandatory flag together with a ReturnListener lets messages that cannot be routed be returned to the producer. Another option is alternate-exchange: unroutable messages are sent to this alternate exchange instead.
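A sketch of the mandatory flag combined with a ReturnListener, assuming channel exists; the names are illustrative:

channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
                             AMQP.BasicProperties properties, byte[] body) {
        // called for every message the broker could not route
        System.out.println("returned: " + new String(body) + ", reason: " + replyText);
    }
});
// mandatory = true: if no queue matches the routing key, the broker returns the message instead of dropping it
channel.basicPublish("demo.exchange", "no.such.key", true, null, "unroutable".getBytes());
// alternative: declare the exchange with an "alternate-exchange" argument to divert unroutable messages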
9.3 Ensure that messages are stored correctly in queues
Messages stored in queues may be lost due to system downtime, restart, or shutdown. That is, ③ is faulty.
1. Persist the queue, the exchange, and the message (see the sketch after this list).
2. Use a cluster with mirrored queues.
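A Spring AMQP sketch of point 1 (durable queue, durable exchange, persistent message); the names are made up and the sending method assumes a RabbitTemplate is available:

@Bean
public Queue durableQueue() {
    // durable(true): the queue definition survives a broker restart
    return QueueBuilder.durable("demo.durable.queue").build();
}

@Bean
public DirectExchange durableExchange() {
    // durable = true, autoDelete = false
    return new DirectExchange("demo.durable.exchange", true, false);
}

// When sending, mark the message itself as persistent (delivery mode 2) so it is written to disk
public void sendPersistent(RabbitTemplate rabbitTemplate, String body) {
    rabbitTemplate.convertAndSend("demo.durable.exchange", "demo.key", body, message -> {
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        return message;
    });
}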
To change the default configuration, you can create a rabbitmq.config file in the /etc/rabbitmq/ directory and specify configuration items using Erlang term syntax, for example:
[
  {rabbit, [
    {queue_index_embed_msgs_below, 4097}
  ]}
].
Then restart the RabbitMQ service (rabbitmqctl stop, then rabbitmq-server -detached). Should queue_index_embed_msgs_below be set as large as possible? Not necessarily. rabbit_queue_index stores its data in sequential segment files with the ".idx" suffix; each segment file contains a fixed number of entries, SEGMENT_ENTRY_COUNT, whose default value is 16384. Each rabbit_queue_index keeps at least one segment file in memory whenever it reads from disk, so be careful when increasing queue_index_embed_msgs_below: even a small increase can make memory usage balloon.
Related: Message storage mechanism
Both persistent and non-persistent messages can be written to disk. 1. A persistent message is written to disk as soon as it reaches the queue, and where possible a copy is also kept in memory to improve performance; the in-memory copy is evicted when memory gets tight. 2. A non-persistent message is normally kept only in memory and is written to disk when memory gets tight, to free memory. Both kinds of messages are handled by RabbitMQ's "persistence layer", which is composed as follows:
rabbit_queue_index: maintains, for each queue, the information about messages that have been written to disk, including where a message is stored, whether it has been delivered to a consumer, and whether it has been acked by a consumer. Each queue has its own rabbit_queue_index. rabbit_msg_store: stores the messages themselves; it is shared by all queues, with one instance per node. rabbit_msg_store can be subdivided into:
msg_store_persistent: responsible for storing persistent messages, which are not lost on restart
msg_store_transient: responsible for storing non-persistent messages, which are lost after a restart
A message can be stored either in rabbit_queue_index or in rabbit_msg_store; the best configuration keeps smaller messages in rabbit_queue_index and larger ones in rabbit_msg_store. The boundary is configured with queue_index_embed_msgs_below, whose default is 4096 bytes. Note that the size here refers to the message body, attributes, and headers combined. When a message is smaller than this threshold it is stored in rabbit_queue_index, which is a performance optimization. This storage mechanism was introduced in RabbitMQ 3.5 and improves system performance by roughly 10%.
Related: Queue structure
A RabbitMQ queue consists of two parts, rabbit_amqqueue_process and backing_queue:
rabbit_amqqueue_process: handles protocol-related logic, such as receiving messages from producers, delivering messages to consumers, and handling confirmations (publisher confirms on the producer side and acks on the consumer side). backing_queue: the concrete storage form and engine for messages; it exposes interfaces that rabbit_amqqueue_process calls.
If the queue a message is sent to is empty and consumers are subscribed to it, the message does not go through the queue but is delivered straight to a consumer. Only when it cannot be consumed immediately does the message need to be temporarily stored in the queue for later delivery.
After a message is queued, it is in one of the following states:
- alpha: the message content (body, attributes, and headers) and the message index are both in memory (highest memory use, lowest CPU use).
- beta: the message content is on disk and the message index is in memory (reading the message requires one I/O operation).
- gamma: the message content is on disk and the message index is both on disk and in memory (reading the message requires one I/O operation).
- delta: the message content and the message index are both on disk (lowest memory use, but higher CPU and disk I/O cost).
For persistent messages, both the content and the index must be written to disk before the message can be in any of these states, and the gamma state exists only for persistent messages. At runtime, RabbitMQ periodically computes, from the measured message delivery rate, the maximum number of messages that can be held in memory (target_ram_count). If the number of alpha messages exceeds this value, state transitions occur and the surplus messages move to the beta, gamma, or delta state. The main purpose of distinguishing these four states is to balance memory and CPU requirements.
For a normal queue, the internal implementation of backing_queue reflects message states through five sub-queues:
- Q1: only alpha messages
- Q2: beta and gamma messages
- Delta: delta messages
- Q3: beta and gamma messages
- Q4: only alpha messages
Typically, messages flow in the order Q1->Q2->Delta->Q3->Q4, but not every message experiences all states, depending on the load on the current system (for example, non-persistent messages do not experience deltas when memory load is low). The advantage of this design is that in the case of high queue load, memory space can be saved by saving part of the messages to disk, and when the load is reduced, this part of the messages will gradually return to memory to be retrieved by consumers, making the whole queue has good elasticity.
Related: State transitions when consuming messages
Consumer consumption of messages also causes a transition of message state, as shown below:
- The consumer first fetches the message from Q4; on success it is returned.
- If Q4 is empty, the message is fetched from Q3, first checking whether Q3 is empty. If it is, the queue is reported as empty, i.e. there are no messages in the queue at that moment.
- If Q3 is not empty, a message is taken from Q3 and the lengths of Q3 and Delta are checked. If both are 0, then Q2, Delta, Q3, and Q4 can all be considered empty; messages from Q1 are moved directly to Q4, and the next read comes straight from Q4.
- If Q3 is empty but Delta is not, messages from Delta are moved into Q3, and the next read comes directly from Q3.
- When moving messages from Delta to Q3, they are read one index segment at a time. The number of messages read is then compared with the message count in Delta: if they are equal, Delta is considered empty, and the messages in Q2 are moved into Q3 together with the ones just read; if they are not equal, only the messages just read are moved into Q3.
Usually, under normal load, if messages are consumed no more slowly than new messages arrive, messages that do not require reliability will most likely stay in the alpha state. Messages with durable set to true eventually enter the gamma state, and when publisher confirm is enabled, a message is only confirmed as received once it reaches the gamma state. If messages are consumed quickly enough and memory is sufficient, they do not move on to the next state.
Under high system load, if "piled-up" messages cannot be consumed quickly, RabbitMQ spends more time and resources handling them and has less capacity left for newly arriving messages. New messages then pile up as well, further increasing the average cost of handling each message, and the situation keeps deteriorating, reducing the system's throughput. A common way to reduce message buildup is to increase prefetch_count, the maximum number of unacknowledged messages a consumer may hold.
Related: Lazy queues
By default, when a producer sends a message to RabbitMQ, messages in a queue are kept in memory as much as possible so they can be delivered to consumers more quickly; even persistent messages keep a copy in memory while being written to disk. This mechanism uses more system resources, and memory is better reserved for where it is really needed. If the producer is too fast, or consumers are down and a large backlog builds up, the messages still occupy memory and disk, and in a traffic burst the MQ server may not cope, affecting message delivery on other queues as well. Can this situation be handled gracefully? The answer is the lazy queue. RabbitMQ introduced lazy queues in version 3.6.0. A lazy queue writes incoming messages straight to the file system, whether they are persistent or not, which reduces memory consumption at the cost of more I/O; for persistent messages the I/O is unavoidable anyway, so lazy queues and persistent messages are a natural pairing. Note that if non-persistent messages are stored in a lazy queue, memory usage stays stable, but the messages are still lost after a restart. How to declare a queue as lazy:
@bean (name = "direct.queue_03") public Queue commonQueue03() {QueueBuilder QueueBuilder = QueueBuilder.durable("direct.queue_03"); queueBuilder.lazy(); Queuebuilder.build (); queueBuilder.build(); }Copy the code
9.4 Ensure that messages are delivered correctly from the queue to the consumer
If an exception occurs before or while the consumer processes the message, ④ fails.
To ensure that messages reliably reach consumers from queues, RabbitMQ provides message acknowledgement mechanisms. Consumers can subscribe to the queue by specifying the autoAck parameter, and when autoAck is false RabbitMQ will wait for an explicit acknowledgement from the consumer before removing the message from the queue.
If message consumption fails, basic.reject or basic.nack can be called to reject the current message instead of acknowledging it. If the requeue parameter is set to true, the message is requeued and delivered to the next consumer (of course, with only one consumer this can lead to an endless loop of repeated consumption; alternatively the message can be published to a new queue, or the exception simply logged).
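A sketch of manual acknowledgement in a Spring listener, assuming the listener container is configured with acknowledge-mode: manual; the queue reuses QUEUE_SMS from the earlier configuration, and the business handling is a placeholder:

@RabbitListener(queues = {RabbitConfig.QUEUE_SMS})
public void listenerSmsWithAck(String body, Message message, Channel channel) throws IOException {
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    try {
        // handle the business logic here
        System.out.println("SMS:" + body);
        // acknowledge this single message (multiple = false)
        channel.basicAck(deliveryTag, false);
    } catch (Exception e) {
        // reject without requeueing so the message can be dead-lettered instead of looping forever
        channel.basicNack(deliveryTag, false, false);
    }
}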
9.5 Compensation Mechanism
For messages that do not receive any response within a certain period of time, you can set a mechanism for resending messages periodically. However, the number of times should be controlled, such as a maximum of three times. Otherwise, messages will accumulate.
9.6 Message idempotency
The server provides no such control; it can only be done on the consumer side. How do we avoid consuming a message twice? Duplicate messages have two possible causes: 1. The producer sends the message more than once, for example because confirm mode is enabled but no confirmation was received. 2. Link ④ fails: the consumer did not send an ack (or for some other reason), so the message is delivered again. For messages that may be sent repeatedly, a unique business ID can be generated for each message, and duplicates can be filtered using a log or a dedicated table.
9.7 Sequential nature of messages
Message sequentiality means that the order in which the consumer consumes the message is the same as the order in which the producer produces the message. In RabbitMQ, when there are multiple consumers in a queue, the order is not guaranteed because different consumers consume messages at different rates.
10 How can RabbitMQ ensure high availability
10.1 RabbitMQ cluster
Clusters are mainly used for high availability and load balancing. RabbitMQ uses /var/lib/rabbitmq/.erlang.cookie to authenticate nodes; the cookie must be identical on all nodes. A cluster has two types of nodes: disk nodes and memory nodes. The cluster needs at least one disk node to persist metadata; if no type is specified, a node is a disk node by default. Nodes in the cluster communicate with each other over port 25672, so this port must be opened in the firewall. Note that a RabbitMQ cluster cannot be built across a WAN unless the Federation or Shovel plugins are used. Configuring a cluster involves: 1. configuring the hosts file; 2. synchronizing the Erlang cookie; 3. joining the nodes into a cluster.
Building the cluster
docker pull rabbitmq:3.6.10-management

docker run -di --network=docker-network --ip=172.19.0.50 --hostname=rabbitmq-node01 --name=rabbitmq_01 -p 15673:15672 -p 5673:5672 --privileged=true -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3.6.10-management /bin/bash

docker run -di --network=docker-network --ip=172.19.0.51 --hostname=rabbitmq-node02 --name=rabbitmq_02 -p 15674:15672 -p 5674:5672 --privileged=true -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3.6.10-management /bin/bash

docker run -di --network=docker-network --ip=172.19.0.52 --hostname=rabbitmq-node03 --name=rabbitmq_03 -p 15675:15672 -p 5675:5672 --privileged=true -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3.6.10-management /bin/bash
The Erlang cookie values must be identical, i.e. the RABBITMQ_ERLANG_COOKIE values must be the same. RabbitMQ is implemented in Erlang, and Erlang nodes authenticate each other by exchanging the Erlang cookie, which acts as a shared secret key for communication between nodes.
docker exec -it rabbitmq_01 /bin/bash
Configure the hosts file so that each node can identify each other. Add the mapping between IP address and node name in /etc/hosts file (apt-get update, apt-get install vim) :
172.19.0.50 rabbitmq-node01
172.19.0.51 rabbitmq-node02
172.19.0.52 rabbitmq-node03
Start RabbitMQ and view the status
root@014faa4cba72:/# rabbitmq-server -detached     # start RabbitMQ: this starts the Erlang VM and the RabbitMQ application
root@014faa4cba72:/# rabbitmqctl status            # check node information
Status of node rabbit@014faa4cba72
[{pid,270},
 {running_applications,
     [{rabbitmq_management,"RabbitMQ Management Console","3.6.10"},
      {rabbitmq_management_agent,"RabbitMQ Management Agent","3.6.10"},
      {rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.6.10"},
      .............
root@014faa4cba72:/# rabbitmqctl cluster_status    # check cluster node status
Cluster status of node rabbit@014faa4cba72
[{nodes,[{disc,[rabbit@014faa4cba72]}]},
 {running_nodes,[rabbit@014faa4cba72]},
 {cluster_name,<<"rabbit@014faa4cba72">>},
 {partitions,[]},
 {alarms,[{rabbit@014faa4cba72,[]}]}]
Note: we can access RabbitMQ's management UI from a browser, but the default guest user does not allow remote access, so we need to create a user and grant it permissions.
root@014faa4cba72:/# rabbitmqctl add_user admin admin                 # create user admin with password admin
root@014faa4cba72:/# rabbitmqctl list_users                           # list RabbitMQ users
Listing users
admin   []
guest   [administrator]
root@014faa4cba72:/# rabbitmqctl set_user_tags admin administrator    # grant the administrator role to admin
# rabbitmqctl delete_user admin    # delete the admin user
# rabbitmqctl stop_app             # stop the RabbitMQ application
# rabbitmqctl stop                 # stop the RabbitMQ application together with the Erlang VM
You can then log in to the web management UI as the admin user. Create the same user on the other RabbitMQ nodes so that their management UIs can be accessed later as well.
Configure the cluster
1. Synchronize the cookie. The RabbitMQ nodes in a cluster exchange a secret token to authenticate each other; if the tokens differ, the nodes report an error when the cluster is configured. Take the /var/lib/rabbitmq/.erlang.cookie file from one node and copy it to the other nodes. Here we use node01 as the baseline.
docker cp rabbitmq_01:/var/lib/rabbitmq/.erlang.cookie .
docker cp .erlang.cookie rabbitmq_02:/var/lib/rabbitmq
docker cp .erlang.cookie rabbitmq_03:/var/lib/rabbitmq
2. Establish the cluster relationship. At the moment the three nodes run independently, with no relationship between them. To link them, we join the other two nodes to rabbitmq-node01. First add rabbitmq-node02 to node 1:
rabbitmqctl stop_app                              # stop the RabbitMQ application
rabbitmqctl reset                                 # reset the node
rabbitmqctl join_cluster rabbit@rabbitmq-node01   # rabbitmq-node01 is the host name of node 1
rabbitmqctl start_app                             # start the RabbitMQ application
Add rabbitmq-node03 to node 1
rabbitmqctl stop_app       # stop the RabbitMQ application
rabbitmqctl reset          # clear the node's state and restore it to a blank state. When the node is part of a cluster, this
                           # command also tells the cluster's disk nodes that the node is leaving; otherwise the cluster would
                           # treat the node as failed and expect it to come back
rabbitmqctl join_cluster rabbit@rabbitmq-node01   # rabbitmq-node01 is the host name of node 1
rabbitmqctl start_app      # start the RabbitMQ application
Access the background management system to view the cluster overview.
Node types
Running rabbitmqctl cluster_status shows the node types, for example [{nodes,[{disc,['rabbit@rabbitmq-node01','rabbit@rabbitmq-node02','rabbit@rabbitmq-node03']}]}, ...]. Every RabbitMQ node, whether standalone or part of a cluster, is either a memory node or a disk node. A memory node keeps the metadata definitions (exchanges, queues, bindings, users, permissions, and vhosts) in memory, while a disk node also stores this information on disk. A single-node cluster must be a disk node, otherwise all configuration information would be lost when RabbitMQ restarts. In a cluster, you can configure some nodes as memory nodes for higher performance.
Changing the node type. If we do not specify a node type, it defaults to a disk node. When adding a node, the following command specifies it as a memory node:
rabbitmqctl join_cluster rabbit@rabbitmq-node01 --ram
We can also use the following command to set a disk node as a memory node:
rabbitmqctl change_cluster_node_type {disc | ram}
As shown below.
root@rabbitmq-node02:/# rabbitmqctl stop_app                         # stop the RabbitMQ application
Stopping rabbit application on node 'rabbit@rabbitmq-node02'
root@rabbitmq-node02:/# rabbitmqctl change_cluster_node_type ram     # change this node to a memory node
Turning 'rabbit@rabbitmq-node02' into a RAM node
root@rabbitmq-node02:/# rabbitmqctl start_app                        # start the RabbitMQ application
Starting node 'rabbit@rabbitmq-node02'
root@rabbitmq-node02:/# rabbitmqctl cluster_status                   # query cluster status
Cluster status of node 'rabbit@rabbitmq-node02'
[{nodes,[{disc,['rabbit@rabbitmq-node03','rabbit@rabbitmq-node01']},
         {ram,['rabbit@rabbitmq-node02']}]},
 {running_nodes,['rabbit@rabbitmq-node01','rabbit@rabbitmq-node03',
                 'rabbit@rabbitmq-node02']},
 {cluster_name,<<"rabbit@rabbitmq-node01">>},
 {partitions,[]},
 {alarms,[{'rabbit@rabbitmq-node01',[]},
          {'rabbit@rabbitmq-node03',[]},
          {'rabbit@rabbitmq-node02',[]}]}]
root@rabbitmq-node02:/#
Node selection
RabbitMQ only requires one disk node in the cluster; all other nodes can be memory nodes. When nodes join or leave the cluster, they must notify at least one disk node of the change. If there is only one disk node and it happens to crash, the cluster can still continue to receive and send messages.
However, you can no longer create queues, exchanges, or bindings, change users or permissions, or add or remove cluster nodes. In other words, if the cluster's only disk node crashes, the cluster keeps running, but nothing can be changed until that node is restored. So make sure the cluster has at least two or more disk nodes.
When a memory node restarts, it connects to a preconfigured disk node and downloads a copy of the current cluster metadata. When adding memory nodes to a cluster, make sure to tell them about all disk nodes (the only metadata a memory node writes to disk is the addresses of the disk nodes). As long as a memory node can find at least one disk node in the cluster, it can rejoin the cluster after a restart.
10.2 Cluster optimization: HAProxy load balancing + Keepalived high availability
The existing cluster still has a problem: it provides no load balancing.
For the load-balancing layer we use HAProxy. To make the load-balancing layer itself highly available, we also use Keepalived, which uses the VRRP protocol to provide a floating virtual IP.
Keepalived is based on VRRP (Virtual Router Redundancy Protocol), which can be seen as a protocol for router high availability: N routers providing the same function form a router group with one master and several backups. The master owns a VIP that provides the service (other machines on the LAN use this VIP as their default route). The master sends VRRP advertisements to the backups; if the backups stop receiving them, the master is considered down, and a backup is elected as the new master according to VRRP priority. This keeps the router highly available.
Implementation
Install HAProxy on both memory nodes
yum install haproxy
Edit the configuration file
# /etc/haproxy/haproxy.cfg
global
    log 127.0.0.1 local2
    chroot /var/lib/haproxy
    pidfile /var/run/haproxy.pid
    maxconn 4000
    user haproxy
    group haproxy
    daemon
    stats socket /var/lib/haproxy/stats

defaults
    log global
    option dontlognull
    option redispatch
    retries 3
    timeout connect 10s
    timeout client 1m
    timeout server 1m
    maxconn 3000

listen http_front
    mode http
    bind 0.0.0.0:1080
    stats refresh 30s
    stats uri /haproxy
    stats auth admin:123456

listen rabbitmq_admin
    bind 0.0.0.0:15673
    server node1 192.168.8.45:15672
    server node2 192.168.8.45:15672

listen rabbitmq_cluster
    bind 0.0.0.0:5673
    mode tcp
    balance roundrobin
    timeout client 3h
    timeout server 3h
    timeout connect 3h
    server node1 192.168.8.40:5672 check inter 5s rise 2 fall 3
    server node2 192.168.8.45:5672 check inter 5s rise 2 fall 3
Start HAProxy
haproxy -f /etc/haproxy/haproxy.cfg
Install keepalived
yum -y install keepalived
Edit the configuration file
vim /etc/keepalived/keepalived.conf

global_defs {
    notification_email {
        [email protected]
        [email protected]
        [email protected]
    }
    notification_email_from [email protected]
    smtp_server 192.168.200.1
    smtp_connect_timeout 30
    router_id LVS_DEVEL
    vrrp_skip_check_adv_addr
    # vrrp_strict                # must stay commented out, otherwise the VIP cannot be reached
    vrrp_garp_interval 0
    vrrp_gna_interval 0
}

vrrp_script check_haproxy {
    script "/etc/keepalived/script/check_haproxy.sh"   # health-check script
    interval 2                                         # run the check every two seconds
    weight 2                                           # weight
}

vrrp_instance haproxy {                                # virtual group
    state MASTER
    mcast_src_ip 192.168.8.40                          # IP address of the current host
    virtual_router_id 51                               # virtual router ID
    advert_int 1                                       # heartbeat interval, in seconds
    authentication {
        auth_type PASS
        auth_pass 1111
    }
    track_script {
        check_haproxy
    }
    virtual_ipaddress {                                # virtual IP (VIP)
        192.168.8.201
    }
}
Start Keepalived
keepalived -D
10.3 RabbitMQ Mirror Queue
Why mirrored queues? To ensure high availability of queues and messages. What is a mirrored queue, and how is its master node chosen? RabbitMQ introduces the mirrored queue mechanism, which mirrors a queue onto other Broker nodes in the cluster. If a node in the cluster fails, the queue automatically switches to another node in the mirror so the service stays available. In the common setup, each mirrored queue has one master and several slaves, as shown in the figure below:
In cluster mode, queues and messages cannot be synchronized between nodes, so you need to use the mirrored queue mechanism of RabbitMQ for synchronization.
For a deeper dive, see: blog.csdn.net/u013256816/…
11 RabbitMQ applications
I work at an e-commerce company, so I will explain RabbitMQ practice in the context of an e-commerce flash-sale (seckill) scenario.
11.1 Scenario: Unpaid orders and inventory Rollback
When a user wins the flash sale, we need to direct them to the order page to pay. If the user does not pay for the order within the specified time (30 minutes), we need to roll back the inventory.
Architecture diagram
The implementation is to use a dead-letter queue, as shown in the code above.
11.2 Scenario: Fairness Guarantee for RabbitMQ
Reliable message delivery underpins the fairness of the flash-sale service. Another aspect of fairness to consider is message ordering: the messages that enter the queue first should be processed first.
Message ordering in RabbitMQ
Ordering: message ordering means that the order in which consumers consume messages is the same as the order in which the producer published them. For example, if a producer publishes msg1, msg2, and msg3 (ignoring message duplication), consumers must consume them in the order msg1, msg2, msg3.
A lot of material suggests that RabbitMQ messages are ordered, but this is not strictly true, or at least the claim only holds in a narrow case. Ordering is guaranteed only when none of RabbitMQ's advanced features are used, no exceptions such as message loss or network failure occur, and there is exactly one producer and one consumer. If multiple producers send messages at the same time, there is no way to determine the order in which messages reach the Broker, and therefore no way to guarantee ordering, since each message is sent on its own thread.
How RabbitMQ message ordering breaks down
On the producer side: 1. Without the publisher confirm mechanism, a single producer with a single consumer can preserve message order. 2. With the publisher confirm mechanism, the order in which messages reach the Broker cannot be guaranteed, because messages are sent asynchronously and each sending thread takes a different amount of time. 3. Using the transaction mechanism on the producer side preserves message order.
On the consumer side: 1. A single consumer can preserve message order. 2. Multiple consumers cannot, because each message is consumed on its own thread and threads take different amounts of time to execute.
So to guarantee message order in RabbitMQ, enable transactions on the producer side and use a single producer with a single consumer. If we do not care about the order in which messages arrive at MQ and only need the messages already in MQ to be consumed in order, it is enough to ensure there is a single consumer.
11.3 Scenario: Preventing overselling in the flash sale
To avoid overselling, we have to be careful at several points: for example, when pre-deducting inventory, and again when deducting the real inventory in the database. For the MQ part, avoiding overselling simply means making sure a message is not consumed more than once.
The first thing to note is that the conditions that trigger repeated delivery of a message are quite strict and are not hit in most scenarios. Generally, if handling a message times out or its status is not returned in time, the message is requeued: because the server never received the consumer's ack, it delivers the message again.
Idempotency guarantees
Repeated consumption itself is not scary; what is scary is not considering how to keep consumption idempotent. Idempotency means that calling an interface multiple times produces the same result as calling it once. In plain terms, if the same piece of data or the same request hits you repeatedly, you must ensure the corresponding data does not change. For example, suppose a system inserts one row into the database for each consumed message: if the same message is consumed twice, two rows are inserted and the data is wrong. But if, on the second consumption, the system checks whether the message was already processed and discards it, only one row remains and the data stays correct; even though the message was consumed twice, the database holds a single row, so the system is idempotent. How can message consumption be made idempotent? It depends on the actual business:
1. If the consumed data is written to a database, first look it up by primary key; if the row already exists, do an update instead of an insert.
2. If the consumed data is written to Redis, there is no problem: every operation is a set, which is naturally idempotent.
3. If neither of the above applies, it is a bit more involved: have the producer attach a globally unique ID (such as an order ID) to every message. Before processing, look the ID up in Redis; if it has not been consumed, process the message and then write the ID to Redis; if it has, skip it. This guarantees the same message is not processed twice.
4. Rely on a unique key in the database so that duplicate inserts fail. Thanks to the unique constraint, a duplicate insert only raises an error and never produces dirty data.
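A sketch of option 3 (a global unique ID checked in Redis); the key prefix, the 24-hour expiry, and the processOrder method are assumptions, and a StringRedisTemplate is assumed to be on the classpath via Spring Data Redis:

@Autowired
private StringRedisTemplate redisTemplate;

public void handleOrderMessage(String messageId, String body) {
    // setIfAbsent is atomic: only the first consumer of this messageId gets "true"
    Boolean first = redisTemplate.opsForValue()
            .setIfAbsent("mq:consumed:" + messageId, "1", Duration.ofHours(24));
    if (Boolean.TRUE.equals(first)) {
        // first time we see this message: run the business logic (placeholder method)
        processOrder(body);
    }
    // otherwise it is a duplicate delivery: ignore it
}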
12 Interview questions
- 1. What are the functions and usage scenarios of message queues?
- 2. How do you create queues and exchanges?
- 3. How are messages distributed when multiple consumers listen to one producer?
- 4. Where do messages that cannot be routed go?
- 5. When does a message become a dead letter?
- 6. How does RabbitMQ implement a delayed queue?
- 7. How do you ensure the reliability of message delivery?
- 8. How do you apply flow control on the server and consumer side?
- 9. How do you ensure the ordering of messages?
- 10. What node types does RabbitMQ have?
- 11. …