In this simple article, I will share with you seven different forms of RabbitMQ messaging. Take a look.
Most of the time you will be using RabbitMQ in a Spring Boot or Spring Cloud environment, so I will share RabbitMQ with you in these two areas.
1. RabbitMQ architecture Overview
A picture is worth a thousand words, as follows:
The following concepts are involved in this diagram:
- Publisher: Publishes messages to exchanges in RabbitMQ.
- Exchange: Establishes connections with producers and receives messages from producers.
- Consumer: Listens for messages in the RabbitMQ Queue.
- Queue: The Exchange distributes messages to the specified Queue, which interacts with the consumer.
- Routes: Rules for the switch to forward messages to queues.
2. Preparation
RabbitMQ is the product of the AMQP camp. Spring Boot provides an automatic configuration dependency on spring-boot-starter-AMQP, so create a Spring Boot project and add the dependency as follows:
After the RabbitMQ project is successfully created, configure the basic RabbitMQ connection information in application.properties as follows:
spring.rabbitmq.host=localhost
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672
Copy the code
Next, configure RabbitMQ. In RabbitMQ, all messages submitted by the producers are redistributed to Exchange, which allocates the messages to different queues according to different policies.
The RabbitMQ website describes the following types of message distribution:
Here are seven of them, the seventh of which is message confirmation, which songo wrote about earlier, portal:
- Four strategies to ensure RabbitMQ message delivery reliability! Which one do you use?
- RabbitMQ high availability How to ensure successful message consumption
So I’m going to focus on the first six messaging methods.
3. Sending and receiving messages
3.1 Hello World
Yi? Why is there no switch? This is actually the default switch, we need to provide a producer a queue and a consumer. The message propagation diagram is as follows:
Let’s look at the code implementation:
Let’s look at the definition of a queue:
@Configuration
public class HelloWorldConfig {
public static final String HELLO_WORLD_QUEUE_NAME = "hello_world_queue";
@Bean
Queue queue1(a) {
return newQueue(HELLO_WORLD_QUEUE_NAME); }}Copy the code
Let’s look at the definition of a message consumer:
@Component
public class HelloWorldConsumer {
@RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)
public void receive(String msg) {
System.out.println("msg = "+ msg); }}Copy the code
Message sending:
@SpringBootTest
class RabbitmqdemoApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void contextLoads(a) {
rabbitTemplate.convertAndSend(HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, "hello"); }}Copy the code
In this case, the default direct switch (DirectExchange) is used. The routing strategy of DirectExchange is to bind message queues to a DirectExchange. When a message arrives at DirectExchange, it will be forwarded to the Queue with the same routing key as the message. For example, the message Queue name is “hello-queue”. Messages whose routingkey is “hello-queue” are received by the message queue.
3.2 the Work the queues
Here’s how it works:
One producer, one default switch (DirectExchange), one queue, and two consumers, as shown below:
A queue corresponds to multiple consumers, and by default, messages are evenly distributed by the queue, with messages being distributed to different consumers. Consumers can configure their own concurrency to increase message consumption, or they can configure manual ACK to decide whether to consume a particular message.
Let’s take a look at the configuration of the concurrency capability as follows:
@Component
public class HelloWorldConsumer {
@RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)
public void receive(String msg) {
System.out.println("receive = " + msg);
}
@RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME,concurrency = "10")
public void receive2(String msg) {
System.out.println("receive2 = " + msg+"-- -- -- -- -- -- -- >"+Thread.currentThread().getName()); }}Copy the code
As you can see, I have set concurrency to 10 for the second consumer. For the second consumer, there will be 10 child threads consuming the message.
Start the project and you can see in the RabbitMQ background that there are 11 consumers.
At this point, if the producer sends 10 messages, all of them will be consumed at once.
The message is sent as follows:
@SpringBootTest
class RabbitmqdemoApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void contextLoads(a) {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend(HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, "hello"); }}}Copy the code
The message consumption log is as follows:
As you can see, the messages are consumed by the first consumer. Note, however, that this is not always the case (try a few more times to see the difference), and messages can be consumed by the first consumer (but since the second consumer has ten threads working together, the second consumer consumes a larger percentage of messages).
Message consumers can also enable manual ACK to determine whether to consume RabbitMQ messages. Manual ACK can be configured as follows:
spring.rabbitmq.listener.simple.acknowledge-mode=manual
Copy the code
The consumption code is as follows:
@Component
public class HelloWorldConsumer {
@RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)
public void receive(Message message,Channel channel) throws IOException {
System.out.println("receive="+message.getPayload());
channel.basicAck(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)),true);
}
@RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, concurrency = "10")
public void receive2(Message message, Channel channel) throws IOException {
System.out.println("receive2 = " + message.getPayload() + "-- -- -- -- -- -- -- >" + Thread.currentThread().getName());
channel.basicReject(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)), true); }}Copy the code
The second consumer now rejects all the messages, and the first consumer consumes all the messages.
That is the case with Work Queues.
3.3 the Publish/Subscrite
Looking at the publish-subscribe model, this is the case:
More than a producer, consumer, each consumer has its own a queue, the producers did not send the message directly to the queue, but sent to the switch, switch, each queue binding producers of messages sent through the switches, arrived in a queue, the realization of the aim of the multiple consumers access to a message. Note that if you send a message to an Exchange without a queue binding, the message will be lost. This is because exchanges cannot store messages in RabbitMQ, only queues can store messages, as shown in the following figure:
In this case, we have four switches to choose from:
- Direct
- Fanout
- Topic
- Header
Let me give you a simple example.
3.3.1 Direct
The DirectExchange routing policy binds message queues to a DirectExchange. When a message arrives at DirectExchange, it will be forwarded to the Queue with the same routing key as the message. For example, if the message queue name is Hello-queue, the message whose routingkey is Hello-queue will be received by the message queue. The DirectExchange configuration is as follows:
@Configuration
public class RabbitDirectConfig {
public final static String DIRECTNAME = "javaboy-direct";
@Bean
Queue queue(a) {
return new Queue("hello-queue");
}
@Bean
DirectExchange directExchange(a) {
return new DirectExchange(DIRECTNAME, true.false);
}
@Bean
Binding binding(a) {
return BindingBuilder.bind(queue())
.to(directExchange()).with("direct"); }}Copy the code
- First provide a message Queue, then create a DirectExchange object with three parameters: name, whether it will remain valid after restart, and whether it will be deleted if it is not used for a long time.
- Create a Binding object to bind the Exchange and Queue together.
- The configuration of DirectExchange and Binding beans can be omitted, that is, if DirectExchange is used, only one instance of Queue can be configured.
Take a look at consumers:
@Component
public class DirectReceiver {
@RabbitListener(queues = "hello-queue")
public void handler1(String msg) {
System.out.println("DirectReceiver:"+ msg); }}Copy the code
The @rabbitListener annotation specifies that a method is a message consuming method whose argument is the message received. Then inject a RabbitTemplate object into the unit test class to send the message as follows:
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void directTest(a) {
rabbitTemplate.convertAndSend("hello-queue"."hello direct!"); }}Copy the code
The final result is as follows:
3.3.2 rainfall distribution on 10-12 Fanout
FanoutExchange’s data exchange policy is to forward all incoming messages to all queues bound to FanoutExchange. In this policy, routingKey will have no effect. FanoutExchange is configured as follows:
@Configuration
public class RabbitFanoutConfig {
public final static String FANOUTNAME = "sang-fanout";
@Bean
FanoutExchange fanoutExchange(a) {
return new FanoutExchange(FANOUTNAME, true.false);
}
@Bean
Queue queueOne(a) {
return new Queue("queue-one");
}
@Bean
Queue queueTwo(a) {
return new Queue("queue-two");
}
@Bean
Binding bindingOne(a) {
return BindingBuilder.bind(queueOne()).to(fanoutExchange());
}
@Bean
Binding bindingTwo(a) {
returnBindingBuilder.bind(queueTwo()).to(fanoutExchange()); }}Copy the code
Create a FanoutExchange with the same meaning as creating DirectExchange, create two queues, and bind both to FanoutExchange. Next create two consumers as follows:
@Component
public class FanoutReceiver {
@RabbitListener(queues = "queue-one")
public void handler1(String message) {
System.out.println("FanoutReceiver:handler1:" + message);
}
@RabbitListener(queues = "queue-two")
public void handler2(String message) {
System.out.println("FanoutReceiver:handler2:"+ message); }}Copy the code
Two consumers consume messages from both message queues and then send messages in the unit test as follows:
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void fanoutTest(a) {
rabbitTemplate
.convertAndSend(RabbitFanoutConfig.FANOUTNAME,
null."hello fanout!"); }}Copy the code
Note that you don’t need a Routingkey to send a message, you can specify exchange, and a ROUTingkey can pass a NULL.
The final execution log is as follows:
3.3.3 Topic
TopicExchange is a complex but flexible routing policy. In TopicExchange, a Queue is bound to TopicExchange through a Routingkey. When a message arrives at TopicExchange, TopicExchange routes messages to one or more queues based on the message’s Routingkey. TopicExchange configuration is as follows:
@Configuration
public class RabbitTopicConfig {
public final static String TOPICNAME = "sang-topic";
@Bean
TopicExchange topicExchange(a) {
return new TopicExchange(TOPICNAME, true.false);
}
@Bean
Queue xiaomi(a) {
return new Queue("xiaomi");
}
@Bean
Queue huawei(a) {
return new Queue("huawei");
}
@Bean
Queue phone(a) {
return new Queue("phone");
}
@Bean
Binding xiaomiBinding(a) {
return BindingBuilder.bind(xiaomi()).to(topicExchange())
.with("xiaomi.#");
}
@Bean
Binding huaweiBinding(a) {
return BindingBuilder.bind(huawei()).to(topicExchange())
.with("huawei.#");
}
@Bean
Binding phoneBinding(a) {
return BindingBuilder.bind(phone()).to(topicExchange())
.with("#.phone.#"); }}Copy the code
- Create TopicExchange with the same parameters as before. The first Queue is used to store the messages related to “Xiaomi”, the second Queue is used to store the messages related to “Huawei”, and the third Queue is used to store the messages related to “phone”.
- Bind the three queues to TopicExchange respectively. “xiaomi.#” in the first Binding indicates that the routingkey of the message starts with “xiaomi”. Will be routed to Queue named “xiaomi”. “Huawei.#” in the second Binding indicates that all routingkeys that start with “huawei” will be routed to Queue named “Huawei”. The third Binding “#.phone.#” indicates that all messages containing phone in the routingkey will be routed to the Queue named phone.
Create three consumers for the three queues as follows:
@Component
public class TopicReceiver {
@RabbitListener(queues = "phone")
public void handler1(String message) {
System.out.println("PhoneReceiver:" + message);
}
@RabbitListener(queues = "xiaomi")
public void handler2(String message) {
System.out.println("XiaoMiReceiver:"+message);
}
@RabbitListener(queues = "huawei")
public void handler3(String message) {
System.out.println("HuaWeiReceiver:"+message); }}Copy the code
Then send the message in the unit test as follows:
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void topicTest(a) {
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,
"xiaomi.news"."Xiaomi News...");
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,
"huawei.news"."Huawei News..");
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,
"xiaomi.phone"."Xiaomi phone...");
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,
"huawei.phone"."Huawei phones...");
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,
"phone.news"."Mobile news..."); }}Copy the code
According to RabbitTopicConfig, the first message will be routed to Queue named “Xiaomi”, the second message to Queue named “Huawei”, The third message will be routed to queues named “Xiaomi” and “phone”, the fourth message will be routed to queues named “Huawei” and “phone”, The last message will be routed to the Queue named phone.
We do the Header
HeadersExchange is a less-used routing policy that routes messages to different queues based on the Header of the message. This policy is also independent of the routingkey and is configured as follows:
@Configuration
public class RabbitHeaderConfig {
public final static String HEADERNAME = "javaboy-header";
@Bean
HeadersExchange headersExchange(a) {
return new HeadersExchange(HEADERNAME, true.false);
}
@Bean
Queue queueName(a) {
return new Queue("name-queue");
}
@Bean
Queue queueAge(a) {
return new Queue("age-queue");
}
@Bean
Binding bindingName(a) {
Map<String, Object> map = new HashMap<>();
map.put("name"."sang");
return BindingBuilder.bind(queueName())
.to(headersExchange()).whereAny(map).match();
}
@Bean
Binding bindingAge(a) {
return BindingBuilder.bind(queueAge())
.to(headersExchange()).where("age").exists(); }}Copy the code
In the first bindingName method, whereAny indicates that as long as one of the headers in the message matches the key/value in the map, You route the message to a queue named name-queue, where you can also use the whereAll method to indicate that all headers of the message match. WhereAny and whereAll actually correspond to a property called X-match. The configuration in bindingAge means that as long as the Header of the message contains age, the message will be routed to the queue named age-queue, regardless of the age value.
Next create two message consumers:
@Component
public class HeaderReceiver {
@RabbitListener(queues = "name-queue")
public void handler1(byte[] msg) {
System.out.println("HeaderReceiver:name:"
+ new String(msg, 0, msg.length));
}
@RabbitListener(queues = "age-queue")
public void handler2(byte[] msg) {
System.out.println("HeaderReceiver:age:"
+ new String(msg, 0, msg.length)); }}Copy the code
Note that the arguments are received in byte arrays. Then in the unit test we create the sending method for the message, which is also independent of the RoutingKey, as follows:
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void headerTest(a) {
Message nameMsg = MessageBuilder
.withBody("hello header! name-queue".getBytes())
.setHeader("name"."sang").build();
Message ageMsg = MessageBuilder
.withBody("hello header! age-queue".getBytes())
.setHeader("age"."99").build();
rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, ageMsg);
rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, nameMsg); }}Copy the code
Here we create two messages with different headers that will be sent to different queues.
The final execution result is as follows:
3.4 Routing
This is the case:
One producer, one switch, two queues, and two consumers. After creating an Exchange, the producer unbinds the corresponding queue according to the RoutingKey and specifies the specific RoutingKey for the message when sending it.
The diagram below:
The routing key is used to route messages. I will not give any examples here. You can refer to 3.3.1 summary.
3.5 switchable viewer
This is the case:
One producer, one switch, two queues, two consumers, the producer creates a Topic Exchange and binds it to the queue. This binding can be written using the * and # keywords to specify the RoutingKey content in the format xxx.xxx.xxx.
The diagram below:
I will not give you an example. I have already given you an example in section 3.3.3.
3.6 RPC
This form of sending and receiving RPC, songko just wrote an article two days ago and we will not talk about the portal:
- SpringBoot+RabbitMQ implement RPC call
3.7 Publisher Confirms
This kind of sending confirms that Songo has written about it before, the portal:
- Four strategies to ensure RabbitMQ message delivery reliability! Which one do you use?
- RabbitMQ high availability How to ensure successful message consumption
4. Summary
RabbitMQ can send and receive messages in seven different ways
Public number [jiangnan little rain] background reply rabbitmqdemo, get this case address ~