This is the 29th day of my participation in Gwen Challenge
Getting started with RabbitMQ
I. Introduction
1. Overview
Summary of MQ
- An MQ (Message Queue) is a container that holds messages while they are in transit. It is used for communication between distributed systems.
MQ advantages
- Application decoupling: with an MQ in between, the failure of one service does not affect the other services, which improves the fault tolerance and maintainability of the system.
- Asynchronous speed-up: once the request has been put on the MQ, the user gets a response immediately without waiting for downstream processing, which improves user experience and system throughput.
- Peak clipping: under high concurrency, the MQ buffers a burst of requests and lets the services work through them at their own pace, which prevents the servers from going down.
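Peak clipping can be illustrated without a broker. The sketch below is only an in-process analogy: a plain java.util.Queue stands in for the MQ, and the class name PeakClippingDemo, the method drain, and the per-tick limit are hypothetical names chosen for illustration. A burst of requests is buffered first and then worked off at a fixed rate.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// In-process analogy only: the Queue plays the role of the message queue.
final class PeakClippingDemo {

    // Processes at most maxPerTick requests per tick until the backlog is empty.
    // Returns the number of requests handled in each tick.
    static List<Integer> drain(Queue<String> backlog, int maxPerTick) {
        if (maxPerTick <= 0) {
            throw new IllegalArgumentException("maxPerTick must be positive");
        }
        List<Integer> processedPerTick = new ArrayList<>();
        while (!backlog.isEmpty()) {
            int processed = 0;
            while (processed < maxPerTick && backlog.poll() != null) {
                processed++;
            }
            processedPerTick.add(processed);
        }
        return processedPerTick;
    }

    public static void main(String[] args) {
        Queue<String> backlog = new ArrayDeque<>();
        for (int i = 1; i <= 10; i++) {
            backlog.add("request-" + i); // a burst of ten requests arrives at once
        }
        System.out.println(drain(backlog, 4)); // prints [4, 4, 2]
    }
}
```

The service never sees more than four requests per tick, however large the burst is. The price is that later requests wait longer in the queue.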
MQ disadvantages
- Reduced system availability: if the MQ goes down, the services that depend on it are affected.
- Increased system complexity: once calls go through the MQ asynchronously, you have to make sure that messages are not consumed more than once, are not lost, and are delivered in order.
- Consistency problem: you have to keep data consistent between the modules that communicate through the MQ.
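One common way to deal with repeated consumption is to make the consumer idempotent. The sketch below assumes that every message carries a unique ID assigned by the producer; the class IdempotentConsumerDemo, its methods, and the in-memory set are hypothetical, and a real system would need a durable store for the handled IDs.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch: remember the IDs already handled so that a redelivered message has no extra effect.
final class IdempotentConsumerDemo {
    private final Set<String> handledIds = new HashSet<>();
    private int balance = 0;

    // Applies the deposit once per message ID; returns false for a duplicate delivery.
    boolean handleDeposit(String messageId, int amount) {
        if (!handledIds.add(messageId)) {
            return false; // already processed, ignore the redelivery
        }
        balance += amount;
        return true;
    }

    int balance() {
        return balance;
    }

    public static void main(String[] args) {
        IdempotentConsumerDemo consumer = new IdempotentConsumerDemo();
        consumer.handleDeposit("msg-1", 100);
        consumer.handleDeposit("msg-1", 100); // duplicate delivery of the same message
        consumer.handleDeposit("msg-2", 50);
        System.out.println(consumer.balance()); // prints 150
    }
}
```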
Summary of RabbitMQ
- RabbitMQ is a message queue product based on the AMQP protocol.
- RabbitMQ provides six working modes.
2. Install RabbitMQ
2.1. Install the dependencies
Install the dependencies online:
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
2.2. Install Erlang
Upload the installation package to Linux
# Install
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
2.3. Install RabbitMQ
# Install socat
rpm -ivh --force --nodeps socat-1.7.3.2-1.1.el7.x86_64.rpm
# Install RabbitMQ
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
2.4. Open the management page and configuration
# Open the management page
rabbitmq-plugins enable rabbitmq_management
# Change the default configuration
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
# For example: in loopback_users, change <<"guest">> so that only guest remains
2.5. Start
service rabbitmq-server start # start service
service rabbitmq-server stop # stop service
service rabbitmq-server restart # restart service
- Set up the configuration file
cd /usr/share/doc/rabbitmq-server-3.6.5/
cp rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
2.6 User Roles
Once RabbitMQ is installed, you can visit http://ip:15672. The default username and password are guest/guest. If you need a custom user, log in to the management page and create one there.
Role Description:
1. Super administrator
Can log in to the management console, view all information, and manage users and policies.
2. Monitoring
Can log in to the management console and view RabbitMQ node information (number of processes, memory usage, disk usage, and so on).
3. Policymaker
Can log in to the management console and manage policies, but cannot view the node information (marked by the red box in the figure above).
4. Management
Can only log in to the management console. Cannot see node information or manage policies.
5. Other
Cannot log in to the management console. These are usually ordinary producers and consumers.
2.7. Configure Virtual Hosts
MySQL, for example, has the concept of a database, and user permissions can be granted on databases, tables, and other objects. RabbitMQ has similar permission management: it supports virtual message servers called Virtual Hosts. Each Virtual Host behaves like an independent RabbitMQ server and is isolated from the others, so the exchanges, queues, and messages of one Virtual Host cannot interact with those of another. A Virtual Host is comparable to a database in MySQL. Virtual Host names usually start with a slash (/).
2.8. Create Virtual Hosts
2.9. Set permissions on Virtual Hosts
3. Simple use
Requirements: One producer sends the message and one consumer receives the message.
Prepare a producer project and a consumer project, then import the dependencies
<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.6.0</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
3.1. Producers
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Producer: sends messages
 */
public class HelloProducer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // Set parameters
        factory.setHost("47.95.226.96");   // Set the IP
        factory.setPort(5672);             // Set the port
        factory.setVirtualHost("/yylm");   // Set the virtual host
        factory.setUsername("jack");       // Set the user
        factory.setPassword("123456");     // Set the password
        // Create a connection
        Connection connection = factory.newConnection();
        // Create a channel
        Channel channel = connection.createChannel();
        // Create a queue
        /*
         * channel.queueDeclare(...) parameters:
         *   String queue        queue name; if the queue does not exist, it is created automatically
         *   boolean durable     whether the queue is persisted and survives an MQ restart
         *   boolean exclusive   whether the queue is exclusive to this connection;
         *                       an exclusive queue is deleted when the connection is closed
         *   boolean autoDelete  whether the queue is deleted automatically when it has no consumers
         *   Map<String, Object> arguments  additional arguments
         */
        channel.queueDeclare("hello_world", true, false, false, null);
        // Send a message
        /*
         * channel.basicPublish(...) parameters:
         *   String exchange        exchange name; simple mode uses the default exchange ""
         *   String routingKey      routing key; with the default exchange it must equal the queue name
         *   BasicProperties props  message properties
         *   byte[] body            message data to send
         */
        String body = "hello rabbitMQ!";
        channel.basicPublish("", "hello_world", null, body.getBytes());
        // Release resources
        channel.close();
        connection.close();
    }
}
A queue will appear in the RabbitMQ admin interface
3.2 Consumers
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Consumer: receives messages
 */
public class HelloConsumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // Set parameters
        factory.setHost("47.95.226.96");   // Set the IP
        factory.setPort(5672);             // Set the port
        factory.setVirtualHost("/yylm");   // Set the virtual host
        factory.setUsername("jack");       // Set the user
        factory.setPassword("123456");     // Set the password
        // Create a connection
        Connection connection = factory.newConnection();
        // Create a channel
        Channel channel = connection.createChannel();
        // Create a queue
        channel.queueDeclare("hello_world", true, false, false, null);
        // Receive messages
        /*
         * channel.basicConsume(...) parameters:
         *   String queue       queue name
         *   boolean autoAck    whether messages are acknowledged automatically
         *   Consumer callback  callback object
         */
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * Callback method, executed when a message is received
             * @param consumerTag message identifier
             * @param envelope    delivery information: exchange, routing key...
             * @param properties  message properties
             * @param body        message data
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag: " + consumerTag);
                System.out.println("envelope.getExchange: " + envelope.getExchange());
                System.out.println("envelope.getRoutingKey: " + envelope.getRoutingKey());
                System.out.println("properties: " + properties);
                System.out.println("body: " + new String(body));
            }
        };
        channel.basicConsume("hello_world", true, consumer);
        // The consumer does not close its resources; it keeps listening
    }
}
II. RabbitMQ working modes
The example above uses RabbitMQ's simple working mode.
The working modes differ in how messages are routed and dispatched; each mode is a way of distributing messages.
1. Work Queues
A producer puts messages on a queue, and two or more consumers compete for them; each message is received by only one consumer.
Application scenario: use a work queue to speed up processing when tasks are heavy or numerous.
The code is similar to simple mode: the producer sends ten messages and two consumers listen on the same queue. The consumers take turns, so each message is delivered to exactly one of them.
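The turn-taking between the two consumers can be sketched in plain Java. This is a simplified in-memory model, not the RabbitMQ client API: the class RoundRobinDispatchDemo and the method dispatch are hypothetical, and the model assumes both consumers are always ready, so messages are handed out strictly in rotation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a work queue: each message goes to exactly one consumer, in rotation.
final class RoundRobinDispatchDemo {

    // Returns, for every consumer, the list of messages it received.
    static Map<String, List<String>> dispatch(List<String> messages, List<String> consumers) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String consumer : consumers) {
            received.put(consumer, new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            String consumer = consumers.get(i % consumers.size());
            received.get(consumer).add(messages.get(i));
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            messages.add("msg" + i);
        }
        Map<String, List<String>> received =
                dispatch(messages, Arrays.asList("consumer1", "consumer2"));
        received.forEach((consumer, got) -> System.out.println(consumer + " -> " + got));
    }
}
```

With ten messages and two consumers, consumer1 receives the odd-numbered messages and consumer2 the even-numbered ones, five each.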
2. Pub/Sub (publish/subscribe)
Producers send messages to an exchange, the exchange distributes them to message queues according to its routing rules, and consumers listen on the queues for messages.
Exchange: receives the messages sent by producers and decides how to handle them. There are three common types of exchange:
- Fanout: broadcasts each message to all queues bound to the exchange.
- Direct: delivers each message to the queues whose binding matches the specified routing key.
- Topic: wildcard matching; delivers each message to the queues whose routing pattern matches the routing key.
An exchange only forwards messages and does not store them, so if no queue is bound to the exchange, or no queue matches the routing rule, the message is lost.
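The fanout behaviour, including the loss of messages when nothing is bound, can be modelled in a few lines. The class below is an in-memory illustration only; FanoutExchangeDemo and its methods are hypothetical names and not part of the RabbitMQ client.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory model of a fanout exchange; this is not the RabbitMQ client API.
final class FanoutExchangeDemo {
    // queue name -> messages currently held by that queue
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    // Bind a queue to the exchange (a fanout exchange ignores the routing key).
    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    // Copies the message into every bound queue and returns how many queues received it.
    // A return value of 0 means the message is gone, because the exchange stores nothing.
    int publish(String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
        return boundQueues.size();
    }

    List<String> messagesIn(String queueName) {
        return boundQueues.getOrDefault(queueName, Collections.emptyList());
    }

    public static void main(String[] args) {
        FanoutExchangeDemo exchange = new FanoutExchangeDemo();
        System.out.println(exchange.publish("lost"));        // prints 0: no queue is bound yet
        exchange.bind("test_fanout_queue1");
        exchange.bind("test_fanout_queue2");
        System.out.println(exchange.publish("PubSub_info")); // prints 2
        System.out.println(exchange.messagesIn("test_fanout_queue1")); // prints [PubSub_info]
    }
}
```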
Producer for Pub/Sub mode
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Pub/Sub mode
 * Producer: sends messages
 */
public class HelloProducer_PubSub {
    public static void main(String[] args) throws IOException, TimeoutException {
        // Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // Set parameters
        factory.setHost("47.95.226.96");   // Set the IP
        factory.setPort(5672);             // Set the port
        factory.setVirtualHost("/yylm");   // Set the virtual host
        factory.setUsername("jack");       // Set the user
        factory.setPassword("123456");     // Set the password
        // Create a connection
        Connection connection = factory.newConnection();
        // Create a channel
        Channel channel = connection.createChannel();
        // Create an exchange
        /*
         * channel.exchangeDeclare(...) parameters:
         *   String exchange           exchange name
         *   BuiltinExchangeType type  exchange type (enum)
         *   boolean durable           whether to persist the exchange
         *   boolean autoDelete        whether to delete it automatically
         *   boolean internal          for internal use; normally false
         *   Map<String, Object> arguments  argument list
         */
        String exchangeName = "test_fanout";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);
        // Create the queues
        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";
        channel.queueDeclare(queue1Name, true, false, false, null);
        channel.queueDeclare(queue2Name, true, false, false, null);
        // Bind the queues to the exchange
        /*
         * channel.queueBind(...) parameters:
         *   String queue       queue name
         *   String exchange    exchange name
         *   String routingKey  routing key (binding rule); for a fanout exchange it is set to an
         *                      empty string, meaning the exchange delivers every message to the queue
         *   Map<String, Object> arguments  additional arguments
         */
        channel.queueBind(queue1Name, exchangeName, "", null);
        channel.queueBind(queue2Name, exchangeName, "", null);
        // Send a message
        String body = "PubSub_info";
        channel.basicPublish(exchangeName, "", null, body.getBytes());
        // Release resources
        channel.close();
        connection.close();
    }
}
The consumer is the same as before; just change the queue name to one of the queues created by the producer.
3. Routing
Description:
- A queue can no longer be bound to the exchange arbitrarily; the binding must specify a routingKey.
- The sender must also specify a routingKey when publishing a message to the exchange.
- The exchange no longer delivers each message to every bound queue. It decides based on the routingKey: a queue receives the message only if the routingKey of its binding exactly matches the routingKey of the message.
What needs to change in the producer:
// ...
// Create an exchange
String exchangeName = "test_direct";
// Change the exchange type to DIRECT
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);
// Create the queues
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);
// Bind the queues to the exchange
// Queue 1 binding: the routingKey must not be empty here; it specifies the rule
channel.queueBind(queue1Name, exchangeName, "error", null);
// Queue 2 bindings
channel.queueBind(queue2Name, exchangeName, "info", null);
channel.queueBind(queue2Name, exchangeName, "error", null);
channel.queueBind(queue2Name, exchangeName, "warning", null);
// Send a message
String body = "log-level=info";
// Specify the routingKey of the message when sending it
channel.basicPublish(exchangeName, "warning", null, body.getBytes());
// ...
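To see which queue a given routingKey reaches, the exact-match rule of a direct exchange can be modelled with a map. This is an in-memory sketch that reuses the queue names and bindings from the producer fragment; DirectExchangeDemo and its methods are hypothetical names, not the RabbitMQ client API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// In-memory model of a direct exchange: a message reaches the queues bound with exactly its routing key.
final class DirectExchangeDemo {
    // routing key -> queues bound with that key
    private final Map<String, Set<String>> bindings = new HashMap<>();

    void bind(String queueName, String routingKey) {
        bindings.computeIfAbsent(routingKey, key -> new LinkedHashSet<>()).add(queueName);
    }

    // Returns the queues that would receive a message published with this routing key.
    Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptySet());
    }

    public static void main(String[] args) {
        DirectExchangeDemo exchange = new DirectExchangeDemo();
        exchange.bind("test_direct_queue1", "error");
        exchange.bind("test_direct_queue2", "info");
        exchange.bind("test_direct_queue2", "error");
        exchange.bind("test_direct_queue2", "warning");

        System.out.println(exchange.route("warning")); // prints [test_direct_queue2]
        System.out.println(exchange.route("error"));   // prints [test_direct_queue1, test_direct_queue2]
        System.out.println(exchange.route("debug"));   // prints []
    }
}
```

A message published with the routingKey warning therefore ends up only in test_direct_queue2, while a key that no binding uses reaches no queue at all.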
4. Topics (wildcard mode)
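Topics mode can do everything that Pub/Sub and Routing mode can, but Topics lets the routingKey of a binding contain wildcards, which makes routing more flexible. Words in a routingKey are separated by dots; * matches exactly one word and # matches zero or more words.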
Part of the code
// ...
String exchangeName = "test_topics";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);
// Create the queues
String queue1Name = "test_topics_queue1";
String queue2Name = "test_topics_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);
// Bind the queues to the exchange
// routingKey format: <system name>.<log level>
// Requirement: all error-level logs and all logs of the order system are stored in the database
channel.queueBind(queue1Name, exchangeName, "#.error", null);
channel.queueBind(queue1Name, exchangeName, "order.*", null);
channel.queueBind(queue2Name, exchangeName, "*.*", null);
// Send a message
String body = "PubSub_info";
channel.basicPublish(exchangeName, "aa.info", null, body.getBytes());
// ...
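The wildcard rules can be checked with a small matcher. This is a plain-Java sketch of the matching semantics (dot-separated words, * for exactly one word, # for zero or more words), not the broker's own implementation; the class TopicMatcher and the method matches are hypothetical names.

```java
// Sketch of topic-pattern matching: '*' matches exactly one word, '#' matches zero or more words.
final class TopicMatcher {

    static boolean matches(String pattern, String routingKey) {
        String[] patternWords = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
        String[] keyWords = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(patternWords, 0, keyWords, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero, one, or several words of the key
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String key = "aa.info"; // the routing key used when sending the message
        System.out.println(matches("#.error", key)); // prints false
        System.out.println(matches("order.*", key)); // prints false
        System.out.println(matches("*.*", key));     // prints true
    }
}
```

With the bindings in the code, a message sent with the routingKey aa.info matches only the pattern *.* and is therefore delivered to test_topics_queue2 alone.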
III. Integrating RabbitMQ
1. Spring integrates RabbitMQ
Import the dependencies
<dependencies>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>5.1.7.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>2.1.8.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>5.1.7.RELEASE</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
Extract the connection settings into rabbitmq.properties
rabbitmq.host=47.95.226.96
rabbitmq.port=5672
rabbitmq.username=jack
rabbitmq.password=123456
rabbitmq.virtual-host=/yylm
1.1. Producers
Spring configuration file for RabbitMQ (spring-rabbitmq-producer.xml)
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!-- Load the configuration file -->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>
    <!-- Define the RabbitMQ connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"/>
    <!-- Define the admin that manages exchanges and queues -->
    <rabbit:admin connection-factory="connectionFactory"/>
    <!-- Define a persistent queue; it is created automatically if it does not exist. A queue that is not bound to an exchange is bound to the default exchange. The default exchange has type direct, its name is "", and the routing key is the queue name. -->
    <rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>
    <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ broadcast; all queues receive the message ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
    <!-- Define a persistent queue for the broadcast exchange; it is created automatically if it does not exist -->
    <rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>
    <!-- Define a persistent queue for the broadcast exchange; it is created automatically if it does not exist -->
    <rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>
    <!-- Define the broadcast (fanout) exchange and bind the two queues above -->
    <rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding queue="spring_fanout_queue_1"/>
            <rabbit:binding queue="spring_fanout_queue_2"/>
        </rabbit:bindings>
    </rabbit:fanout-exchange>
    <!-- Queue referenced by the direct exchange binding below; the original listing does not declare it, so it is added here -->
    <rabbit:queue id="spring_direct_queue_1" name="spring_direct_queue_1" auto-declare="true"/>
    <!-- Direct exchange: bind the queue and specify the routing key -->
    <rabbit:direct-exchange name="spring_direct_exchange" id="spring_direct_exchange" auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding queue="spring_direct_queue_1" key="error"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>
    <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ wildcards; * matches exactly one word, # matches zero or more words ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
    <!-- Define a persistent queue for the topic exchange; it is created automatically if it does not exist -->
    <rabbit:queue id="spring_topic_queue_star" name="spring_topic_queue_star" auto-declare="true"/>
    <!-- Define a persistent queue for the topic exchange; it is created automatically if it does not exist -->
    <rabbit:queue id="spring_topic_queue_well" name="spring_topic_queue_well" auto-declare="true"/>
    <!-- Define a persistent queue for the topic exchange; it is created automatically if it does not exist -->
    <rabbit:queue id="spring_topic_queue_well2" name="spring_topic_queue_well2" auto-declare="true"/>
    <!-- Topic exchange: bind the queues and specify the wildcard patterns -->
    <rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding pattern="marry.*" queue="spring_topic_queue_star"/>
            <rabbit:binding pattern="july.#" queue="spring_topic_queue_well"/>
            <rabbit:binding pattern="tom.#" queue="spring_topic_queue_well2"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <!-- Define the rabbitTemplate object, which makes sending messages easy -->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
</beans>
The test code
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class Producer {
    // Inject the rabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Send a simple-mode message
     */
    @Test
    public void test1() {
        String body = "hello-spring-rabbitMq";
        // convertAndSend(queue name, message)
        rabbitTemplate.convertAndSend("spring_queue", body);
    }

    /**
     * Send a fanout message
     */
    @Test
    public void test2() {
        String body = "hello-spring-rabbitMq-fanout";
        // convertAndSend(exchange, routing key, message)
        rabbitTemplate.convertAndSend("spring_fanout_exchange", "", body);
    }

    /**
     * Send a topic message
     */
    @Test
    public void test3() {
        String body = "hello-spring-rabbitMq-Topic";
        // convertAndSend(exchange, routing key, message)
        rabbitTemplate.convertAndSend("spring_topic_exchange", "marry.hello", body);
    }
}
1.2. Consumers
The configuration file
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!-- Load the configuration file -->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>
    <!-- Define the RabbitMQ connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"/>
    <!-- Define the listener -->
    <bean id="springQueueListener" class="cn.yylm.rabbitmq.listener.SpringQueueListener"/>
    <!-- Register the listener with the container -->
    <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
        <rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>
    </rabbit:listener-container>
</beans>
Listener class
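import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class SpringQueueListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        // Print the message body
        System.out.println(new String(message.getBody()));
    }
}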
Run a test class to load the configuration file
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {
    @Test
    public void test() {
        // Keep the test running so that the listener can go on receiving messages
        while (true) {
        }
    }
}
2. SpringBoot integrates RabbitMQ
Import the dependency
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
The configuration file
spring:
  rabbitmq:
    host: 47.95.226.96
    port: 5672
    username: jack
    password: 123456
    virtual-host: /yylm
2.1. Producers
Use a configuration class to configure the exchange, the queue, and the binding
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    public static final String EXCHANGE_NAME = "boot_topic_exchange";
    public static final String QUEUE_NAME = "boot_queue";

    // Configure the exchange
    @Bean("bootExchange")
    public Exchange bootExchange() {
        // ExchangeBuilder can build four types of exchange
        // topicExchange(EXCHANGE_NAME): exchange name; durable: whether to persist; build: create it
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    // Configure the queue
    @Bean("bootQueue")
    public Queue bootQueue() {
        // Create a durable queue with the given name
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    // Bind the queue to the exchange
    @Bean
    public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange) {
        // Bind the queue to the exchange, specify the routing key, and pass no extra arguments
        return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
    }
}
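Test
// JUnit 5 is assumed here, because the test class and method are not public
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class SpringbootRabbitmqProducerApplicationTests {
    @Autowired
    private RabbitTemplate template;

    @Test
    void test() {
        // convertAndSend(exchange, routing key, message)
        template.convertAndSend(RabbitConfig.EXCHANGE_NAME, "boot.hello", "hello boot rabbitMQ");
    }
}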
2.2 Consumers
Listen for messages on the queue
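import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQListener {
    // Called for every message that arrives on boot_queue
    @RabbitListener(queues = "boot_queue")
    public void listenerQueue(Message message) {
        System.out.println(new String(message.getBody()));
    }
}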