An overview of
1.1 background
Recently doing alarm integration platform, which need the alarm message is sent, the receiver types need to cover the market mainstream news, such as WeChat WeChat/enterprise/nailing/email/SMS/phone and so on, this is bound to use the MQ, in numerous message middleware, researched the scene is not as big data processing scenario requires kafka, High performance and validation mechanisms, data reliability and an active community were required to support message persistence and high availability deployment of middleware, and RabbitMQ was selected as the middleware for the application.
1.2 concept
MQ stands for Message Queue. MQ is an application-to-application communication method. Applications communicate by reading and writing messages in and out of queues (data for the application) without the need for dedicated connections to link them. Messaging refers to programs communicating with each other by sending data in messages rather than by direct calls, which are typically used for techniques such as remote procedure calls. Queuing refers to applications communicating through queues. The use of queues removes the need for receiving and sending applications to execute simultaneously. RabbitMQ is a complete, reusable enterprise messaging system based on AMQP.
1.3 features
- Application decoupling: MQ is based on the interface layer of the data, breaking up the coupled application and implementing the interface on both sides, thus allowing independent modification or extension of the processes on both sides, as long as both sides adhere to the same interface constraints.
- Peak traffic: In high-concurrency, high-traffic scenarios, RabbitMQ can reduce the burst of access pressure without crashing due to sudden time-out demands
- Asynchronous communication: Asynchronous processing of real-time business by sending messages to the messaging middleware
1.4 the characteristics of
- Reliability :RabbitMQ uses mechanisms to ensure reliability such as persistence, transport confirmation, and publication confirmation.
- Flexible routing: Messages are routed through the exchange before they are queued. For typical routing, RabbitMQ already provides some built-in switches. For more complex routing functions, multiple switches can be bound together, or you can implement your own switch through a plug-in mechanism.
- Scalability: Multiple RabbitMQ nodes can form a cluster or dynamically expand the cluster based on service requirements.
- High availability: Queues can be mirrored on machines in the cluster, making them available even if some nodes fail.
- Multiple protocols :RabbitMQ supports multiple messaging middleware protocols, including STOMP and MQTT, in addition to AMQP natively.
- Multi-language clients :RabbitMQ supports almost all common languages, such as Java, Python, Ruby, PHP, C#, JavaScript, etc.
- Administration Interface :RabbitMQ provides an easy-to-use user interface that allows users to monitor and manage messages, nodes in a cluster, etc.
- Plugin mechanisms :RabbitMQ provides many plug-ins to extend in many ways, but you can also write your own.
The second architecture
2.1 architecture diagram
- RabbitMQ Server
Also called Broker Server, it is not a food truck, but a delivery service. RabbitMQ isn’t a food truck, it’s a delivery service. Its role is to maintain a path from Producer to Consumer, ensuring that data is transmitted in a specified way. Although this guarantee is not 100% guaranteed, it is sufficient for common applications. Of course, for commercial systems, another layer of data consistency guard can be made to completely ensure system consistency.
- Client P
Also known as Producer, the sender of data. Create messages and publish (send) them to a Broker Server (RabbitMQ). A Message has two parts: payload and label. Payload is the data that is transmitted. The label is the name of the exchange, or a tag, that describes the payload, and it is the label that RabbitMQ uses to decide which Consumer to send the Message to. AMQP only describes the label, and RabbitMQ determines the rules for how to use the label.
- Client C
Also called Consumer, the receiver of data. Consumers attach to a Broker Server (RabbitMQ) and subscribe to a queue. Think of a queue as a mailbox with a name. When a Message arrives in a mailbox, RabbitMQ sends it to one of its subscribers, the Consumer. Of course, the same Message could be sent to many consumers. So in this Message, there’s only payload, and the label has been deleted. For the Consumer, it does not know who sent the message, but the protocol itself does not support it. Of course, if the Producer sends a payload that contains information from the Producer, that’s a different story.
- Connection
It’s a TCP connection. Both producers and consumers connect to the RabbitMQ Server over TCP. As we will see later, the program starts by establishing this TCP connection.
- Channel
Virtual connection. It is built over the TCP connection described above. All data flows are in channels. In other words, the normal situation is that the program starts to establish a TCP connection, and the second step is to establish this Channel.
So why use Channel instead of using TCP connections directly?
Establishing and closing TCP connections is costly for the OS. Frequent establishing and closing TCP connections has a significant impact on the system performance, and the number of TCP connections is also limited, which limits the system’s ability to handle high concurrency. However, setting up a Channel in a TCP connection does not incur these costs. A Producer or Consumer can simultaneously Publish or Receive multiple channels. Experiments show that 1s data can Publish10K data packets. Of course, this figure is different for different hardware environments and different packet sizes, but I just want to point out that this is enough for ordinary consumers or producers. If not, you should be thinking about how to SPLIT your design.
2.2 Related Definitions
- Broker: Simple message queue server entity
- Exchange: message Exchange, which specifies the rules by which messages are routed to which queue
- Queue: Message Queue carrier, each message is put to one or more queues
- Binding exchanges and queues according to routing rules
- Routing Key: The Key used by Exchange to deliver messages
- VHost: Virtual host. Multiple vhosts can be set up within a broker to separate permissions between different users.
- Producer: A program that delivers messages
- A Consumer is a program that receives messages
- Channel: Message Channel. In each connection of the client, multiple channels can be established. Each Channel represents a session task
Exchange, Queue, and RoutingKey determine a unique route from Exchange to Queue.
2.3 Basic Concepts
A Connection Factory, a Connection, and a Channel are all basic objects in the RabbitMQ API. Connection is the RabbitMQ socket Connection, which encapsulates some of the socket protocol logic. Connection Factory is the manufacturing Factory for Connection.
Channel is the most important interface we use with RabbitMQ. Most of our business operations are done in Channel, including defining queues, defining exchanges, binding queues to exchanges, and publishing messages.
- Queue
A Queue is RabbitMQ’s internal object for storing messages, as shown below.
Messages in RabbitMQ can only be stored in a Queue, where the producer (P in the image below) produces the message and delivers it to the Queue, and the consumer (C in the image below) gets the message from the Queue and consumes it.
- Message acknowledgment
In practice, messages could be lost if consumers receive messages from the Queue but fail (or otherwise fail) without processing them. To avoid this, we can ask the consumer to send a acknowledgment to RabbitMQ after consuming the Message, and RabbitMQ should wait until it receives the Message Acknowledgment to remove the Message from the Queue.
If RabbitMQ does not receive a receipt and detects that the consumer’s RabbitMQ connection is down, RabbitMQ will send the message to other consumers (if there are more than one) for processing. There is no timeout and no amount of time a consumer takes to process a message will cause it to be sent to another consumer unless its RabbitMQ connection is disconnected.
Another problem here is that if our developers forget to send the receipt to RabbitMQ after processing the business logic, this will cause a serious bug – the Queue will accumulate more and more messages. After the consumer restarts, the messages are repeatedly consumed and the business logic is repeatedly executed.
Publish message does not have ACK.
- Message durability
If we want to avoid messages being lost even when the RabbitMQ service is restarted, we can make both queues and messages durable to guarantee that RabbitMQ messages will not be lost in most cases. This does not resolve the occurrence of a small loss event (such as a RabbitMQ server power failure after the RabbitMQ server has received a producer’s message before it can persist the message), and if we need to manage such a small loss event, we need to use transactions. Since this is only a brief introduction to RabbitMQ, rabbitMQ-related transactions will not be covered here.
- Prefetch count
Earlier we said that if multiple consumers subscribe to messages in the same Queue, the messages in the Queue will be spread across multiple consumers. In this case, if the processing time of each message is different, it is possible that some consumers will be busy all the time, while others will quickly finish their work and remain idle. We can set prefetchCount to limit the number of messages a Queue can send to each consumer. For example, if prefetchCount=1, the Queue can send one message to each consumer at a time. After the consumer processes the message, the Queue sends another message to the consumer.
- Exchange
We saw in the previous section that producers post messages to queues, which in fact never happens in RabbitMQ. In reality, the producer sends the message to the Exchange (the Exchange, X in the figure below), which routes the message to (or drops it) in one or more queues.
What logic does Exchange use to route messages to the Queue? This is covered in the Binding section.
There are four Types of exchanges in RabbitMQ, each of which has a different routing policy, as described in the Exchange Types section.
- Routing Key
When a producer sends a message to Exchange, it usually specifies a Routing Key to specify the Routing rule for the message. The Routing Key must be used together with the Exchange Type and Binding Key to take effect.
With the Exchange Type and Binding key fixed (which is usually fixed in normal use), our producer can determine where the message will go by specifying the Routing key when sending a message to the Exchange.
The RabbitMQ Routing Key length is limited to 255 bytes.
- Binding
A Binding binds exchanges to queues so RabbitMQ knows how to route messages to the Queue correctly.
- Binding key
When an Exchange and Queue are bound, a Binding key is usually specified. When a consumer sends a message to an Exchange, it typically specifies a Routing Key. When the Binding key matches the Routing key, the message is routed to the corresponding Queue. This will be illustrated with practical examples in the Exchange Types section.
These bindings allow the same Binding key to be used when Binding multiple queues to the same Exchange.
The Binding key does not work in all cases. It depends on the Exchange Type. For example, an Exchange of Type FANout will ignore the Binding key and route messages to all queues bound to the Exchange.
- Exchange Types
The main Exchange types for RabbitMQ are FANout, Direct, Topic, and headers. The AMQP specification also mentions system and custom Exchange types.
- fanout
The Exchange routing rule of type FANout is very simple and routes all messages sent to the Exchange to all queues bound to it.
In the figure above, all messages sent by the producer (P) to Exchange (X) are routed to the two queues in the figure and are eventually consumed by the two consumers (C1 and C2).
- direct
Exchange Routing rules of the direct type are also simple, Routing messages to queues whose Binding keys exactly match Routing keys.
For example, if we send a message to Exchange with routingKey=”error”, the message will be routed to Queue1 (amqp.gen -s9b… This is the Queue name automatically generated by RabbitMQ) and Queue2 (amqp.gen-Agl…). ; If we send a message with Routing Key=”info” or routingKey=”warning”, the message will only be routed to Queue2. If we send messages with other Routing keys, the messages will not be routed to either Queue.
- topic
As mentioned above, Exchange Routing rules of direct type match Binding keys and Routing keys completely. However, such strict matching mode cannot meet actual service requirements in many cases. Topic Exchange extends the matching rule, which is similar to direct Exchage and routes messages to queues with Binding keys and Routing keys, but the matching rule is different:
The Routing Key is a period (.). Delimited strings (we call each separate string separated by a period “. “a word), such as” stock.USD. Nyse “, “NYse.vmw”, “quick.orange.rabbit”. Binding Key and Routing Key are strings separated by periods (.).
The Binding Key can contain two special characters, “” and “#”, for fuzzy matching, where “” is used to match one word and “#” is used to match multiple words (possibly zero).
- headers
A HEADERS Exchange does not rely on a Routing Key and Binding Key matching rule to route a message. Instead, it matches the HEADERS attribute in the content of the sent message.
Specify a set of key-value pairs when binding Queue and Exchange. When a message is sent to Exchange, RabbitMQ takes the headers (also in the form of a key-value pair) of the message and compares whether the key-value pair matches exactly the key pair specified when the Queue was bound to Exchange. The message is routed to the Queue if it is a perfect match, otherwise it is not.
This type of Exchange has not been used (but should be useful), so I won’t cover it.
- RPC
MQ itself is based on asynchronous message processing, and in the previous example all the producers (P) sent a message to RabbitMQ will not know whether the consumer (C) processed it successfully or failed (or even if there was a consumer to process the message).
However, in the actual application scenario, we probably need some synchronization processing, which needs to wait for the server to complete my message processing before proceeding to the next step. This is equivalent to Remote Procedure Calls (RPC). RPC is also supported in RabbitMQ.
- RabbitMQ implements RPC by:
When a client sends a request (Message), two values are set to replyTo (a Queue name, It is used to tell the server to send the notification message to this Queue when the processing is complete) and correlationId (the id of the request. The server needs to return this attribute after the processing is complete. The client will know which request is successfully executed or fails according to this ID). After receiving the message, the server generates a reply message to the Queue specified by replyTo, with the correlationId attribute. The client has subscribed to the Queue specified by replyTo. After receiving the reply message from the server, it analyzes which request is executed according to the correlationId attribute and processes subsequent services based on the execution result.
2.4 Details
2.4.1 Use ACK to confirm correct Message delivery
By default, messages are removed from the Queue if they have been correctly received by a Consumer. You can also send the same Message to multiple consumers.
If a Queue is not subscribed by any Consumer, the data will be cached and not discarded when it arrives. When there is a Consumer, the data is immediately sent to the Consumer. When this data is correctly received by the Consumer, it is deleted from the Queue.
So what is correctly received? Through an ACK. Each Message should be acknowledged (ACK). We can either explicitly ACK it programmatically or automatically ACK it. If any data has not been ACK, the RabbitMQ Server will send this information to the next Consumer.
If the APP has a bug and forgets to ACK, the RabbitMQ Server will not send data to it because the Server thinks the Consumer has limited processing power. Moreover, ACK mechanisms can be Benefitto throttling: Sending an ACK after the Consumer has processed the data, or even after an additional delay, effectively balances the Consumer’s load.
Of course, for a real example, we might merge some data, such as the data in merge 4S, and then sleep 4s to retrieve the data. Especially in the state of the monitoring system, we do not want all the states to be transmitted in real time, but hope to have a certain delay. This reduces some IO, and the end user doesn’t feel it.
2.4.2 Reject a message
There are two methods. The first, Reject, allows the RabbitMQ Server to send the Message to the next Consumer. The second option is to delete the Message immediately from the Queue.
2.4.3 Creating a queue
Both the Consumer and Procuder can create queues through queue.declare. A Consumer cannot declare a queue for a Channel but subscribe to other queues. You can also create private queues. So only the APP itself can use this queue. Queues can also be deleted automatically. A queue marked auto-delete is automatically deleted when the last Consumer unsubscribes. What about creating an existing queue? Then it won’t have any effect. Note that there is no effect, that is, if the second creation is different from the first, the operation succeeds, but the attributes of the queue are not modified.
So who should be responsible for creating this queue? Is it Consumer or Producer?
If the queue doesn’t exist, of course the Consumer doesn’t get any messages. Then the Producer Publish Message will be discarded. So, to prevent data loss, both consumers and producers try to create the queue! Either way, this interface is going to work.
The Queue handles load Balance perfectly. For multiple consumers, RabbitMQ uses a round-robin method to evenly distribute packets to different consumers.
2.4.4 Exchanges
As can be seen from the schema diagram, the Procuder published Message enters the Exchange. RabbitMQ will then find out which queue to place the Message in by “routing keys “. The queue is also bound using this routing key. There are three types of Exchanges: direct, fanout, and topic. Each implements a different routing algorithm.
- Direct exchange: ** If the routing key matches, the Message will be passed to the corresponding queue. When a queue is created, it will automatically bind the exchange with the name of the queue as the routing key.
- Fanout Exchange: broadcasts to the responding queue.
- **Topic exchange: ** Patterns key matching, such that AB can be passed to all AB queues.
2.4.5 Virtual hosts
Each virtual host is essentially a RabbitMQ Server, with its own queue, exchagne, bings rule, etc. This ensures that you can use RabbitMQ in multiple different applications.
2.4.6 Why Does Channel Not Work for TCP Connection
- TCP create destroy with three handshakes and four waves is too expensive
- The operating system has limitations on TCP connections. If TCP connections are used, thousands of connections per second may cause resource waste at peak times
- Each process has one channel. Multiple processes and channels share one TCP link. A TCP link can accommodate unlimited channels without performance bottlenecks.
Three deployment
This document describes the installation and deployment of centos7
3.1 install Erlang
Configure the yum source
cat > /etc/yum.repos.d/erlang.repo << EOF
[rabbitmq_erlang]
name=rabbitmq_erlang
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/\$basearch
repo_gpgcheck=1
gpgcheck=0
enabled=1
gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
[rabbitmq_erlang-source]
name=rabbitmq_erlang-source
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/SRPMS
repo_gpgcheck=1
gpgcheck=0
enabled=1
gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
EOF
Copy the code
3.2 Configuring the Yum Source
cat > /etc/yum.repos.d/rabbitmq.repo <<EOF [bintray-rabbitmq-server] name=bintray-rabbitmq-rpm baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.7.x/el/7/ gpgcheck = 0 repo_gpgcheck = 0 EOF RPM enabled = 1 The RPM - import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc - import https://www.rabbitmq.com/rabbitmq-release-signing-key.ascCopy the code
3.3 the rabbitmq service
yum -y install rabbitmq-server
chkconfig rabbitmq-server on
Change the rabbitMQ data and log store directory
Create data and log directories
mkdir -pv /data/rabbitmq/mnesia
mkdir -pv /data/rabbitmq/log
chown rabbitmq.rabbitmq /data/rabbitmq/* -R
Create a configuration file
cat >/etc/rabbitmq/rabbitmq-env.conf <<EOF
RABBITMQ_MNESIA_BASE=/data/rabbitmq/mnesia
RABBITMQ_LOG_BASE=/data/rabbitmq/log
EOF
systemctl status rabbitmq-server
Check whether the local CLI tool is successfully authenticated
sudo rabbitmq-diagnostics ping
Print application enabled components, TCP reviews, memory usage, alarms, etc.
sudo rabbitmq-diagnostics status
Print the valid configuration of the node
sudo rabbitmq-diagnostics environment
# Local node monitoring check
sudo rabbitmq-diagnostics node_health_check
# add user
rabbitmqctl add_user xuel xuelpwd
rabbitmqctl list_users
Listing users ...
user tags
xuel [xuel]
guest [administrator]
# Role definitionNone Minimum rights Role Management Administrator role policymaker Monitoring Super administrator [root@VM_0_12_centos ~]# rabbitmqctl set_user_tags xuel administrator
Setting tags for user "xuel" to [administrator] ...
[root@VM_0_12_centos ~]# rabbitmqctl list_users
Listing users ...
user tags
xuel [administrator]
guest [administrator]
# check all queues:
rabbitmqctl list_queues
# Add virtual host:
rabbitmqctl add_vhost vhost_name
Grant new virtual host to new user:
rabbitmqctl set_permissions -p vhost_name username '*' '*' '*'
Copy the code
3.4 configuration
Vim/usr/lib/rabbitmq/lib/rabbitmq_server - 3.7.17 ebin/rabbit. The appCopy the code
3.5 plug-in
rabbitmq-plugins enable rabbitmq_management
Rabbitmq To secure guest user access only localhost, enable guest/guest login
cat > /etc/rabbitmq/rabbitmq.config <<EOF
[{rabbit, [{loopback_users, []}]}].
EOF
systemctl restart rabbitmq-server
# page visit http://ip:15672
Copy the code
4 use
Since the technology stack is Python, this section provides a simple example for using RabbitMQ in Python
- Introduction
Because AMQP is a two-way RPC protocol, clients can send requests to the server, and the server can send requests to the client, Pika implements or extends IO loops in each of its asynchronous connection adapters. These IO loops are methods that block loops and listen for events. Each asynchronous adapter follows the same criteria to invoke the IO loop. IO loops are created when the connection adapter is created. To start the IO loop for any given adapter, call the connection.ioloop. Start () method.
- install
pip install pika
Copy the code
- demo
- We start by creating our connection object, then starting our event loop.
- When we are connected, the on_connected method is called. In that method we create a channel.
- When the channel is created, the on_channel_open method is called. In that method we declare a queue.
- When the queue is declared successfully, on_queue_declared is called. In that method we call
channel.basic_consume
telling it to call the handle_delivery for each message RabbitMQ delivers to us. - When RabbitMQ has a message to send us, it calls the handle_delivery method passing the AMQP Method frame, Header frame, and Body.
import pika
# Create a global channel variable to hold our channel object in
channel = None
# Step #2
def on_connected(connection):
"""Called when we are fully connected to RabbitMQ"""
# Open a channel
connection.channel(on_open_callback=on_channel_open)
# Step #3
def on_channel_open(new_channel):
"""Called when our channel has opened"""
global channel
channel = new_channel
channel.queue_declare(queue="test", durable=True, exclusive=False, auto_delete=False, callback=on_queue_declared)
# Step #4
def on_queue_declared(frame):
"""Called when RabbitMQ has told us our Queue has been declared, frame is the response from RabbitMQ"""
channel.basic_consume('test', handle_delivery)
# Step #5
def handle_delivery(channel, method, header, body):
"""Called when we receive a message from RabbitMQ"""
print(body)
# Step #1: Connect to RabbitMQ using the default parameters
parameters = pika.ConnectionParameters()
connection = pika.SelectConnection(parameters, on_open_callback=on_connected)
try:
# Loop so we can communicate with RabbitMQ
connection.ioloop.start()
except KeyboardInterrupt:
# Gracefully close the connection
connection.close()
# Loop until we're fully closed, will stop on its own
connection.ioloop.start()
Copy the code
Design of message sending service
The total consumer and producer are all in the total k8S cluster. For the message sending service, the producer sends messages with routing_key and uses confirm to confirm. For exchange, the direct mode is used and the corresponding bind_key is sent to the corresponding queue. Start multiple channels in connection to the UK queue, each corresponding to its own consumer to improve concurrency.
Refer to the link
- Github.com/rabbitmq/er…
- www.rabbitmq.com/install-rpm…
- Juejin. Cn/post / 684490…
- Blog.csdn.net/weixin_3860…
- www.bilibili.com/video/av576…