Previous article: Getting started with RabbitMQ (2) – creating a basic message queue. The code shown in this article has been uploaded to the javaWithoutSmoke/rabbitmq-demo repository.
Getting started with RabbitMQ – five modes and four exchanges
Six message patterns
RabbitMQ has six modes of message delivery, as described on the RabbitMQ website:
- Simple Work Queue: Also known as point-to-point mode. A message is consumed by a single consumer. (When there are multiple consumers, messages are assigned to them in round-robin order by default.)
- Work Queues: Also known as fair dispatch, a "those who can do more, get more" model. The queue must receive a manual ACK from a consumer before it sends that consumer another message.
- Publish/Subscribe: A message is consumed by multiple consumers.
- Routing: Consumers selectively receive messages by routing key.
- Topics: Consumers selectively receive messages according to pattern rules.
- RPC mode: The publisher publishes a message and waits for the result, RPC style. It has fewer use cases at present and the code is more complex, so it is not discussed in detail in this chapter.
- Note: The official website also lists Publisher Confirms. This is the confirmation mechanism that lets producers send messages reliably.
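The default rotation mentioned for the simple queue can be pictured with a small model. This is a minimal sketch in plain Java, not RabbitMQ's implementation; the class name `RoundRobinDispatch` and its `dispatch` method are made up for illustration. It shows that messages are handed out in turn, without regard to how busy each consumer is.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified model of round-robin dispatch (hypothetical helper, not a RabbitMQ API). */
final class RoundRobinDispatch {

    /** Assigns messages to consumers in turn and returns one list per consumer. */
    static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> perConsumer = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            perConsumer.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            // Message i goes to consumer (i mod consumerCount), however busy that consumer is
            perConsumer.get(i % consumerCount).add(messages.get(i));
        }
        return perConsumer;
    }

    public static void main(String[] args) {
        List<List<String>> result = dispatch(Arrays.asList("m1", "m2", "m3", "m4", "m5"), 2);
        System.out.println("consumer 1: " + result.get(0));
        System.out.println("consumer 2: " + result.get(1));
    }
}
```

With two consumers, the odd-numbered messages go to the first consumer and the even-numbered ones to the second, even if one of them is much slower.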
The four exchanges for RabbitMQ
To understand these message modes, we need the concept of an exchange.
The publish/subscribe section of the official tutorial explains it as follows:
The core idea in the RabbitMQ messaging model is that the producer never sends any messages directly to a queue. In fact, quite often the producer does not even know whether a message will be delivered to any queue at all. Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers, and on the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it be discarded? The rules for that are defined by the exchange type.
There are four types of Exchange:
- Direct: A queue is bound to the exchange with a binding key. A message is delivered to the queue only when its routing key is exactly equal to that binding key.
- Fanout: Ignores the routing key and forwards each message to all queues bound to the exchange.
- Topic: Forwards messages to queues by matching the routing key against a pattern, where # matches zero or more words and * matches exactly one word.
- Headers: Forwards a message according to its headers rather than its routing key. The headers are a Map, so values are not limited to strings and can be other data types. The rule can require all key-value pairs to match or any single key-value pair to match.
From this we can roughly map each message mode to an exchange type:
Message mode | Exchange |
---|---|
Simple Work Queue, Work Queues | Default (nameless) exchange |
Publish/Subscribe | Fanout exchange |
Routing | Direct exchange |
Topics | Topic exchange |
Simple Work Queue
See the previous article: Getting started with RabbitMQ (2) – creating a basic message queue.
Work Queue
First, we add a new queue named work-queue to RabbitMQ.
- Producer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

/** The producer */
public class Producer {
    private static final String QUEUE_NAME = "work-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // One Scanner for the whole session, so buffered console input is not lost
        Scanner scanner = new Scanner(System.in);
        while (true) {
            System.out.println("Please enter a message:");
            // 1. Create a connection
            Connection connection = RabbitMQConnection.getConnection();
            // 2. Create a channel
            Channel channel = connection.createChannel();
            // 3. Send a message, using the line typed on the console as the message body.
            // nextLine() reads up to the end of the line, so the input may contain spaces.
            String message = scanner.nextLine();
            // Arguments: exchange ("" is the default, nameless exchange), routingKey (for the
            // default exchange this is the queue name), message properties, message body
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("Message sent: " + message);
            // Remember to close the channel and the connection after sending
            channel.close();
            connection.close();
        }
    }
}
- Consumer 1. A work queue implies multiple consumers, so we make consumer 1 take 1s to process each message and consumer 2 take 3s.
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/** Consumer 1 */
public class Consumer1 {
    private static final String QUEUE_NAME = "work-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Create a connection
        Connection connection = RabbitMQConnection.getConnection();
        // 2. Create a channel
        Channel channel = connection.createChannel();
        // 3. The server delivers at most one unacknowledged message to this consumer at a time
        channel.basicQos(1);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            // Called when a message is delivered; overridden here to print the message.
            // consumerTag: the tag associated with the consumer
            // envelope: delivery metadata (delivery tag, exchange, routing key)
            // properties: additional message properties
            // body: the message body as raw bytes
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    // Simulate a request that takes a long time to process
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String messageBody = new String(body);
                System.out.println("Consumer 1 consumed message: " + messageBody);
                // Manual acknowledgement.
                // First argument: the delivery tag that identifies this delivery on the channel.
                // Second argument: multiple. When true, all deliveries with a tag less than or
                // equal to the given one are acknowledged at once.
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 4. Register the consumer with autoAck = false, i.e. manual ACK
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    }
}
- Consumer 2
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/** Consumer 2 */
public class Consumer2 {
    private static final String QUEUE_NAME = "work-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Create a connection
        Connection connection = RabbitMQConnection.getConnection();
        // 2. Create a channel
        Channel channel = connection.createChannel();
        // 3. The server delivers at most one unacknowledged message to this consumer at a time
        channel.basicQos(1);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            // Called when a message is delivered; overridden here to print the message.
            // Parameters: consumerTag (consumer tag), envelope (delivery metadata),
            // properties (additional message properties), body (message body as raw bytes)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    // Simulate a request that takes a long time to process
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String messageBody = new String(body);
                System.out.println("Consumer 2 consumed message: " + messageBody);
                // Manual acknowledgement: delivery tag of this message, multiple = false
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 4. Register the consumer with autoAck = false, i.e. manual ACK
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    }
}
Start the two consumers and then start the producer. As you can see, consumer 1 processes messages faster, so it ends up handling more of them.
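The effect of a prefetch of 1 combined with manual ACK can be checked with a small discrete-event model. This is a sketch under simplifying assumptions: all messages are already waiting in the queue, network latency is ignored, and the class `FairDispatchSimulation` is a hypothetical helper rather than anything from the RabbitMQ client. Each consumer receives its next message only when it has finished (acknowledged) the previous one.

```java
import java.util.Arrays;
import java.util.PriorityQueue;

/** Simplified model of fair dispatch with prefetch = 1 (hypothetical helper). */
final class FairDispatchSimulation {

    /** Returns how many messages each consumer handles, given its per-message time in ms. */
    static int[] simulate(int messageCount, long[] processingMillis) {
        int[] handled = new int[processingMillis.length];
        // Each entry is {time at which the consumer becomes free, consumer index};
        // ties are broken by the lower consumer index
        PriorityQueue<long[]> free = new PriorityQueue<>((a, b) ->
                a[0] != b[0] ? Long.compare(a[0], b[0]) : Long.compare(a[1], b[1]));
        for (int c = 0; c < processingMillis.length; c++) {
            free.add(new long[] {0L, c});
        }
        for (int m = 0; m < messageCount; m++) {
            // The next message goes to whichever consumer acknowledges first
            long[] next = free.poll();
            int consumer = (int) next[1];
            handled[consumer]++;
            free.add(new long[] {next[0] + processingMillis[consumer], consumer});
        }
        return handled;
    }

    public static void main(String[] args) {
        int[] handled = simulate(8, new long[] {1000L, 3000L});
        System.out.println(Arrays.toString(handled));
    }
}
```

With eight queued messages and processing times of 1s and 3s, this model assigns six messages to the fast consumer and two to the slow one, which matches the observation above that consumer 1 does more of the work.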
Publish/Subscribe
We create two queues, subscribe1 and subscribe2.
Then we bind both queues to the exchange. Without the bindings, messages cannot be delivered to the queues.
- Producer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/** The producer */
public class Producer {
    private static final String EXCHANGE_NAME = "Publish-Subscribe";

    public static void main(String[] args) throws IOException, TimeoutException {
        for (int i = 1; i < 7; i++) {
            Connection connection = RabbitMQConnection.getConnection();
            Channel channel = connection.createChannel();
            // Declare a durable fanout exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
            // A fanout exchange ignores the routing key, so it is left empty
            channel.basicPublish(EXCHANGE_NAME, "", null, String.valueOf(i).getBytes());
            System.out.println("Message sent: " + i);
            channel.close();
            connection.close();
        }
    }
}
- Consumer 1
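import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/** Consumer 1 */
public class Consumer1 {
    private static final String QUEUE_NAME = "subscribe1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQConnection.getConnection();
        Channel channel = connection.createChannel();
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                String messageBody = new String(body);
                System.out.println("Consumer 1 consumed message: " + messageBody);
            }
        };
        // autoAck = true: messages are acknowledged automatically on delivery
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}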
- Consumer 2
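import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/** Consumer 2, listening on subscribe2 */
public class Consumer2 {
    private static final String QUEUE_NAME = "subscribe2";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQConnection.getConnection();
        Channel channel = connection.createChannel();
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                String messageBody = new String(body);
                System.out.println("Consumer 2 consumed message: " + messageBody);
            }
        };
        // autoAck = true: messages are acknowledged automatically on delivery
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}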
Start the consumer, then start the producer. You can see that both consumers each consume all of the producer’s messages.
Thoughts on publish/subscribe
- The publish/subscribe mode is much like following a WeChat official account: the account publishes an article, and every follower who has subscribed receives it. But when we newly follow an account, it does not send us the articles it published earlier. Is it the same here?
Scenario 1: We start the producer, let it finish producing its messages, and only then start the consumers. The two consumers are still able to consume the messages that were already produced.
When a producer publishes a message, it sends the message to the exchange, and the exchange delivers it to the two queues. So even though the producer has stopped, the messages are already sitting in the queues.
As soon as a consumer starts listening on its queue, it can consume that data normally.
Scenario 2: We first unbind the subscribe2 queue (consumer 2) from the exchange, and then start consumer 1 and the producer.
After rebinding the subscribe2 queue to the exchange, we start consumer 2 and see that it has no data to consume. Coming back to the question above, this is the scenario that actually corresponds to it: what a follower sees as receiving a push corresponds to a consumer having consumed a message. A new subscriber amounts to binding a new queue to an existing exchange, and that queue only receives messages published after the binding exists.
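Both scenarios can be reproduced with a small in-memory model of a fanout exchange. This is only a sketch for reasoning about the behaviour, not the RabbitMQ API: the class `FanoutModel` and its methods are hypothetical, and queues are plain in-memory deques. The point it illustrates is that messages are copied into queues at publish time, based on the bindings that exist at that moment.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** In-memory model of a fanout exchange and its queues (hypothetical helper). */
final class FanoutModel {
    private final Map<String, Deque<String>> queues = new HashMap<>();
    private final Set<String> bindings = new LinkedHashSet<>();

    void declareQueue(String name) {
        queues.putIfAbsent(name, new ArrayDeque<>());
    }

    void bind(String name) {
        declareQueue(name);
        bindings.add(name);
    }

    /** Removing a binding keeps the queue and whatever it already holds. */
    void unbind(String name) {
        bindings.remove(name);
    }

    /** Copies the message into every queue that is bound right now. */
    void publish(String message) {
        for (String name : bindings) {
            queues.get(name).add(message);
        }
    }

    /** Drains the queue, as a consumer that starts listening would. */
    List<String> consumeAll(String name) {
        declareQueue(name);
        Deque<String> queue = queues.get(name);
        List<String> consumed = new ArrayList<>(queue);
        queue.clear();
        return consumed;
    }

    public static void main(String[] args) {
        FanoutModel exchange = new FanoutModel();
        exchange.bind("subscribe1");
        exchange.bind("subscribe2");
        exchange.publish("1");
        // Scenario 1: the consumer starts after publishing and still gets the message
        System.out.println(exchange.consumeAll("subscribe2"));
        // Scenario 2: publish while subscribe2 is unbound, then rebind
        exchange.unbind("subscribe2");
        exchange.publish("2");
        exchange.bind("subscribe2");
        System.out.println(exchange.consumeAll("subscribe2"));
    }
}
```

In the second scenario the model's subscribe2 queue stays empty after rebinding, because the message was published while no binding to that queue existed.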
Routing (Routing mode)
We create three queues: RoutingConsumer1, RoutingConsumer2, and RoutingConsumer3.
- Producer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/** The producer */
public class Producer {
    private static final String EXCHANGE_NAME = "Routing";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQConnection.getConnection();
        Channel channel = connection.createChannel();
        // Declare a durable direct exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
        // Publish one message per routing key; the body repeats the key for easy checking
        channel.basicPublish(EXCHANGE_NAME, "key1", null, "key1".getBytes());
        channel.basicPublish(EXCHANGE_NAME, "key2", null, "key2".getBytes());
        System.out.println("Message sent");
        channel.close();
        connection.close();
    }
}
- Consumer 1 (the code for the other two consumers is almost identical; only the number changes)
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RoutingConsumer1 {
    private static final String QUEUE_NAME = "RoutingConsumer1";
    private static final String EXCHANGE_NAME = "Routing";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQConnection.getConnection();
        Channel channel = connection.createChannel();
        // Bind the queue to the direct exchange with binding key "key1"
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key1");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                String messageBody = new String(body);
                System.out.println("Consumer 1 consumed message: " + messageBody);
            }
        };
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}
Start the three consumers first, and then start the producer. Consumers 1 and 2 both consume key1, and consumer 3 consumes key2.
Conclusion: In Routing mode, a direct exchange delivers a message only to the queues whose binding key matches the message's routing key.
Thoughts on the direct exchange
- If the routing key only partially matches, will the message be delivered? To find out, we simply have the producer publish a message with the routing key "key":
channel.basicPublish(EXCHANGE_NAME, "key", null, "key".getBytes());
It turns out that no consumer receives it. In other words, a direct exchange does no partial matching.
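The exact-match rule can be captured in a few lines. The sketch below is a plain-Java model, not the RabbitMQ client API; `DirectExchangeModel` is a hypothetical class, and the bindings used for RoutingConsumer2 and RoutingConsumer3 are assumptions inferred from the result reported above, since only the code for consumer 1 is shown.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of direct-exchange routing (hypothetical helper). */
final class DirectExchangeModel {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    /** Returns the queues that receive a message; the key must be equal, not merely similar. */
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptyList());
    }

    public static void main(String[] args) {
        DirectExchangeModel exchange = new DirectExchangeModel();
        exchange.bind("RoutingConsumer1", "key1");
        exchange.bind("RoutingConsumer2", "key1"); // assumed binding
        exchange.bind("RoutingConsumer3", "key2"); // assumed binding
        System.out.println("key1 -> " + exchange.route("key1"));
        System.out.println("key2 -> " + exchange.route("key2"));
        System.out.println("key  -> " + exchange.route("key"));
    }
}
```

Because the lookup is a map access on the whole key, "key" finds no queue even though it is a prefix of "key1" and "key2".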
Topics (Topic mode)
The topic mode comes into play when we need to match the routing key partially. Topics mode uses three symbols:
Symbol | Role |
---|---|
. | Separates words |
* | Matches exactly one word |
# | Matches zero or more words |
The producer publishes a message whose routing key is java.without.smoke. The result is as follows:
Consumer | Binding key | Message consumed? |
---|---|---|
TopicConsumer1 | java.without.smoke | Y |
TopicConsumer2 | java.* | N |
TopicConsumer3 | java.without.* | Y |
TopicConsumer4 | java.# | Y |
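The results in the table can be reproduced with a small matcher. This is a minimal sketch of the matching rules only (words separated by dots, * for exactly one word, # for zero or more words); it is not how RabbitMQ implements topic routing internally, and `TopicMatcher` is a hypothetical class name.

```java
/** Word-by-word matcher for topic binding keys (hypothetical helper). */
final class TopicMatcher {

    static boolean matches(String bindingKey, String routingKey) {
        return matches(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            // The pattern is used up, so the routing key must be used up as well
            return w == words.length;
        }
        if (pattern[p].equals("#")) {
            // '#' either absorbs no word (move past it) or absorbs one more word (stay on it)
            return matches(pattern, p + 1, words, w)
                    || (w < words.length && matches(pattern, p, words, w + 1));
        }
        if (w == words.length) {
            return false;
        }
        // '*' accepts any single word; a literal must be equal
        return (pattern[p].equals("*") || pattern[p].equals(words[w]))
                && matches(pattern, p + 1, words, w + 1);
    }

    public static void main(String[] args) {
        String routingKey = "java.without.smoke";
        for (String bindingKey : new String[] {
                "java.without.smoke", "java.*", "java.without.*", "java.#"}) {
            System.out.println(bindingKey + " -> " + matches(bindingKey, routingKey));
        }
    }
}
```

The binding key java.* fails because * stands for exactly one word while two words remain after "java"; java.# succeeds because # can absorb both.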
The Header mode
The key points are setting the headers in the message properties and choosing the matching rule.
- Producer
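// Headers attached to the message
public static Map<String, Object> map = new HashMap<>();
map.put("ID", 1);
map.put("Name", "aaaa");
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(map).build();
// A headers exchange routes on the headers; the routing key is not used for matching
channel.basicPublish(EXCHANGE_NAME, "java.without.smoke", props, String.valueOf("key1").getBytes());
- Consumer
// Arguments: queue name, durable, exclusive, autoDelete, extra arguments
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(map).build();
// The last argument carries the headers to match. It may also contain an "x-match" entry
// ("all" or "any") that selects the matching rule.
channel.queueBind(QUEUE_NAME, Producer.EXCHANGE_NAME, "java.*", props.getHeaders());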
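The two matching rules can be sketched as follows. This is a simplified plain-Java model, not the broker's implementation: `HeadersMatcher` is a hypothetical class, it treats a missing "x-match" entry as "all", and it excludes only the "x-match" entry itself from the comparison.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Simplified model of headers-exchange matching (hypothetical helper). */
final class HeadersMatcher {

    static boolean matches(Map<String, Object> bindingArgs, Map<String, Object> messageHeaders) {
        boolean any = "any".equals(bindingArgs.get("x-match"));
        for (Map.Entry<String, Object> entry : bindingArgs.entrySet()) {
            if (entry.getKey().equals("x-match")) {
                continue; // selects the rule; it is not a header to compare
            }
            boolean equal = messageHeaders.containsKey(entry.getKey())
                    && Objects.equals(entry.getValue(), messageHeaders.get(entry.getKey()));
            if (any && equal) {
                return true;  // "any": one matching pair is enough
            }
            if (!any && !equal) {
                return false; // "all": one mismatch rules the binding out
            }
        }
        // "all" reaches this point only if every pair matched; "any" only if none did
        return !any;
    }

    public static void main(String[] args) {
        Map<String, Object> binding = new HashMap<>();
        binding.put("x-match", "all");
        binding.put("ID", 1);
        binding.put("Name", "aaaa");

        Map<String, Object> headers = new HashMap<>();
        headers.put("ID", 1);
        System.out.println("all: " + matches(binding, headers));

        binding.put("x-match", "any");
        System.out.println("any: " + matches(binding, headers));
    }
}
```

A message that carries only the ID header fails the "all" rule, because Name is missing, but passes the "any" rule.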
Conclusion
- RabbitMQ gives us a wide range of message modes, and each has its own characteristics. The topic mode can also achieve the effect of routing and of publish/subscribe, and when a queue has only one consumer it can achieve the effect of point-to-point mode as well.
Scenario | Plan |
---|---|
Messages always go to one fixed queue, and multiple consumers are not needed to speed up consumption | Use point-to-point mode |
Messages always go to one fixed queue, and multiple consumers are needed to speed up consumption | Use work queue mode |
Messages are delivered to multiple queues according to certain rules | Use topic mode |
- RabbitMQ messages always travel producer -> exchange -> queue -> consumer. The point-to-point mode and the work queue mode can be understood as delivery to the queue through the default (nameless) exchange.
- An exchange is a lot like the Nginx server we use as a reverse proxy: Nginx forwards requests, and an exchange forwards messages.