MQ is introduced
-
MQ: Message Queue. A producer produces a message and stores it in a queue, and a consumer listens to the queue and consumes the message
-
Advantages: The production and consumption of messages in MQ is asynchronous, with non-intrusive, low coupling between producer and consumer
-
Common MQ frameworks
# ActiveMQ Apache produced, the old message bus, comply with the JMS specification, provide a rich API. # Kafka Apache is an open source publishing and subscription messaging system with no transaction support, no strict control over errors and loss, high throughput, and data collection services suitable for large amounts of data. # RocketMQ ali open source messaging middleware, pure Java development, high throughput, high availability, suitable for large-scale distributed system applications. The idea started with Kafka. RabbitMQ is an open source message queue system based on the AMQP protocol. It is message, queue and routing oriented with reliability and security. Applicable to scenarios that require high data consistency and stabilityCopy the code
Introduce the RabbitMQ
-
RabbitMQ website: www.rabbitmq.com/
-
The advantages of the RabbitMQ
-Using Erlang language development, Erlang is a powerful Socket programming language-Based on AMQP protocol, it is cross-platform-Easy integration with SpringBoot-Very data consistency friendlyCopy the code
-
RabbitMQ related concepts
# closer agreement Proposed in 2003, is a kind of advanced message protocol, does not limit the API layer, directly defines the network exchange data format, has the natural cross-platform. A virtual host has a set of switches, queues, and bindings that are used to divide RabbiMQ services. Generally, different services are configured with different virtual hosts. Users have permission control in the granularity of virtual hosts. Each RabbitMQ server is configured with a default virtual host '/' # switch for forwarding messages to queues. A channel object is created from the Connection object, and the data is transferred using the channel object. A channel can be regarded as a virtual connection, avoiding the overhead of creating real connections frequently Copy the code
-
To obtain the RabbitMQ
-
For Windows, download the RabbitMQ installation package from the official website and start it with the Erlang environment
-
Linux installs RabbitMQ through the package manager
-
Linux uses Dcoker to pull the RabbitMQ image and start it
-
-
Manage the RabbitMQ
RabbitMQ provides a Web management page on port 15672 by default. You can log in to the Web page to manage RabbitMQ service configurations
Such as adding a virtual host
If you add a user, you can set the user’s access to the virtual host
Use the RabbitMQ
The following RabbitMQ version is 3.8.23
Create channels
// Get the factory object of the MQ connection object
ConnectionFactory connectionFactory = new ConnectionFactory();
// Set the connection IP
connectionFactory.setHost("127.0.0.1");
// Set the port number
connectionFactory.setPort(5672);
// Set the virtual host
connectionFactory.setVirtualHost("demoMQ");
// Set the user name and password
connectionFactory.setUsername("demoUser");
connectionFactory.setPassword("xxxxxxx");
// Get the connection object
Connection connection = connectionFactory.newConnection();
// Create a channel object
Channel channel = connection.createChannel();
Copy the code
Create a queue
channel.queueDeclare("demoQueue".false.false.false.null);
/** * Parameter 1 Queue: name of the message queue * Parameter 2 Durable: Whether the queue is persistent (excluding messages in the queue) * Parameter 3 EXCLUSIVE: Whether the current connection is exclusive to the queue * Parameter 4 autoDelete: Whether the queue is automatically deleted after message consumption is complete and the connection is disconnected * argument 5 Argument: additional argument */
Copy the code
Work queue model
-
Work queue model
With a message queue, there are one or more consumers, each of whom retrieves a different message for consumption
-
news
channel.basicPublish(""."demoQueue".null."demo queue".getBytes()); /** * parameter 1 exchange: switch name * parameter 2 routingKey: routing * parameter 3 props; Additional arguments * Argument 4 Body: Byte array for the message */ Copy the code
If the switch name is null, the default switch is used
Each queue automatically binds its Routing Key with the same name to the default switch
The channel and connection must be closed after being used
-
News consumption
// bind the same queue as the producer channel.queueDeclare("demoQueue".false.false.false.null); // Start message listening /** * Parameter 1 queue: queue name * parameter 2 autoAck: automatic acknowledgement * parameter 3 callback: callback interface object */ channel.basicConsume("demoQueue".true.new DefaultConsumer(channel) { // Body is the message content @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { System.out.println(newString(body)); }});Copy the code
-
Distribution of the message
In the work queue model, messages are evenly distributed across consumers by default
If manual acknowledgment is used and only one message is consumed on each channel, the next message can be consumed only after acknowledgment
In this way, the number of messages allocated is related to the speed of processing
Manual acknowledgement message
// Set the channel to consume only one message at a time // The channel does not get the allocation of the new message until it acknowledges the message channel.basicQos(1); // autoAck false channel.basicConsume("demoQueue".false.new DefaultConsumer(channel) { @SneakyThrows @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) { System.out.println(new String(body)); Thread.sleep(3000); // Argument 1: the flag of the message, obtained through envelope. GetDeliveryTag () // Parameter 2: Whether to enable multi-message acknowledgement channel.basicAck(envelope.getDeliveryTag(), false); }});Copy the code
Publish and subscribe model
-
Radio model
Each consumer has its own queue, and each queue is bound to the switch, and the producer sends messages to the switch for distribution
-
news
// Declare a switch // Parameter 1: indicates the switch name // Parameter 2: switch type, fanout broadcast channel.exchangeDeclare("demoExchange"."fanout"); // Send a message channel.basicPublish("demoExchange"."".null."demo exchange".getBytes()); Copy the code
-
News consumption
// Declare a switch channel.exchangeDeclare("demoExchange"."fanout"); // Get the temporary queue name String tempQueueName = channel.queueDeclare().getQueue(); // Bind queues to switches channel.queueBind(tempQueueName, "demoExchange".""); // Start message listening channel.basicConsume(tempQueueName, false.new DefaultConsumer(channel) { @SneakyThrows @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) { System.out.println(new String(body)); channel.basicAck(envelope.getDeliveryTag(), false); }});Copy the code
In the publish-subscribe model, producers only need to declare switches, consumers declare temporary queues and bind to switches, Routing keys all use empty strings, and once the switch receives the message, it forwards it to each bound queue
Routing topic model
-
Routing topic model
In the publish-subscribe model, all consumers can get the same message
In the routing topic model, the switch no longer forwards messages to every queue, but matches more routes and topics
-
news
// Declare a switch channel.exchangeDeclare("routeExchange"."direct"); // Send a message // Publish a message with the Routing Key info channel.basicPublish("routeExchange"."info".null."demo route info".getBytes()); // Publish a message with the Routing Key error channel.basicPublish("routeExchange"."error".null."demo route error".getBytes()); Copy the code
-
News consumption
// Declare a switch. The type is direct channel.exchangeDeclare("routeExchange"."direct"); // Get the temporary queue name String tempQueueName = channel.queueDeclare().getQueue(); // Bind the switch, queue, and Routing Key channel.queueBind(tempQueueName, "routeExchange"."info"); // Start message listening channel.basicConsume(tempQueueName, false.new DefaultConsumer(channel) { @SneakyThrows @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) { System.out.println(new String(body)); channel.basicAck(envelope.getDeliveryTag(), false); }});Copy the code
Channel. queueBind can be called multiple times to bind multiple Routing keys
-
Dynamic routing
Dynamic Routing can use wildcards in Routing keys, with # representing any string and * representing a word
Producer releases message
// Declare a switch channel.exchangeDeclare("topicExchange"."topic"); // Send several messages with different routes channel.basicPublish("topicExchange"."log.error".null."log.error".getBytes()); channel.basicPublish("topicExchange"."log.error.file".null."log.error.file".getBytes()); channel.basicPublish("topicExchange"."log.info".null."log.info".getBytes()); channel.basicPublish("topicExchange"."log.info".null."user.info".getBytes()); Copy the code
Consumer binding queue
// Declare the switch, topic type channel.exchangeDeclare("topicExchange"."topic"); // Get the temporary queue name String tempQueueName = channel.queueDeclare().getQueue(); // Bind queues and routes channel.queueBind(tempQueueName, "topicExchange"."log.*"); /** * log.* can match log.error, log.info, log.error. File **. Info can match log.info, user.info */ Copy the code
Summary of three models:
Work queue model: No switch single queue, content-message
Publish subscribe model: switch empty routing, same message
Routing topic model: There are switches with routes, by route
Integrated SpringBoot
-
Introduction of depend on
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> Copy the code
-
The configuration application. Yaml
spring: rabbitmq: host: 127.0. 01. port: 5672 username: demoUser password: xxxxx virtual-host: demoMQ Copy the code
Once configured, the RabbitTemplate is available. RabbitTemplate encapsulates a set of RabbitMQ operations
-
Work queue model
news
@SpringBootTest public class RabbitProviderTest { @Autowired RabbitTemplate rabbit; // Emulate the RabbitConsumer object // Prevent RabbitConsumer consumption messages instantiated during test startup @MockBean public RabbitConsumer consumer; @Test public void queueProvide(a) { // Send messages to the queue rabbit.convertAndSend("demoQueue"."hello demo queue"); }}Copy the code
News consumption
@Component // @queue Specifies parameters such as durable and exclusive QueueDeclare ("demoQueue", true, false, false, null); // The default is channel.queueDeclare("demoQueue", true, false, false, null); @RabbitListener(queuesToDeclare = @Queue(name = "demoQueue")) public class RabbitConsumer { // The message handler can get the message body directly @RabbitHandler public void queueConsume(String message) { System.out.println(message); }}/ * * *@RabbitListenerIt can also be used on methods, denoted as message handler */ Copy the code
In Spring AMQP, the work queue model is fair consumption
-
Publish and subscribe model
news
@SpringBootTest public class RabbitProviderTest { @Autowired RabbitTemplate rabbit; @MockBean public RabbitConsumer consumer; @Test public void subscribeProvide(a) { rabbit.convertAndSend("demoExchange".""."hello demo exchange"+ i); }}Copy the code
News consumption
@Component public class RabbitConsumer { // The first consumer // @queue creates a temporary Queue without specifying an argument @RabbitListener(bindings = @QueueBinding(value = @Queue, key = "", exchange = @Exchange(name = "demoExchange", type = "fanout"))) public void subscribeConsumer1(String message) { System.out.println("SubscribeConsume1 receives:" + message); } // The second consumer @RabbitListener(bindings = @QueueBinding(value = @Queue, key = "", exchange = @Exchange(name = "demoExchange", type = "fanout"))) public void subscribeConsumer2(String message) { System.out.println("SubscribeConsume2 receives:"+ message); }}Copy the code
-
Routing topic model
news
@SpringBootTest public class RabbitProviderTest { @Autowired RabbitTemplate rabbit; @MockBean public RabbitConsumer consumer; @Test public void routeProvide(a) { rabbit.convertAndSend("routeExchange"."log.error"."log.error"); rabbit.convertAndSend("routeExchange"."log.info"."log.info"); rabbit.convertAndSend("routeExchange"."log.error.file"."log.error.file"); rabbit.convertAndSend("routeExchange"."user.info"."user.info"); }}Copy the code
News consumption
@Component public class RabbitConsumer { @RabbitListener(bindings = @QueueBinding(value = @Queue, key = "log.*", exchange = @Exchange(name = "routeExchange", type = "topic"))) public void routeConsumer1(String message) { System.out.println("RouteConsumer1 receives:" + message); } @RabbitListener(bindings = @QueueBinding(value = @Queue, key = "log.#", exchange = @Exchange(name = "routeExchange", type = "topic"))) public void routeConsumer2(String message) { System.out.println("RouteConsumer2 receives:" + message); } @RabbitListener(bindings = @QueueBinding(value = @Queue, key = "*.info", exchange = @Exchange(name = "routeExchange", type = "topic"))) public void routeConsumer3(String message) { System.out.println("RouteConsumer3 receives:"+ message); }}/** * To use the wildcard to implement dynamic routing, set type to topic */ Copy the code
The RabbitMQ cluster
-
Basic steps
Prepare the RabbitMQ service and deploy two or more RabbitMQ services
Configure the.erlang.cookie file..erlang.cookie is the key to join the service cluster and remains the same in the cluster service
Run the cluster joining command on the secondary node
Rabbitmqctl join_cluster --ram Rabbit @#-- RAM indicates a memory node. If no ram is added, it is a disk node by default Copy the code
Set up a mirror queue on any node
Strategy rabbitmqctl set_policy < name > "< name > queue" '{" ha - mode ":" < > mirror mode "}'#Parameter 1: indicates the policy name #Parameter 2: Matching rules for queue names, using regular expressions #Parameter 3: The principal rule of the mirror queue, a JSON string, with three attributes: ha-mode/ha-params/ha-sync-mode #Ha-mode: mirroring mode, all/exactly/ Nodes, all is stored on all nodes #--vhost Sets the virtual host Copy the code
-
Docker RabbitMQ cluster
Pull the RabbitMQ image
Docker pull the rabbitmq: 3.8.23 - managementCopy the code
Prepare the rabbitmq.conf configuration file and configure default accounts and virtual hosts
loopback_users.guest = false listeners.tcp.default = 5672 management.tcp.port = 15672 default_user = cluster default_pass = xxxxx default_vhost = clusterMQ Copy the code
Start three RabbitMQ containers and mount the same.erlang.cookie file
# 1docker run \ --name rabbitmq1 \ -h rabbitmq1 \ -p 15673:15672 \ -p 5673:5672 \ -v /var/docker/rabbitmq_cluster/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \ -v /var/docker/rabbitmq_cluster/data1:/var/lib/rabbitmq \ -v / var/docker/rabbitmq_cluster. Erlang. Cookies: / var/lib/rabbitmq /. Erlang. Cookies \ - d the rabbitmq: 3.8.23 - management# 2docker run \ --name rabbitmq2 \ -h rabbitmq2 \ -p 15674:15672 \ -p 5674:5672 \ -v /var/docker/rabbitmq_cluster/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \ -v /var/docker/rabbitmq_cluster/data2:/var/lib/rabbitmq \ -v /var/docker/rabbitmq_cluster/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \ --link rabbitmq1:rabbitmq1 \ -d The rabbitmq: 3.8.23 - management# 3docker run \ --name rabbitmq3 \ -h rabbitmq3 \ -p 15675:15672 \ -p 5675:5672 \ -v /var/docker/rabbitmq_cluster/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \ -v /var/docker/rabbitmq_cluster/data3:/var/lib/rabbitmq \ -v /var/docker/rabbitmq_cluster/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \ --link rabbitmq1:rabbitmq1 --link Rabbitmq2: rabbitmq2 \ - d the rabbitmq: 3.8.23 - managementCopy the code
To join the cluster
#Into the container docker exec -it rabbitmq2 bash #Stop the service rabbitmqctl stop_app #To add node 1, rabbit@ must use the host name, not the IP address #The host name is the parameter after docker run-h rabbitmqctl join_cluster --ram rabbit@rabbitmq1 Copy the code
Configure a policy to specify the virtual host as clusterMQ
rabbitmqctl set_policy --vhost clusterMQ demoPolicy "^" '{"ha-mode":"all"}' #use"^"Mirrors all queues Copy the code