What is MQ

MQ stands for Message Queue. It is a first-in, first-out (FIFO) data structure that holds the data (messages) to be transmitted, and message delivery is built on that queue mechanism: a producer generates a message and puts it into the queue, and a consumer then processes it. Consumers can either pull messages from a specified queue, or subscribe to a queue and have the MQ server push messages to them.
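The producer/queue/consumer relationship can be sketched in a few lines of Go. This is only an in-process illustration, not RabbitMQ: a buffered channel stands in for the queue, and the names produce and consume are made up for the example.

```go
package main

import "fmt"

// produce puts n messages into the queue in order, then closes it.
func produce(queue chan<- string, n int) {
	for i := 1; i <= n; i++ {
		queue <- fmt.Sprintf("msg-%d", i)
	}
	close(queue)
}

// consume drains the queue and returns the messages in the order received.
func consume(queue <-chan string) []string {
	var got []string
	for m := range queue {
		got = append(got, m)
	}
	return got
}

func main() {
	// A buffered channel stands in for the message queue (an assumption of this sketch).
	queue := make(chan string, 8)
	go produce(queue, 3)
	for _, m := range consume(queue) {
		fmt.Println(m)
	}
}
```

The consumer sees the messages in the same order the producer sent them, which is the first-in, first-out property described above.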

Use scenarios for MQ

In the work of Tech bug, MQ is used in the following scenarios:

  • We send a large number of data-collection emails every day, but we do not want to embed the email-sending logic in our statistics script, both to keep the statistics job fast and to decouple the two pieces of business logic.
  • We use RabbitMQ dead-letter queues to recover stock and to return questions to the question bank. For example, if an item has been ordered and the stock has been reduced but payment is not made, the order is cancelled after 30 minutes and the stock is restored. Or, in the business where doctors grab questions to answer, a doctor may grab 10 questions but answer only 3 within the validity period (say, 1 day); the remaining 7 cannot stay bound to that doctor forever, so after 1 day they are unbound and returned to the bank.
  • Data synchronization. With the spread of microservices, different businesses keep their own user tables. Suppose the company originally has one master user table; as the business grows, car loans become a service with its own user table, and consumer loans likewise. When user information changes in any service, it should be synchronized to the master table. This is where we use a message queue: every time a user modifies their information, we put a message into the queue and use that queue to synchronize the data.
  • Operations that trigger many follow-up operations. For example, when a doctor completes registration, we also need to open other accounts for them, such as a medical science account, a science forum account, and so on.
  • High-concurrency scenarios such as flash sales. At the end of the year we run a points-redemption event that lasts two hours and generates a very large number of requests. If every request updated the database directly, possibly touching several tables, the database could be overwhelmed. To avoid this, we first store the requests in the queue and let the database digest them at its own pace. This is peak shaving.
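To make the stock-recovery scenario concrete, here is a small sketch of what the consumer of expired messages would do. It does not talk to RabbitMQ at all; the Order type, the expireUnpaid function, and the minute-based clock are all assumptions made for illustration.

```go
package main

import "fmt"

// Order is a hypothetical unpaid order that reserved stock at CreatedAt (in minutes).
type Order struct {
	ID        string
	Qty       int
	CreatedAt int
	Paid      bool
}

// expireUnpaid models the work done when an order message expires:
// every order still unpaid after ttl minutes is cancelled and its stock returned.
// It returns the new stock level and the IDs of the cancelled orders.
func expireUnpaid(stock int, orders []Order, now, ttl int) (int, []string) {
	var cancelled []string
	for _, o := range orders {
		if !o.Paid && now-o.CreatedAt >= ttl {
			stock += o.Qty
			cancelled = append(cancelled, o.ID)
		}
	}
	return stock, cancelled
}

func main() {
	orders := []Order{
		{ID: "A", Qty: 2, CreatedAt: 0},             // unpaid, older than 30 minutes
		{ID: "B", Qty: 1, CreatedAt: 0, Paid: true}, // paid, keeps its stock
		{ID: "C", Qty: 3, CreatedAt: 20},            // unpaid, but only 10 minutes old
	}
	stock, cancelled := expireUnpaid(4, orders, 30, 30)
	fmt.Println(stock, cancelled)
}
```

In a real deployment the "older than 30 minutes" check is not done by scanning orders; the broker hands the expired message to a dead-letter queue, and the consumer of that queue performs the cancellation for that one order.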

What MQ products are available, and how do they compare

Why RabbitMQ

RabbitMQ is an open-source message queuing system written in Erlang and based on the AMQP protocol. The main characteristics of AMQP are message orientation, queuing, routing (including point-to-point and publish/subscribe), reliability, and security. AMQP is mostly used in enterprise systems where data consistency, stability, and reliability matter more than raw performance and throughput. RabbitMQ is very reliable: with persistence and acknowledgements configured it can guard against message loss, and mirrored queues make it very stable. That is why, in our Internet finance industry, RabbitMQ is chosen when the requirements on data stability and reliability are very high. Its throughput is certainly not as good as Kafka's, but it is much better than ActiveMQ's, and you can also do some performance tuning yourself. RabbitMQ can be used to build a multi-site active-active architecture, and each node can be a disk node or a RAM node.

Install and start (Ubuntu 18); refer to the article

    root@guofu:~# cat /etc/issue
    Ubuntu 18.04.5 LTS \n \l

    # As mentioned in the MQ comparison above, RabbitMQ is implemented in Erlang, so install Erlang first
    sudo apt-get install erlang-nox
    # Add the public key
    wget -O - https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
    # Update the package index
    sudo apt-get update
    # Install RabbitMQ
    sudo apt-get install rabbitmq-server
    # Check the running status of the RabbitMQ service
    service rabbitmq-server status
    systemctl status rabbitmq-server
    # The output should include: Active: active (running) since Mon 2021-07-26 11:15:54 CST
    # Stop and start the service
    sudo service rabbitmq-server stop
    sudo service rabbitmq-server start
    # Enable the web management plugin, then restart
    sudo rabbitmq-plugins enable rabbitmq_management
    sudo service rabbitmq-server restart
    # Check that the management page responds
    curl http://localhost:15672

At this point RabbitMQ is installed and the web page is accessible. The default user name and password are both guest. By default, the guest user can only log in from the RabbitMQ server itself (localhost), so it is advisable to create a new user and grant it permissions for everything else. Let's create that user:

    # List the current users
    sudo rabbitmqctl list_users
    # Add user admin with password passwd (customize as required)
    sudo rabbitmqctl add_user admin passwd
    # Give the user the administrator tag
    sudo rabbitmqctl set_user_tags admin administrator
    # Grant configure, write, and read permissions on all resources in the virtual host
    sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'

Log in remotely using the admin account

Configuration File Interpretation

rabbitmq-env.conf: environment variables for RabbitMQ

    root@guofu:~# cd /etc/rabbitmq/
    root@guofu:/etc/rabbitmq# ls
    enabled_plugins  rabbitmq-env.conf
    root@guofu:/etc/rabbitmq# cat rabbitmq-env.conf
    # Defaults to rabbit. This can be useful if you want to run more than one node
    # per machine - RABBITMQ_NODENAME should be unique per erlang-node-and-machine
    # combination. See the clustering on a single machine guide for details:
    # http://www.rabbitmq.com/clustering.html#single-machine
    #NODENAME=rabbit              -- node name, used when the service runs as a cluster

    # By default RabbitMQ will bind to all interfaces, on IPv4 and IPv6 if
    # available. Set this if you only want to bind to one network interface or
    # address family.
    #NODE_IP_ADDRESS=127.0.0.1    -- node IP address

    # Defaults to 5672.
    #NODE_PORT=5672               -- node port

    # Default rabbitmq-server wait timeout.

Architecture of the MQ server

Related references

  • Let’s first look at the rabbitMQ architecture diagram

  • Broker: the message queue server entity, i.e. rabbitmq-server.
  • Vhost: virtual host. For example, suppose there are two users, A and B, and user A should only access queue A1 while user B should only access queue B1. That is hard to enforce cleanly inside a single vhost.

We can decide which vhost or vhosts a user may access. Inside a vhost, RabbitMQ does not offer per-exchange or per-queue access lists; a user's permissions there are just three regular-expression patterns (configure, write, read) matched against resource names. The practical unit of isolation is therefore the vhost, which contains many exchanges, queues, and bindings. So if exchangeA and queueA should only be accessible to user A, and exchangeB and queueB only to user B, the straightforward way is to create vhostA for exchangeA and queueA and vhostB for exchangeB and queueB, which isolates them. The vhost is a basic AMQP concept and must be specified when connecting. The default vhost in RabbitMQ is /. Virtual hosts can be listed with rabbitmqctl list_vhosts. The commands below create a vhost and grant a user permissions on it:

    root@guofu:/etc/rabbitmq# rabbitmqctl add_vhost test_vhost
    Creating vhost "test_vhost"
    root@guofu:/etc/rabbitmq# rabbitmqctl list_users
    Listing users
    admin   [administrator]
    guest   [administrator]
    # Assign permissions: set_permissions [-p <vhost>] <user> <conf> <write> <read>
    # Note that RabbitMQ caches the permission check results per connection or channel,
    # so you need to reconnect for a permission change to take effect.
    root@guofu:/etc/rabbitmq# rabbitmqctl set_permissions -p test_vhost admin ".*" ".*" ".*"
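The vhost isolation idea can be modelled with a small lookup table. This is a simplified sketch, not how the broker is implemented: the grants type, canRead, and demoGrants are invented names, and the exact way RabbitMQ evaluates its patterns is not reproduced here.

```go
package main

import (
	"fmt"
	"regexp"
)

// Permission holds the three patterns given to set_permissions for one user in one vhost.
type Permission struct {
	Configure, Write, Read *regexp.Regexp
}

// grants maps "vhost/user" to that user's patterns (a hypothetical structure for this sketch).
type grants map[string]Permission

// canRead reports whether user may read from the named resource in vhost.
// A user with no entry for the vhost has no access to anything in it.
func (g grants) canRead(vhost, user, resource string) bool {
	p, ok := g[vhost+"/"+user]
	return ok && p.Read.MatchString(resource)
}

// demoGrants mirrors the userA/vhostA and userB/vhostB example from the text.
func demoGrants() grants {
	all := regexp.MustCompile(".*")
	return grants{
		"vhostA/userA": {Configure: all, Write: all, Read: all},
		"vhostB/userB": {Configure: all, Write: all, Read: all},
	}
}

func main() {
	g := demoGrants()
	fmt.Println(g.canRead("vhostA", "userA", "queueA")) // userA inside its own vhost
	fmt.Println(g.canRead("vhostB", "userA", "queueB")) // userA has no grant on vhostB
}
```

Even though both users hold the widest possible pattern (".*"), neither can reach the other's queue, because the grant only exists inside their own vhost.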
  • Exchange: receives messages from producers and routes them to queues on the server.

As the web page shows, there are four exchange types to choose from, and two durability options: transient (kept in memory) and durable (kept on disk).

  • Fanout (publish/subscribe)

The producer sends the message to the exchange, and because the exchange is of type fanout it copies every message to all the queues bound to it. Each queue can have a consumer that receives the message and runs its consumption logic. We need to create the exchange ourselves and bind one or more queues to it. If several consumers listen on the same queue, the messages are distributed among them in turn; because a message is removed once it has been consumed (read and burn), each message in a queue is handled by only one consumer. Fanout is often used for broadcasting a message to a group.
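The copy-to-every-bound-queue behaviour can be illustrated with an in-memory model. This is only a sketch of the routing rule; fanoutExchange, bind, and publish are names made up here and are not part of any RabbitMQ client.

```go
package main

import "fmt"

// fanoutExchange copies each published message to every bound queue.
type fanoutExchange struct {
	queues map[string][]string // queue name -> messages waiting in that queue
}

func newFanout() *fanoutExchange {
	return &fanoutExchange{queues: map[string][]string{}}
}

// bind attaches a queue to the exchange; fanout ignores any routing key.
func (e *fanoutExchange) bind(queue string) {
	if _, ok := e.queues[queue]; !ok {
		e.queues[queue] = nil
	}
}

// publish appends the message to every bound queue and returns how many queues got it.
func (e *fanoutExchange) publish(msg string) int {
	for name := range e.queues {
		e.queues[name] = append(e.queues[name], msg)
	}
	return len(e.queues)
}

func main() {
	e := newFanout()
	e.bind("email") // hypothetical queue names
	e.bind("sms")
	n := e.publish("user registered")
	fmt.Println(n, e.queues["email"], e.queues["sms"])
}
```

Note that publish takes no routing key at all: which queues receive the message is decided purely by which queues are bound.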

  • Direct (routing)

The producer sends a message to the exchange with a specific routing key. The exchange is of type direct: it compares the routing key of the received message with the binding keys of the queues bound to it, and delivers the message to the queues whose key is exactly equal. The consumer listens on a queue, receives the message, and runs its consumption logic. A queue can be bound with one routing key or with several.
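The exact-match rule of a direct exchange can be sketched as a map from routing key to queues. Again this is an in-memory illustration with invented names (directExchange, bind, route, and the second and third queue names), not client code.

```go
package main

import "fmt"

// directExchange delivers a message to the queues bound with exactly its routing key.
type directExchange struct {
	bindings map[string][]string // routing key -> queues bound with that key
}

func newDirect() *directExchange {
	return &directExchange{bindings: map[string][]string{}}
}

// bind associates a queue with a routing key; a queue may be bound with several keys.
func (e *directExchange) bind(queue, key string) {
	e.bindings[key] = append(e.bindings[key], queue)
}

// route returns the queues that would receive a message published with key.
func (e *directExchange) route(key string) []string {
	return e.bindings[key]
}

func main() {
	e := newDirect()
	e.bind("direct_guofu_queue", "direct_key")
	e.bind("second_queue", "direct_key") // hypothetical second queue sharing the key
	e.bind("third_queue", "other_key")   // hypothetical queue with a different key
	fmt.Println(e.route("direct_key"))
	fmt.Println(e.route("missing_key"))
}
```

Two queues bound with the same key both receive the message, and a key with no binding reaches no queue.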

  • Topic (wildcard matching)

The producer sends a message with a specific routing key, and the exchange is of type topic. A queue is bound to the exchange not with one fixed routing key but with a pattern, such as *.yell.*, hlL.iii, or JJJ.#. Routing keys are words separated by dots; * matches exactly one word, and # matches zero or more words.
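The wildcard rules are easy to get wrong, so here is a small matcher that follows them. It is a sketch written for this article (matchTopic and matchWords are not functions of any RabbitMQ library), and it only models the matching rule, not the broker.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether routingKey matches the binding pattern.
// Words are separated by dots; "*" matches exactly one word, "#" matches zero or more.
func matchTopic(pattern, routingKey string) bool {
	return matchWords(strings.Split(pattern, "."), strings.Split(routingKey, "."))
}

func matchWords(p, k []string) bool {
	if len(p) == 0 {
		return len(k) == 0
	}
	switch p[0] {
	case "#":
		// "#" either absorbs nothing, or absorbs one word and tries again.
		if matchWords(p[1:], k) {
			return true
		}
		return len(k) > 0 && matchWords(p, k[1:])
	case "*":
		return len(k) > 0 && matchWords(p[1:], k[1:])
	default:
		return len(k) > 0 && p[0] == k[0] && matchWords(p[1:], k[1:])
	}
}

func main() {
	fmt.Println(matchTopic("*.yell.*", "a.yell.b")) // one word on each side
	fmt.Println(matchTopic("*.yell.*", "yell.b"))   // nothing in front of yell
	fmt.Println(matchTopic("JJJ.#", "JJJ"))         // "#" may match zero words
	fmt.Println(matchTopic("JJJ.#", "JJJ.x.y"))     // or several
}
```

The difference between the two wildcards shows up in the second and third calls: * insists on a word being present, while # is satisfied with none.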

  • Headers exchange: somewhat similar to the topic exchange, but whereas the topic exchange routes on the routing key, the headers exchange routes on the header data of the message. An example:

    Queue A is bound with: format=pdf, type=report, x-match=all
    Queue B is bound with: format=pdf, type=log, x-match=any
    Queue C is bound with: format=zip, type=report, x-match=all

    A message sent with headers format=pdf, type=report is routed to queue A and queue B.
    A message sent with headers format=zip, type=log is routed to queue B.
    A message that matches no queue is discarded.

    all: the default. The message is routed to the queue only if every key/value pair in the binding is matched by the message headers.
    any: the message is routed to the queue if at least one key/value pair in the binding is matched by the message headers.
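The all/any rule can be checked against the three example queues with a short sketch. The function and type names (matchHeaders, boundQueue, route, exampleQueues) are made up for this illustration, and header values are simplified to strings.

```go
package main

import (
	"fmt"
	"strings"
)

// matchHeaders applies the x-match rule of a binding to a message's headers.
// "all" (the default) needs every binding pair to match; "any" needs at least one.
func matchHeaders(binding, headers map[string]string) bool {
	mode := binding["x-match"]
	matched, total := 0, 0
	for k, v := range binding {
		if k == "x-match" {
			continue
		}
		total++
		if hv, ok := headers[k]; ok && hv == v {
			matched++
		}
	}
	if mode == "any" {
		return matched > 0
	}
	return matched == total
}

// boundQueue is a queue together with the arguments it was bound with.
type boundQueue struct {
	name string
	args map[string]string
}

// route returns the comma-separated names of the queues that match the headers.
func route(queues []boundQueue, headers map[string]string) string {
	var hit []string
	for _, q := range queues {
		if matchHeaders(q.args, headers) {
			hit = append(hit, q.name)
		}
	}
	return strings.Join(hit, ",")
}

// exampleQueues reproduces queues A, B, and C from the example.
func exampleQueues() []boundQueue {
	return []boundQueue{
		{"A", map[string]string{"format": "pdf", "type": "report", "x-match": "all"}},
		{"B", map[string]string{"format": "pdf", "type": "log", "x-match": "any"}},
		{"C", map[string]string{"format": "zip", "type": "report", "x-match": "all"}},
	}
}

func main() {
	qs := exampleQueues()
	fmt.Println(route(qs, map[string]string{"format": "pdf", "type": "report"}))
	fmt.Println(route(qs, map[string]string{"format": "zip", "type": "log"}))
}
```

Queue B is the interesting one: with x-match=any it accepts the first message because of format=pdf and the second because of type=log.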
  • Queue: the message queue, which holds messages until they are sent to consumers. It is the container and the destination of a message. A message can be put into one or more queues, where it stays until a consumer connects to the queue and picks it up.
  • Binding: associates a message queue with an exchange. A binding is a routing rule that connects an exchange to a queue based on a routing key, so you can think of an exchange as a routing table made up of bindings.

  • Channel: an independent two-way data stream within a multiplexed connection. A channel is a virtual link established inside a real TCP connection. AMQP commands are all sent through a channel, whether publishing a message, subscribing to a queue, or receiving a message. Because TCP connections are expensive for the operating system to set up and tear down, the channel concept was introduced to reuse a single TCP connection.

  • Connection: a network connection, such as a TCP connection.

Next, let's run a demonstration for each of the exchange types above.
  • First create the user and vhost. For the purposes of this demonstration we reuse as many of the earlier commands as possible; whether you need to create the vhost depends on whether it already exists. These operations can also be done through the web page.
    root@guofu:/etc/rabbitmq# rabbitmqctl add_vhost guofu_vhost
    Creating vhost "guofu_vhost"
    root@guofu:/etc/rabbitmq# rabbitmqctl list_vhosts
    Listing vhosts
    guofu_vhost
    /
    test_vhost
    # Create a user and password
    root@guofu:/etc/rabbitmq# rabbitmqctl add_user guofu guofu
    Creating user "guofu"
    root@guofu:/etc/rabbitmq# rabbitmqctl list_users
    Listing users
    vhost1  []
    admin   [administrator]
    guofu   []
    guest   [administrator]
    # Give the user the administrator tag; otherwise it cannot log in remotely
    root@guofu:/etc/rabbitmq# sudo rabbitmqctl set_user_tags guofu administrator
    Setting tags for user "guofu" to [administrator]
    # Give the user permissions on the vhost
    root@guofu:/etc/rabbitmq# sudo rabbitmqctl set_permissions -p guofu_vhost guofu ".*" ".*" ".*"
    Setting permissions for user "guofu" in vhost "guofu_vhost"
    # View the user's permissions
    root@guofu:/etc/rabbitmq# sudo rabbitmqctl list_user_permissions guofu
    Listing permissions for user "guofu"
    guofu_vhost     .*      .*      .*

After this configuration, the page shows that the changes have taken effect.

  • Create a new exchange and specify the vhost

  • Create two new queues and bind them to the exchange

  • Put this configuration into the code (see the references)
    package main

    import (
        "github.com/streadway/amqp"
    )

    /**
     * @Description: Demonstrates RabbitMQ exchange types: producer, fanout
     */
    func main() {
        var exchange = "guofu_exchange"
        // Establish the connection: username + password + IP + port + vhost.
        // SERVER_IP stands for the address of the RabbitMQ server.
        conn, err := amqp.Dial("amqp://guofu:guofu@SERVER_IP:5672/guofu_vhost")
        if err != nil {
            panic(err)
        }
        defer conn.Close()
        // Open a channel on the connection
        ch, err := conn.Channel()
        if err != nil {
            panic(err)
        }
        defer ch.Close()
        // Declare the exchange (created if it does not exist)
        err = ch.ExchangeDeclare(exchange, "fanout", true, false, false, false, nil)
        if err != nil {
            panic(err)
        }
        // Define the message
        msgBody := "i am a msg3"
        // Publish parameters: exchange, key string, mandatory, immediate bool, msg Publishing
        err = ch.Publish(
            exchange, // exchange
            "",       // routing key (ignored by fanout)
            false,
            false,
            amqp.Publishing{
                DeliveryMode: amqp.Persistent, // mark the message as persistent
                ContentType:  "text/plain",
                Body:         []byte(msgBody),
            })
        if err != nil {
            panic(err)
        }
    }
  • Let’s take a look through the web page

  • As you can see, with fanout (publish/subscribe) we only specify the exchange and its type when pushing a message, not a queue, because every queue bound to this exchange receives the message. In other words, in fanout mode one message is pushed to multiple queues. When does this pattern apply? For example, after a user registers I want to send both an email and an SMS, so the SMS queue and the email queue can both be fed through a fanout exchange.

  • Next comes the consumer side. Queues are consumed in the same way whatever the exchange type, so I only show it here and will not repeat the explanation for the other exchange types.

  • One point in the consumer code deserves attention: the queue is "tentatively created". What does this mean? I never created guofu_queue3 by hand, yet the consumer listened on this queue and received a message, because declaring a queue creates it when it does not already exist.

  • Now let's create a brand-new exchange and queue from the consumer code
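    package main

    import (
        "fmt"

        "github.com/streadway/amqp"
    )

    func main() {
        var exchange = "guofu_exchange_test" // exchange
        var queue = "guofu_queue_test"
        var key = ""
        // Establish the connection: username + password + IP + port + vhost.
        // SERVER_IP stands for the address of the RabbitMQ server.
        conn, err := amqp.Dial("amqp://guofu:guofu@SERVER_IP:5672/guofu_vhost")
        if err != nil {
            panic(err)
        }
        defer conn.Close()
        ch, err := conn.Channel()
        if err != nil {
            panic(err)
        }
        defer ch.Close()
        // Declare the exchange (created if it does not exist)
        err = ch.ExchangeDeclare(exchange, "fanout", true, false, false, false, nil)
        if err != nil {
            panic(err)
        }
        // Declare the queue (tentative creation):
        // name string, durable, autoDelete, exclusive, noWait bool, args Table
        _, err = ch.QueueDeclare(queue, true, false, false, false, nil)
        if err != nil {
            panic(err)
        }
        // Bind the queue: name, key, exchange string, noWait bool, args Table
        err = ch.QueueBind(queue, key, exchange, false, nil)
        if err != nil {
            panic(err)
        }
        // Consume: queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table
        msgs, err := ch.Consume(queue, "", false, false, false, false, nil)
        if err != nil {
            panic(err)
        }
        for d := range msgs {
            fmt.Println(string(d.Body))
            d.Ack(false) // manual ACK after the message has been handled
        }
    }
  • The exchange and queue have been created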

  • Another thing to note is the ACK mechanism

There are three acknowledgement modes: none, auto (the default), and manual. With automatic ACK, the consumer sends an ACK as soon as a message is received. With manual ACK, nothing is acknowledged on receipt; the consumer must send the ACK itself. Which to use depends on how important the message is. If losing a message does not matter much, automatic ACK is more convenient. If the message is too important to lose, it is better to ACK manually once consumption is complete; otherwise the ACK is sent right after the message is received, RabbitMQ removes the message from the queue, and if the consumer goes down at that point the message is lost.
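The difference between the two modes when a consumer crashes can be shown with a toy model of the broker's side of a delivery. This is a deliberately simplified sketch: the deliver function and its parameters are invented, and real redelivery involves more than is shown here.

```go
package main

import "fmt"

// deliver simulates one delivery attempt of the first message in the queue and
// returns the messages the broker still holds afterwards.
// With autoAck the broker forgets the message as soon as it is handed over;
// with manual ACK it keeps the message until the consumer confirms it.
func deliver(queue []string, autoAck, consumerCrashes bool) []string {
	if len(queue) == 0 {
		return queue
	}
	rest := queue[1:]
	if autoAck {
		return rest // removed on delivery, whether or not processing succeeds
	}
	if consumerCrashes {
		return queue // never acknowledged: the message stays for redelivery
	}
	return rest // acknowledged after successful processing
}

func main() {
	q := []string{"payment-1"} // hypothetical message
	fmt.Println(len(deliver(q, true, true)))   // auto ACK, consumer crashes
	fmt.Println(len(deliver(q, false, true)))  // manual ACK, consumer crashes
	fmt.Println(len(deliver(q, false, false))) // manual ACK, consumer succeeds
}
```

Only the manual-ACK case keeps the message when the consumer crashes, which is why it is the safer choice for important messages.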

  • Another point: PHP and Java clients offer a producer-side message confirmation mechanism that invokes a callback once a message has been pushed successfully. I did not find a callback-style method for this in Golang, although streadway/amqp does expose publisher confirms through Channel.Confirm and Channel.NotifyPublish.

  • OK, back to the second exchange type, direct (routing). This time the consumer code declares the queue and listens on it directly.
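    package main

    import (
        "fmt"

        "github.com/streadway/amqp"
    )

    func main() {
        var exchange = "direct_guofu_exchange" // exchange
        var queue = "direct_guofu_queue"
        var key = "direct_key"
        // SERVER_IP stands for the address of the RabbitMQ server.
        conn, err := amqp.Dial("amqp://guofu:guofu@SERVER_IP:5672/guofu_vhost")
        if err != nil {
            panic(err)
        }
        defer conn.Close()
        ch, err := conn.Channel()
        if err != nil {
            panic(err)
        }
        defer ch.Close()
        // Declare the exchange (created if it does not exist)
        err = ch.ExchangeDeclare(exchange, "direct", true, false, false, false, nil)
        if err != nil {
            panic(err)
        }
        // Declare the queue:
        // name string, durable, autoDelete, exclusive, noWait bool, args Table
        _, err = ch.QueueDeclare(queue, true, false, false, false, nil)
        if err != nil {
            panic(err)
        }
        // Bind the queue: name, key, exchange string, noWait bool, args Table
        err = ch.QueueBind(queue, key, exchange, false, nil)
        if err != nil {
            panic(err)
        }
        // Consume: queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table
        msgs, err := ch.Consume(queue, "", false, false, false, false, nil)
        if err != nil {
            panic(err)
        }
        for d := range msgs {
            fmt.Println(string(d.Body))
            d.Ack(false)
        }
    }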

  • Use the same method to create the queue direct_guofu_queue

  • If two queues are bound with the same key, the exchange pushes the message to both queues according to that key. Avoid duplicate keys where they are not intended, to reduce dirty data.

    package main

    import (
        "github.com/streadway/amqp"
    )

    /**
     * @Description: Demonstrates RabbitMQ exchange types: producer, direct
     */
    func main() {
        var exchange = "direct_guofu_exchange"
        var key = "direct_key"
        // Establish the connection: username + password + IP + port + vhost.
        // SERVER_IP stands for the address of the RabbitMQ server.
        conn, err := amqp.Dial("amqp://guofu:guofu@SERVER_IP:5672/guofu_vhost")
        if err != nil {
            panic(err)
        }
        defer conn.Close()
        ch, err := conn.Channel()
        if err != nil {
            panic(err)
        }
        defer ch.Close()
        // Declare the exchange (created if it does not exist)
        err = ch.ExchangeDeclare(exchange, "direct", true, false, false, false, nil)
        if err != nil {
            panic(err)
        }
        // Define the message
        msgBody := "i am a direct"
        // Publish parameters: exchange, key string, mandatory, immediate bool, msg Publishing
        err = ch.Publish(
            exchange, // exchange
            key,      // routing key
            false,
            false,
            amqp.Publishing{
                DeliveryMode: amqp.Persistent, // mark the message as persistent
                ContentType:  "text/plain",
                Body:         []byte(msgBody),
            })
        if err != nil {
            panic(err)
        }
    }

  • Topic mode

Topic is similar to a fuzzy (LIKE) query in MySQL: as long as the pattern matches, the message is pushed. The routing key of a pushed message can consist of several dot-separated attributes, up to a maximum length of 255 bytes. After the push, the key is compared with the binding pattern of every queue, and the message goes into each queue that matches. Suppose we have the following scenario with three queues: the first is animal, for anything that is an animal; the second is plant; and the third is yellow, for anything that is yellow. Now we want to push items into the right queues; for example, an orange cat should go to both animal and yellow. The binding keys are:

    // Binding keys: #.animal.#, #.plant.#, yellow.#

    // Consumer for the yellow queue
    var exchange = "topic_guofu_exchange"
    var queue = "topic727_yellow"
    var key = "yellow.#"

    // Consumer for the animal queue
    var exchange = "topic_guofu_exchange"
    var queue = "topic727_animal"
    var key = "#.animal.#"

    // Consumer for the plant queue
    var exchange = "topic_guofu_exchange"
    var queue = "topic727_plant"
    var key = "#.plant.#"

So when I push a message to the topic exchange with the routing key yellow.animal.plant, all three queues are matched. Let's see.
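Before running it against the broker, the expected result can be worked out offline. The sketch below models a topic exchange as a list of bindings and reports which queues a routing key would reach; the binding type and the route and match functions are invented for this check, while the queue names and patterns are the ones from the scenario.

```go
package main

import (
	"fmt"
	"strings"
)

// binding pairs a queue with the pattern it was bound with.
type binding struct{ queue, pattern string }

// match applies the topic rules word by word: "*" is one word, "#" is zero or more.
func match(p, k []string) bool {
	if len(p) == 0 {
		return len(k) == 0
	}
	if p[0] == "#" {
		return match(p[1:], k) || (len(k) > 0 && match(p, k[1:]))
	}
	if len(k) == 0 || (p[0] != "*" && p[0] != k[0]) {
		return false
	}
	return match(p[1:], k[1:])
}

// route returns the comma-separated queues whose pattern matches routingKey.
func route(bindings []binding, routingKey string) string {
	key := strings.Split(routingKey, ".")
	var hit []string
	for _, b := range bindings {
		if match(strings.Split(b.pattern, "."), key) {
			hit = append(hit, b.queue)
		}
	}
	return strings.Join(hit, ",")
}

// scenarioBindings lists the three queues and binding keys from the scenario.
func scenarioBindings() []binding {
	return []binding{
		{"topic727_yellow", "yellow.#"},
		{"topic727_animal", "#.animal.#"},
		{"topic727_plant", "#.plant.#"},
	}
}

func main() {
	b := scenarioBindings()
	fmt.Println(route(b, "yellow.animal.plant"))
	fmt.Println(route(b, "yellow.animal"))
}
```

The first key reaches all three queues; the second reaches only the yellow and animal queues, because nothing in it satisfies #.plant.#.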

  • Post the production code
    package main

    import (
        "github.com/streadway/amqp"
    )

    /**
     * @Description: Demonstrates RabbitMQ exchange types: producer, topic
     */
    func main() {
        var exchange = "topic_guofu_exchange"
        var key = "yellow.animal.plant"
        var queue = "topic727"
        // SERVER_IP stands for the address of the RabbitMQ server.
        conn, err := amqp.Dial("amqp://guofu:guofu@SERVER_IP:5672/guofu_vhost")
        if err != nil {
            panic(err)
        }
        defer conn.Close()
        ch, err := conn.Channel()
        if err != nil {
            panic(err)
        }
        defer ch.Close()
        // Declare the exchange (created if it does not exist)
        err = ch.ExchangeDeclare(exchange, "topic", true, false, false, false, nil)
        if err != nil {
            panic(err)
        }
        // Declare the queue:
        // name string, durable, autoDelete, exclusive, noWait bool, args Table
        _, err = ch.QueueDeclare(queue, true, false, false, false, nil)
        if err != nil {
            panic(err)
        }
        // Bind the queue: name, key, exchange string, noWait bool, args Table
        err = ch.QueueBind(queue, key, exchange, false, nil)
        if err != nil {
            panic(err)
        }
        // Define the message
        msgBody := key
        // Publish parameters: exchange, key string, mandatory, immediate bool, msg Publishing
        err = ch.Publish(
            exchange, // exchange
            key,      // routing key
            false,
            false,
            amqp.Publishing{
                DeliveryMode: amqp.Persistent, // mark the message as persistent
                ContentType:  "text/plain",
                Body:         []byte(msgBody),
            })
        if err != nil {
            panic(err)
        }
    }
  • After the push, all four queues have data (the first queue is the one bound when the message was pushed; the other three are matched through their binding patterns).

  • Likewise, if the key I push is yellow.animal, the route matches yellow.# and #.animal.#

  • Topic is relatively powerful: with it you can implement the behaviour of both direct and fanout. The routing key can contain any number of words, up to 255 bytes.

  • Headers exchange: somewhat similar to the topic exchange, but whereas the topic exchange routes on the routing key, the headers exchange routes on the header data of the message. I won't go over it again here. If you're interested, see the official website.