In the last two articles, we pushed messages with the Direct Exchange and Fanout Exchange types, but neither fits every case. Suppose a hundred different queues subscribe to one of our producers in MQ, and at some point the producer wants only some of them to receive a message. A Fanout Exchange broadcasts to every bound queue, which wastes resources. With a Direct Exchange, we would have to configure each of the dozens of target queues one by one, which is repetitive and error-prone. If we could instead fuzzy-match on the routing keys the queues are bound with, the problem would be solved neatly. This article explains that pattern.
Use Topic Exchange to push a message based on a rule
- Preparation
Since we are pushing to queues according to a rule, we first create the queues, create an exchange of type Topic, and then bind each queue to it with a routing key that expresses the rule. We have three queues here.
Each one is bound to the Topic Exchange with its own routing key.
Routing key parsing
Naming the routing key is the most critical part of using a Topic Exchange. When publishing to an exchange of type Topic, the producer does not specify a queue at all, so the routing key alone decides where the message goes. For messages to reach the right queues, the routing key names must be chosen carefully. In our case, the three queues are bound with three routing keys:
- com.echo.*: queue_topic1 receives a message when its routing key is com.echo. followed by exactly one more word
- com.echo.level2: queue_topic2 receives a message only when the routing key is exactly com.echo.level2
- #: the third queue receives the message whatever the routing key is
Here * matches exactly one word and # matches zero or more words. Words are the parts of the routing key separated by dots.
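To make the wildcard rules concrete, here is a minimal sketch of the matching logic in plain Java. It only illustrates the rules stated above and is not RabbitMQ's actual implementation. The class name TopicMatchSketch and the matches method are hypothetical, and edge cases such as empty words are ignored.

```java
class TopicMatchSketch {

    /** Returns true if the dot-separated routing key satisfies the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return matches(words(pattern), 0, words(routingKey), 0);
    }

    /** Splits on dots; an empty string is treated as zero words. */
    private static String[] words(String s) {
        return s.isEmpty() ? new String[0] : s.split("\\.");
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible length
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern expects another word, but the key has none left
        }
        // '*' accepts any single word; anything else must be equal to the key's word
        boolean wordMatches = p[i].equals("*") || p[i].equals(k[j]);
        return wordMatches && matches(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        String key = "com.echo.level2";
        System.out.println(matches("com.echo.*", key));            // true
        System.out.println(matches("com.echo.level2", key));       // true
        System.out.println(matches("#", key));                     // true
        System.out.println(matches("com.echo.*", "com.echo.a.b")); // false: '*' is one word only
        System.out.println(matches("com.echo.#", "com.echo"));     // true: '#' may be zero words
    }
}
```

The last two lines show the difference between the wildcards: com.echo.* rejects a key with two trailing words, while com.echo.# accepts a key with none.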
Code example
    package com.example.demo;

    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
     * @author echo
     * @date 2021-01-14
     */
    public class TopicProductTest {
        private static final String EXCHANGE_NAME = "exchange_topic";
        private static final String ROUTING_KEY = "com.echo.level2";
        private static final String IP_ADDRESS = "192.168.230.131";
        private static final int PORT = 5672;

        public static void main(String[] args) throws IOException, TimeoutException {
            // Create a connection factory
            ConnectionFactory factory = new ConnectionFactory();
            // Set the connection parameters for RabbitMQ
            factory.setHost(IP_ADDRESS);
            factory.setPort(PORT);
            factory.setUsername("echo");
            factory.setPassword("123456");
            // Create a connection to RabbitMQ
            Connection connection = factory.newConnection();
            // Create a channel
            Channel channel = connection.createChannel();
            // Declare a durable, non-auto-deleted exchange of type "topic"
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true, false, null);
            // Send a persistent message: topic hello world !
            String message = "topic hello world !";
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            // Close the resources
            channel.close();
            connection.close();
        }
    }
According to the routing keys above, a message published with the routing key com.echo.level2 matches all three bindings, so it should be delivered to all three queues. Let's run it and check the result.
The result matches what we expected.
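The same outcome can be reproduced without a broker. The sketch below keeps the three bindings in a small table and reports which queues a given routing key would reach, this time by translating each binding key into a regular expression. It is an approximation under stated assumptions: the class TopicRoutingSketch and its methods are hypothetical, the name queue_topic3 for the queue bound with # is assumed, and empty words in keys are not handled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

class TopicRoutingSketch {

    // Binding table: queue name -> routing key used for the binding
    static final Map<String, String> BINDINGS = new LinkedHashMap<>();
    static {
        BINDINGS.put("queue_topic1", "com.echo.*");
        BINDINGS.put("queue_topic2", "com.echo.level2");
        BINDINGS.put("queue_topic3", "#"); // queue name is an assumption
    }

    /** Translates a binding key into a regex over strings of the form ".word.word". */
    static Pattern toRegex(String bindingKey) {
        StringBuilder regex = new StringBuilder();
        for (String word : bindingKey.split("\\.")) {
            if (word.equals("#")) {
                regex.append("(\\.[^.]+)*"); // zero or more words
            } else if (word.equals("*")) {
                regex.append("\\.[^.]+");    // exactly one word
            } else {
                regex.append("\\.").append(Pattern.quote(word)); // literal word
            }
        }
        return Pattern.compile(regex.toString());
    }

    /** Returns the queues whose binding key matches the routing key, in table order. */
    static List<String> route(String routingKey) {
        // Prefix a dot so that every word, including the first, is preceded by one
        String dotted = routingKey.isEmpty() ? "" : "." + routingKey;
        List<String> queues = new ArrayList<>();
        for (Map.Entry<String, String> binding : BINDINGS.entrySet()) {
            if (toRegex(binding.getValue()).matcher(dotted).matches()) {
                queues.add(binding.getKey());
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        System.out.println(route("com.echo.level2"));        // [queue_topic1, queue_topic2, queue_topic3]
        System.out.println(route("com.echo.level1"));        // [queue_topic1, queue_topic3]
        System.out.println(route("com.echo.level2.detail")); // [queue_topic3]
    }
}
```

Only the first key reaches all three queues. Changing the last word drops queue_topic2, and adding a fourth word drops queue_topic1 as well, because * stands for exactly one word.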
- We can also use a consumer to verify this
Consumer code
    package com.example.demo;

    import com.rabbitmq.client.*;
    import lombok.SneakyThrows;

    import java.io.IOException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    /**
     * @author tang.sl
     * @date 2021-01-14
     */
    public class TopicConsumerTest {
        private static final String EXCHANGE_NAME = "exchange_topic";
        private static final String QUEUE_NAME = "queue_topic1";
        // Routing key that queue_topic1 is bound with in the preparation step
        private static final String BINDING_KEY = "com.echo.*";
        private static final String IP_ADDRESS = "192.168.230.131";
        private static final int PORT = 5672;

        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            ConnectionFactory factory = new ConnectionFactory();
            // Set the connection parameters for RabbitMQ
            factory.setUsername("echo");
            factory.setPassword("123456");
            factory.setPort(PORT);
            factory.setHost(IP_ADDRESS);
            // Create a connection to RabbitMQ
            Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
            // Declare the exchange in topic mode
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true, false, null);
            // Bind the queue we are going to consume from
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, BINDING_KEY, null);
            Consumer consumer = new DefaultConsumer(channel) {
                @SneakyThrows
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                    System.out.println("recv message: " + new String(body));
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    // Acknowledge this single message manually
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            channel.basicConsume(QUEUE_NAME, consumer);
            // Wait for the callback to finish, then close the resources
            TimeUnit.SECONDS.sleep(5);
            channel.close();
            connection.close();
        }
    }
After running the consumer, the result is consistent with the above.
Conclusion
By matching the * and # wildcards in the binding routing keys against the routing key of each message, a Topic Exchange can deliver a message to one specific queue or to a whole set of queues.