0.RabbitMQ Introduction, application scenarios
RabbitMQ is a popular messaging middleware technology that integrates well with Spring.
So what RabbitMQ is for, or where we need to use messaging middleware. The following describes the application scenarios of message-oriented middleware:
0.0 Asynchronous processing
Scenario description: After registration, users need to send registration emails or SMS messages. Serial mode, 2. Parallel mode.
Serial mode: After the registration information is written into the database, a registration email is sent, and then a registration SMS is sent to the client only after the preceding three tasks are complete. One problem with this is that the email, SMS is not required, it is just a notification, and this practice makes the client wait for something that is not necessary.
In parallel mode: After the registration information is written to the database, an email is sent and a SHORT message is sent. After the preceding three tasks are completed, the registration information is returned to the client. In parallel mode, the processing time is increased.
In addition to the above two methods, you can also use message queue asynchronously to operate:
Message queue: assume that the three service nodes use 50ms, 150ms in serial mode, and 100ms in parallel mode. While parallelism has improved processing times, SMS and email messages have no impact on the normal use of the site, and the client does not need to wait for them to be sent before registering successfully. Instead, it should be written to the database and returned. After the introduction of message queuing, send mail, SMS is not necessary business logic asynchronous processing.
0.1 Apply decoupling
Scenario: Double 11 is the shopping frenzy Festival. After users place an order, the order system needs to notify the inventory system. The traditional way is that the order system calls the interface of the inventory system.
But there is a downside to this approach: when the inventory system fails, orders fail. The order system and the inventory system are highly coupled. After the message queue is introduced:
Order system: after the user places an order, the order system completes the persistent processing, writes the message to the message queue, and returns the user to place the order successfully.
Inventory system: subscribe to the order message, get the order message, inventory operation. Even if the inventory system fails, the message queue can ensure the reliable delivery of messages and will not lead to message loss.
0.2 Flow peak cutting
Scenario: Applications are suspended due to heavy traffic. To solve this problem, applications are added to message queues in front of applications.
Function:
1. You can control the number of active people. Orders exceeding the set number are discarded directly
2. It can alleviate the crushing of applications with high traffic for a short period of time (the application obtains orders according to its maximum processing capacity)
1. After receiving a user request, the server writes the request to the message queue. If the length of the request exceeds the maximum value, the server directly discards the request or redirects to the error page.
2. The second kill service performs subsequent processing based on the request information in the message queue.
1. Install Rabbitmq – by RPM
1.1 Installing the Erlang environment
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
Copy the code
Install Erlang:
yum install -y erlang
Copy the code
Check the Erlang version:
erl
Copy the code
Before installing Rabbitmq, make sure the version of Erlang is the same as the version of Rabbitmq.
Check the relationship between Erlang and RabbitMQ versions
The version of Erlang I have installed here is:
Since I installed Erlang version 24, I will install RabbitMQ at least 3.8.16
1.2 install the RabbitMQ
rpm --import https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey rpm --import https://packagecloud.io/gpg.key rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc yum -y install epel-release yum -y install socat curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bashCopy the code
Download the RPM package:
Wget HTTP: / / https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.25/rabbitmq-server-3.8.25-1.el7.noarch.rpmCopy the code
The download may be slow, or you can download the RPM package in Windows and then upload it to the server.
Installation:
The RPM - the ivh the rabbitmq server - 3.8.25-1. El7. Noarch. RPMCopy the code
To access the RabbitMQ web management page, do this:
rabbitmq-plugins enable rabbitmq_management
Copy the code
Start the RabbitMQ:
systemctl start rabbitmq-server
Copy the code
You can then access the console address:
HTTP: your server IP :15672
If you cannot access RabbitMQ, check that your RabbitMQ service starts successfully and port 15672 is open.
Log in using the default account: guest/guest.
You can’t log in, and you’ll get a message “User can only log in via localhost.”
The RabbitMQ web management page can only be accessed as localhost.
To access the /etc/rabbitmq/ directory:
cd /etc/rabbitmq
Copy the code
vi rabbitmq.config
Copy the code
To the rabbitmq.config file add (remember the last dot or the restart will fail) :
[{rabbit, [{loopback_users, []}]}].
Copy the code
Then restart:
systemctl restart rabbitmq-server
Copy the code
2. It agreement
2.1 What is AMQP
AMQP is a network Protocol by which Advanced Message Queuing Protocol (AMQP) is Queuing. Used for communication between client applications and messaging middleware.
2.2 Introduction to AMQP model
Above is the AMQP model. The main process is that messages are sent from Publisher (or provider) to switch, and the switch forwards the received messages to different queues according to routing rules. Finally RabbitMQ will push messages from the queue to consumers who subscribe to the queue, or consumers will fetch messages from the queue.
From a security point of view, the network is unreliable. So when consumers consume messages, they may fail to process them for some reason. For this reason, RabbitMQ provides a message acknowledgements mechanism: When a message is posted from the queue to the consumer, it is acknowledged either automatically or manually. After the consumer confirms, the queue deletes the message.
There are two ways to confirm a message:
1. Automatic confirmation, when the message is sent to consumers, automatically delete.
2. Delete the message after the application sends a confirmation receipt. The acknowledgement receipt can be sent immediately after the message is received, stored after the message is sent, or sent after the message is processed.
In some cases, such as when a message was not successfully routed. The message may be returned to the publisher and discarded.
2.3 Switch and Switch Type
AMQP provides four types of switches:
2.3.1 Default Switch
The default exchange is a nameless pre-declared switch by RabbitMQ. Each new queue, if no specific switch name is specified, is automatically bound to the default switch with the bound routing key name as the queue name.
I’ve seen some talk of channels sending messages directly to switches or directly to queues. I don’t think it’s accurate to say “directly to the queue”. In this case, it should be sent to the default switch, which then routes to the corresponding queue bound to it based on the routing key.
2.3.2 Directly Connecting to a Switch
A direct exchange forwards a message to a queue based on the routing key carried by the message. The default mode of a direct switch is equal allocation. If a direct switch is bound to multiple queues with the same routing key, the direct switch evenly distributes messages to all the queues.
2.3.3 Sector Switch
A Fanout exchange routes messages to all queues bound to its audit, regardless of the bound routing key. If multiple queues are bound to a sector switch, when a message is sent to the sector switch, the switch copies the message to all queues bound to the sector switch.
2.3.4 Topic Switch
Topic exchanges route messages to one or more queues by matching messages’ routing keys and queue-to-switch binding patterns. Topic switches are often used to implement various distribution/subscription patterns and their variants. Topic switches are usually used to implement multicast routing of messages.
2.4 the queue
2.4.1 Queue Persistence
Durable queues are stored on disk and will remain when RabbitMQ restarts. Queues that are not persisted are called Transient queues.
It is important to note that persistence of queues does not mean that messages in queues are persisted. If the message is not persisted, it will not be persisted after RabbitMQ restarts, although the persistence queue exists.
2.5 channel
Messages are sent over channels to switches and queues. Is the medium through which messages are transmitted.
2.6 Virtual Host
RabbitMQ provides virtual hosts to isolate multiple environments. When establishing a connection, you need to specify which virtual host to use.
3.Rabbitmq Web management page
4. Demo implementation of five models
4.0 Dependencies
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> The < version > 4.12 < / version > < / dependency > < / dependencies >Copy the code
4.1 Hello World
The elements of this message are three, a producer, a consumer, and a queue. This model makes it easy to assume that the producer sends messages directly to the queue, which it does not. RabbitMQ has a default switch that binds the queue to the default switch unless the queue is explicitly bound to a switch with a specific name. The bound route key is the queue name. So in this model, messages are essentially sent by the consumer to the switch, and the switch routes the message to the queue, and the consumer gets the queued message, or the queue pushes the message to the subscribing consumer.
Create a virtual host and the corresponding test user before operation:
Message producer Provider produces messages:
@runWith (springrunner.class) @SpringbooTtest Public Class Provider {@test public void sendMessage()throws IOException, TimeoutException {// create a ConnectionFactory object to connect to mq ConnectionFactory ConnectionFactory = new ConnectionFactory(); / / set up the connection of the rabbitmq host connectionFactory. SetHost (" yourserverip "); / / set the port number the connectionFactory. SetPort (5672); / / set up the connection of the virtual host connectionFactory. SetVirtualHost ("/not "); / / set access virtual host user name and password connectionFactory. SetUsername (" qingyuan "); connectionFactory.setPassword("qingyuan"); / / to get Connection object Connection Connection = connectionFactory. NewConnection (); Channel Channel = connection.createchannel (); @param2 specifies whether the queue needs to be persisted. True specifies whether the queue needs to be persisted. @param3 EXCLUSIVE specifies whether the queue needs to be exclusive. True exclusive * @param4 extra argument */ channel.queueDeclare("hello",true,false,false,null); BasicPublish ("","hello", channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes()); // Close channel.close(); connection.close(); }}Copy the code
After executing, you can see that a message already exists in the queue:
Consumer news:
@runwith (springrunner.class) @springbooTtest Public class Consumer {@test public void Receive()throws IOException, TimeoutException {// create a ConnectionFactory object to connect to mq ConnectionFactory ConnectionFactory = new ConnectionFactory(); / / set up the connection of the rabbitmq host connectionFactory. SetHost (" yourserverip "); / / set the port number the connectionFactory. SetPort (5672); / / set up the connection of the virtual host connectionFactory. SetVirtualHost ("/not "); / / set access virtual host user name and password connectionFactory. SetUsername (" qingyuan "); connectionFactory.setPassword("qingyuan"); / / to get Connection object Connection Connection = connectionFactory. NewConnection (); Channel Channel = connection.createchannel (); @param2 specifies whether the queue needs to be persisted. True specifies whether the queue needs to be persisted. @param3 EXCLUSIVE specifies whether the queue needs to be exclusive. True exclusive * @param4 extra argument */ channel.queueDeclare("hello",true,false,false,null); /** * consume message * @param1 Queue name * @param2 Auto confirm true Yes Auto confirm * @param3 callback after receiving message */ channel.basicConsume("hello",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("message:"+new String(body)); }}); }}Copy the code
After execution:
You can see that the message in the queue has been consumed.
4.2 the Work the queues
You can see that the Work Queues model is similar to the first Helloworld model, except that the work queues model has multiple consumers instead of one.
So in what context is this work queues model used? In a consumer-to-producer model, if producers produce messages much faster than consumers consume them, messages continue to accumulate in the queue, blocking the queue.
Using the work queue model, on the basis of the original, add multiple consumers, multiple consumers jointly consume the message in the same queue, and under the premise of using this model, the message will not be repeated consumption.
Create message producers to produce messages:
/** * @runwith (springrunner.class) @SpringbooTtest Public Class Provider {@test public void sendMessage()throws IOException, TimeoutException {// create a ConnectionFactory object to connect to mq ConnectionFactory ConnectionFactory = new ConnectionFactory(); / / set up the connection of the rabbitmq host connectionFactory. SetHost (" yourserverip "); / / set the port number the connectionFactory. SetPort (5672); / / set up the connection of the virtual host connectionFactory. SetVirtualHost ("/not "); / / set access virtual host user name and password connectionFactory. SetUsername (" qingyuan "); connectionFactory.setPassword("qingyuan"); / / to get Connection object Connection Connection = connectionFactory. NewConnection (); Channel Channel = connection.createchannel (); @param2 specifies whether the queue needs to be persisted. True specifies whether the queue needs to be persisted. @param3 EXCLUSIVE specifies whether the queue needs to be exclusive. True exclusive * @param4 extra argument */ channel.queueDeclare("hello",true,false,false,null); for (int i = 0; i < 10; I++) {channel. BasicPublish (" ", "hello", null, (I + "= = = = > : I'm a news"). The getBytes ()); } // Close the resource channel.close(); connection.close(); }}Copy the code
Create consumer 1Consumer1:
Public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {// create a ConnectionFactory object to connect to mq ConnectionFactory ConnectionFactory = new ConnectionFactory(); / / set up the connection of the rabbitmq host connectionFactory. SetHost (" yourserverip "); / / set the port number the connectionFactory. SetPort (5672); / / set up the connection of the virtual host connectionFactory. SetVirtualHost ("/not "); / / set access virtual host user name and password connectionFactory. SetUsername (" qingyuan "); connectionFactory.setPassword("qingyuan"); / / to get Connection object Connection Connection = connectionFactory. NewConnection (); Channel Channel = connection.createchannel (); @param2 specifies whether the queue needs to be persisted. True specifies whether the queue needs to be persisted. @param3 EXCLUSIVE specifies whether the queue needs to be exclusive. True exclusive * @param4 extra argument */ channel.queueDeclare("hello", true, false, false, null); /** * consume message * @param1 queue name * @param2 auto confirm true Yes Auto confirm * @param3 callback after receiving message */ channel.basicConsume("hello", true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {system.out.println (" Consumer 1: "+ new String(body)); }}); }}Copy the code
Create consumer 2Consumer2:
Public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// create a ConnectionFactory object to connect to mq ConnectionFactory ConnectionFactory = new ConnectionFactory(); / / set up the connection of the rabbitmq host connectionFactory. SetHost (" yourserverip "); / / set the port number the connectionFactory. SetPort (5672); / / set up the connection of the virtual host connectionFactory. SetVirtualHost ("/not "); / / set access virtual host user name and password connectionFactory. SetUsername (" qingyuan "); connectionFactory.setPassword("qingyuan"); / / to get Connection object Connection Connection = connectionFactory. NewConnection (); Channel Channel = connection.createchannel (); @param2 specifies whether the queue needs to be persisted. True specifies whether the queue needs to be persisted. @param3 EXCLUSIVE specifies whether the queue needs to be exclusive. True exclusive * @param4 extra argument */ channel.queueDeclare("hello", true, false, false, null); /** * consume message * @param1 queue name * @param2 auto confirm true Yes Auto confirm * @param3 callback after receiving message */ channel.basicConsume("hello", true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties, byte[] body) throws IOException {try {// Process a message one second slower thread.sleep (1000); } catch (InterruptedException e) { e.printStackTrace(); } system.out.println (" consumer 2: "+ new String(body)); }}); }}Copy the code
Run the two consumers first, then the producers, and you see console output:
You can see that the message is consumed. To explain here, although the callback method in the consumer 2 thread to sleep, but it is not effective, because it opens the message automatically confirm, that is to say, when consumers get the message, the message will be automatically confirmed that the queue corresponding message will be deleted, even if the acknowledgement method not completed, The queue also acts as if the consumer has consumed the message.
And as you can see in the console, work Queues, the default message allocation policy is polling, or averaging, queues distribute messages equally to each consumer.
What might be the problem, then the average distribution assumes a certain customer, processing messages distributed far slower than the speed of the message queue, the speed of the consumer right now is open the message automatically confirm mechanism, consumers receive the message, but did not immediately processing is completed, if the program execution error, somewhere to crash the program, The remaining messages that are not executed will be lost. In message queues, messages are deleted after a message acknowledgement is received.
Then how to solve this problem: 1. Consumers do not turn on automatic message confirmation, but use manual confirmation. That is, after the consumer has received the message and processed the message, it will perform manual confirmation.
channel.basicQos(1); BasicConsume ("work",false,new DefaultConsumer(channel){@override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try{ Thread.sleep(2000); }catch (Exception e){ e.printStackTrace(); } system.out.println (" consumer -1: "+new String(body)); BasicAck (envelope. GetDeliveryTag (),false); }});Copy the code
After modification, you’ll see that the faster the consumer, the more messages it receives. Our work queues have gone from being “shared equitably” to today’s “more work, more gain”.
4.3 the Fanout
Fanout model, also known as broadcast model. Under this model, there can be more than one consumer, and each consumer has its own queue. The producer sends the message to the switch, which decides which queue to send it to, and the switch sends the message to all the bound queues. All consumers in the queue get the message. Implement a message to be consumed by multiple consumers.
Producers:
/ / @runwith (springrunner.class) @SpringbooTtest Public Class Provider {@test public void sendMessage()throws IOException, TimeoutException {// create a ConnectionFactory object to connect to mq ConnectionFactory ConnectionFactory = new ConnectionFactory(); / / set up the connection of the rabbitmq host connectionFactory. SetHost (" yourserverip "); / / set the port number the connectionFactory. SetPort (5672); / / set up the connection of the virtual host connectionFactory. SetVirtualHost ("/not "); / / set access virtual host user name and password connectionFactory. SetUsername (" qingyuan "); connectionFactory.setPassword("qingyuan"); / / to get Connection object Connection Connection = connectionFactory. NewConnection (); Channel Channel = connection.createchannel (); /** * Declare the switch * @param1 Switch name * @param2 switch type */ channel.exchangeDECLARE ("logs","fanout"); BasicPublish ("logs","", NULL,"fanout type message".getbytes ()); Channel.close (); connection.close(); }}Copy the code
Consumer 1:
public class Consumer1 { public static void main(String[] args) throws IOException, TimeoutException {// create a ConnectionFactory object to connect to mq ConnectionFactory ConnectionFactory = new ConnectionFactory(); / / set up the connection of the rabbitmq host connectionFactory. SetHost (" yourserverip "); / / set the port number the connectionFactory. SetPort (5672); / / set up the connection of the virtual host connectionFactory. SetVirtualHost ("/not "); / / set access virtual host user name and password connectionFactory. SetUsername (" qingyuan "); connectionFactory.setPassword("qingyuan"); / / to get Connection object Connection Connection = connectionFactory. NewConnection (); Channel Channel = connection.createchannel (); // Bind the switch channel.exchangeDECLARE ("logs","fanout"); String queueName = channel.queuedeclare ().getQueue(); // Create a temporary queue. Channel. queueBind(queueName,"logs",""); BasicConsume (queueName,true,new DefaultConsumer(channel){@override public void handleDelivery(String) consumerTag, Envelope envelope, AMQP.BasicProperties properties, Byte [] body) throws IOException {system.out.println (" Consumer 1: "+new String(body)); byte[] body) throws IOException {system.out.println (" Consumer 1: "+new String(body)); }}); }}Copy the code
Consumer 2:
public class Consumer2 { public static void main(String[] args) throws IOException, TimeoutException {// create a ConnectionFactory object to connect to mq ConnectionFactory ConnectionFactory = new ConnectionFactory(); / / set up the connection of the rabbitmq host connectionFactory. SetHost (" yourserverip "); / / set the port number the connectionFactory. SetPort (5672); / / set up the connection of the virtual host connectionFactory. SetVirtualHost ("/not "); / / set access virtual host user name and password connectionFactory. SetUsername (" qingyuan "); connectionFactory.setPassword("qingyuan"); / / to get Connection object Connection Connection = connectionFactory. NewConnection (); Channel Channel = connection.createchannel (); // Bind the switch channel.exchangeDECLARE ("logs","fanout"); String queueName = channel.queuedeclare ().getQueue(); // Create a temporary queue. Channel. queueBind(queueName,"logs",""); BasicConsume (queueName,true,new DefaultConsumer(channel){@override public void handleDelivery(String) consumerTag, Envelope envelope, AMQP.BasicProperties properties, Byte [] body) throws IOException {system.out.println (" Consumer 2: "+new String(body)); byte[] body) throws IOException {system.out.println (" Consumer 2: "+new String(body)); }}); }}Copy the code
Consumer 3:
public class Consumer3 { public static void main(String[] args) throws IOException, TimeoutException {// create a ConnectionFactory object to connect to mq ConnectionFactory ConnectionFactory = new ConnectionFactory(); / / set up the connection of the rabbitmq host connectionFactory. SetHost (" yourserverip "); / / set the port number the connectionFactory. SetPort (5672); / / set up the connection of the virtual host connectionFactory. SetVirtualHost ("/not "); / / set access virtual host user name and password connectionFactory. SetUsername (" qingyuan "); connectionFactory.setPassword("qingyuan"); / / to get Connection object Connection Connection = connectionFactory. NewConnection (); Channel Channel = connection.createchannel (); // Bind the switch channel.exchangeDECLARE ("logs","fanout"); String queueName = channel.queuedeclare ().getQueue(); // Create a temporary queue. Channel. queueBind(queueName,"logs",""); BasicConsume (queueName,true,new DefaultConsumer(channel){@override public void handleDelivery(String) consumerTag, Envelope envelope, AMQP.BasicProperties properties, Byte [] body) throws IOException {system.out.println (" consumer 3: "+new String(body)); byte[] body) throws IOException {system.out.println (" consumer 3: "+new String(body)); }}); }}Copy the code
Operation effect:
Created switches:
4.4 Routing – Direct
Producers:
public class Provider { public static void main(String[] args) throws IOException, TimeoutException {// create a ConnectionFactory object to connect to mq ConnectionFactory ConnectionFactory = new ConnectionFactory(); / / set up the connection of the rabbitmq host connectionFactory. SetHost (" yourserverip "); / / set the port number the connectionFactory. SetPort (5672); / / set up the connection of the virtual host connectionFactory. SetVirtualHost ("/not "); / / set access virtual host user name and password connectionFactory. SetUsername (" qingyuan "); connectionFactory.setPassword("qingyuan"); / / to get Connection object Connection Connection = connectionFactory. NewConnection (); Channel Channel = connection.createchannel (); String exchangeName = "logs_direct"; channel.exchangeDeclare(exchangeName,"direct"); String routingKey = "info"; / / send a message channel. BasicPublish (exchangeName routingKey, null, (" this is the direct model based on the route of release key: ["+routingKey+"] Sent message ").getBytes()); // Close channel.close(); connection.close(); }}Copy the code
Consumer 1:
/** * routing-direct Consumer1 */ public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {// create a ConnectionFactory object to connect to mq ConnectionFactory ConnectionFactory = new ConnectionFactory(); / / set up the connection of the rabbitmq host connectionFactory. SetHost (" yourserverip "); / / set the port number the connectionFactory. SetPort (5672); / / set up the connection of the virtual host connectionFactory. SetVirtualHost ("/not "); / / set access virtual host user name and password connectionFactory. SetUsername (" qingyuan "); connectionFactory.setPassword("qingyuan"); / / to get Connection object Connection Connection = connectionFactory. NewConnection (); Channel Channel = connection.createchannel (); String exchangeName = "logs_direct"; channel.exchangeDeclare(exchangeName,"direct"); String queue = channel.queuedeclare ().getQueue(); /** * Bind the queue to the switch * @param1 Queue name * @param2 Switch name * @param3 Route key */ channel.queueBind(queue,exchangeName,"error"); BasicConsume (queue,true,new DefaultConsumer(channel){@override public void handleDelivery(String) consumerTag, Envelope envelope, AMQP.BasicProperties properties, Byte [] body) throws IOException {system.out.println (" Consumer 1: "+ new String(body)); byte[] body) throws IOException {system.out.println (" Consumer 1: "+ new String(body)); }}); }}Copy the code
Consumer 2:
/** * routing-direct Consumer2 */ public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// create a ConnectionFactory object to connect to mq ConnectionFactory ConnectionFactory = new ConnectionFactory(); / / set up the connection of the rabbitmq host connectionFactory. SetHost (" yourserverip "); / / set the port number the connectionFactory. SetPort (5672); / / set up the connection of the virtual host connectionFactory. SetVirtualHost ("/not "); / / set access virtual host user name and password connectionFactory. SetUsername (" qingyuan "); connectionFactory.setPassword("qingyuan"); / / to get Connection object Connection Connection = connectionFactory. NewConnection (); Channel Channel = connection.createchannel (); String exchangeName = "logs_direct"; channel.exchangeDeclare(exchangeName,"direct"); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue,exchangeName,"info"); channel.queueBind(queue,exchangeName,"error"); channel.queueBind(queue,exchangeName,"warning"); BasicConsume (queue,true,new DefaultConsumer(channel){@override public void handleDelivery(String) consumerTag, Envelope envelope, AMQP.BasicProperties properties, Byte [] body) throws IOException {system.out.println (" Consumer 2: "+new String(body)); byte[] body) throws IOException {system.out.println (" Consumer 2: "+new String(body)); }}); }}Copy the code
Running results:
Direct Switch:
Because the routing key bound to the queue and switch in consumer 1 is ERROR, but the routing key specified by the producer when sending the message is INFO, consumer 1 will not receive the message.
4.5 Routing – Topic
A Topic exchange can route messages to different queues based on a RoutingKey compared to a Direct exchange. It’s just that Exchange, a Topic type switch, can make queues use wildcards when binding routingkeys.
Topic wildcards are usually made up of one or more words, separated by “. Segmentation. For example: the item. The insert.
// Wildcard * matches only one word # Matches zero, one, or more word examples: qingyuan.# Matches qingyuan. Test qingyuan.testCopy the code
Producers:
/** * topic-producer */ public class Provider {public static void main(String[] args) throws IOException, TimeoutException {// create a ConnectionFactory object to connect to mq ConnectionFactory ConnectionFactory = new ConnectionFactory(); / / set up the connection of the rabbitmq host connectionFactory. SetHost (" yourserverip "); / / set the port number the connectionFactory. SetPort (5672); / / set up the connection of the virtual host connectionFactory. SetVirtualHost ("/not "); / / set access virtual host user name and password connectionFactory. SetUsername (" qingyuan "); connectionFactory.setPassword("qingyuan"); / / to get Connection object Connection Connection = connectionFactory. NewConnection (); Channel Channel = connection.createchannel (); // Declare the switch channel.exchangeDeclare("topics","topic"); / / routing key String routekey = "save. User. Delete. Qingyuan"; Channel.basicpublish ("topics", routeKey,null,(" routeKey: ["+ routeKey +"]").getBytes()); // Close channel.close(); connection.close(); }}Copy the code
Consumer 1:
/** * topic1 */ public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {// create a ConnectionFactory object to connect to mq ConnectionFactory ConnectionFactory = new ConnectionFactory(); / / set up the connection of the rabbitmq host connectionFactory. SetHost (" yourserverip "); / / set the port number the connectionFactory. SetPort (5672); / / set up the connection of the virtual host connectionFactory. SetVirtualHost ("/not "); / / set access virtual host user name and password connectionFactory. SetUsername (" qingyuan "); connectionFactory.setPassword("qingyuan"); / / to get Connection object Connection Connection = connectionFactory. NewConnection (); Channel Channel = connection.createchannel (); // Declare the switch channel.exchangeDeclare("topics","topic"); String queue = channel.queuedeclare ().getQueue(); Channel.queuebind (queue,"topics","*.user.*"); BasicConsume (queue,true,new DefaultConsumer(channel){@override public void handleDelivery(String) consumerTag, Envelope envelope, AMQP.BasicProperties properties, Byte [] body) throws IOException {system.out.println (" Consumer 1: "+ new String(body)); byte[] body) throws IOException {system.out.println (" Consumer 1: "+ new String(body)); }}); }}Copy the code
Consumer 2:
/** * topic2 */ public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// create a ConnectionFactory object to connect to mq ConnectionFactory ConnectionFactory = new ConnectionFactory(); / / set up the connection of the rabbitmq host connectionFactory. SetHost (" yourserverip "); / / set the port number the connectionFactory. SetPort (5672); / / set up the connection of the virtual host connectionFactory. SetVirtualHost ("/not "); / / set access virtual host user name and password connectionFactory. SetUsername (" qingyuan "); connectionFactory.setPassword("qingyuan"); / / to get Connection object Connection Connection = connectionFactory. NewConnection (); Channel Channel = connection.createchannel (); // Declare the switch channel.exchangeDeclare("topics","topic"); String queue = channel.queuedeclare ().getQueue(); channel.queueBind(queue,"topics","*.user.#"); BasicConsume (queue,true,new DefaultConsumer(channel){@override public void handleDelivery(String) consumerTag, Envelope envelope, AMQP.BasicProperties properties, Byte [] body) throws IOException {system.out.println (" Consumer 2: "+ new String(body)); byte[] body) throws IOException {system.out.println (" Consumer 2: "+ new String(body)); }}); }}Copy the code
Running results:
Routing key set by producer sends a message: “the save. User. Delete. Qingyuan”
Consumer 1’s temporary queue is bound to the switch’s routing key: “.user.”
Consumer 2’s temporary queue is bound to the switch’s routing key: “*.user.#”
Consumer 1 can only match users with a single word after them, while consumer 2 can match users with multiple words after them.
5. Use RabbitMQ in SpringBoot
5.0 configuration
server.port=9090
spring.application.name=rabbitmq-demo
## rabbitmq
spring.rabbitmq.host=yourserverip
spring.rabbitmq.port=5672
spring.rabbitmq.username=qingyuan
spring.rabbitmq.password=qingyuan
spring.rabbitmq.virtual-host=/demo1
Copy the code
5.1 the HelloWorld
Producers:
/** * helloworld provider */ @RunWith(SpringRunner.class) @SpringBootTest public class ProviderBoot { @Autowired private RabbitTemplate rabbitTemplate; @ Test public void contextLoads () {/ * * * @ param1 @ param2 send message queue name * * / rabbitTemplate. ConvertAndSend (" hello ", "hello world"); }}Copy the code
Consumer:
@Component @RabbitListener(queuesToDeclare = @Queue(value="hello")) public class ConsumerBoot { @Autowired RabbitTemplate rabbitTemplate; @RabbitHandler public void reveive(String message){ System.out.println("message = " + message); }}Copy the code
Run the SpringBoot boot class:
5.2 SpringBoot版Work Queues
Producers:
@RunWith(SpringRunner.class)
@SpringBootTest
public class ProviderBoot {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void convertAndSend(){
for(int index =0;index<10;index++){
rabbitTemplate.convertAndSend("work","hello work");
}
}
}
Copy the code
Consumer:
@Component public class Consumer { @Autowired RabbitTemplate rabbitTemplate; @RabbitListener(queuesToDeclare = @Queue(value="work")) public void receive1(String message){ System.out.println("work message1 = " + message); } @RabbitListener(queuesToDeclare = @Queue(value = "work")) public void receive2(String message){ System.out.println("work message2 = " + message); }}Copy the code
Similarly, the Work Queues pattern, by default, is consumers who distribute messages evenly.
5.3 the Fanout mode
Fanout topic refers to the binding relationship between the consumer side, queue and switch.
Producers:
@RunWith(SpringRunner.class) @SpringBootTest public class ProviderBoot { @Autowired RabbitTemplate rabbitTemplate; @test public void convetAndSend(){/** * @param1 Switch name * @Param2 Route key, Null to any routing * * @ param3 news/rabbitTemplate convertAndSend (" logs2 ", ""," this is the fanout message "); }}Copy the code
Consumer:
@Component public class FanoutConsumer { @Autowired RabbitTemplate rabbitTemplate; @RabbitListener( bindings = @QueueBinding( value = @Queue, Exchange = @exchange (name="logs2",type="fanout")) public void Receive1 (String message){// Create temporary queue exchange = @exchange (name="logs2",type="fanout")) System.out.println("message1 = " + message); } @RabbitListener( bindings = @QueueBinding( value = @Queue, exchange = @Exchange(name="logs2",type = "fanout") ) ) public void receive2(String message){ System.out.println("message2 = " + message); }}Copy the code
5.4 Direct
Producers:
@RunWith(SpringRunner.class) @SpringBootTest public class ProviderBoot { @Autowired RabbitTemplate rabbitTemplate; @test public void convertAndSend(){/** * @param1 Switch name * @param2 Route key * @param3 message */ RabbitTemplate. ConvertAndSend (" directs ", "error", "error log information"); }}Copy the code
Consumer:
@Component public class DirectConsumer { @RabbitListener( bindings = { @QueueBinding( value = @Queue, Key = {"info","error"}, // Route Key exchange = @exchange (name = "directs",type = "direct"))}) public void Receive1 (String message){ System.out.println("message1 = " + message); } @RabbitListener( bindings = { @QueueBinding( value = @Queue, key = {"other"}, exchange = @Exchange(name="directs",type = "direct") ) } ) public void receive2(String message){ System.out.println("message2 = " + message); }}Copy the code
5.5 the Topic
Producers:
@RunWith(SpringRunner.class) @SpringBootTest public class ProviderBoot { @Autowired RabbitTemplate rabbitTemplate; @Test public void convertAndSend(){ rabbitTemplate.convertAndSend("topics2","user.save.findAll","user.save.findAll The message "); }}Copy the code
Consumer:
@Component public class TopicConsumer { @RabbitListener( bindings = { @QueueBinding( value = @Queue, key = {"user.*"}, exchange = @Exchange(name="topics2",type = "topic") ) } ) public void receive1(String message){ System.out.println("message1 = " + message); } @RabbitListener( bindings = { @QueueBinding( value = @Queue, key = {"user.#"}, exchange = @Exchange(name = "topics2",type = "topic") ) } ) public void receive2(String message){ System.out.println("message2 = " + message); }}Copy the code
Running results:
6. Reference materials
1. Refer to the video
2. The rabbitmq installation
3.AMQP 0-9-1 Model Explained
4. Once slow -RabbitMQ