Please credit the source when reposting this article. Feel free to add Echo on WeChat (WeChat ID: T2421499075) to exchange ideas and learn together.


In the last two articles, we used the Direct Exchange and Fanout Exchange types to push messages, but neither fits every scenario. Suppose a hundred different queues subscribe to one producer, and at a given moment the producer wants only some of them to receive a message. A Fanout Exchange broadcasts to every bound queue, which wastes resources. With a Direct Exchange, we would have to configure an exact routing key for each of the dozens of target queues, which is repetitive and error-prone. If we could instead fuzzy-match on the routing key, the problem would be solved neatly. This article explains that pattern.

Using a Topic Exchange to push messages based on rules

  • Preparation

Since messages are pushed to queues according to a rule, we first create the queues, create an exchange of type Topic, and then define the routing keys that express the rule. Here we create three queues.

Each queue is then bound to the Topic Exchange with its own routing key.

Routing key explained

Naming the routing key is the most critical part of working with a Topic Exchange. When publishing to an exchange of type Topic, the producer does not name a queue at all, so the routing key alone decides where the message goes. For messages to reach the right queues, routing keys must follow a well-defined naming scheme. In our case, the three queues are bound with three routing keys:

  • com.echo.*: queue_topic1 receives any message whose routing key is com.echo followed by exactly one more word
  • com.echo.level2: queue_topic2 receives the message only when the routing key is exactly com.echo.level2
  • #: the third queue receives the message whatever the routing key is (the original text names queue_topic2 here again, which appears to be a slip, since three queues were created)

Words in a routing key are separated by dots. * matches exactly one word, and # matches zero or more words.
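The wildcard rules can be tried out without a broker. The following is a minimal sketch of the matching logic in plain Java, not RabbitMQ's actual implementation. The class name TopicMatchSketch, its methods, and the queue name queue_topic3 for the third queue are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of topic-exchange matching (illustrative only). */
final class TopicMatchSketch {

    /** Returns true if routingKey satisfies bindingKey under the * and # rules. */
    static boolean matches(String bindingKey, String routingKey) {
        return match(words(bindingKey), 0, words(routingKey), 0);
    }

    /** Returns the queues (map keys) whose binding key (map value) matches. */
    static List<String> route(Map<String, String> bindings, String routingKey) {
        List<String> queues = new ArrayList<>();
        for (Map.Entry<String, String> e : bindings.entrySet()) {
            if (matches(e.getValue(), routingKey)) {
                queues.add(e.getKey());
            }
        }
        return queues;
    }

    /** Splits a key into dot-separated words; an empty key has no words. */
    private static String[] words(String key) {
        return key.isEmpty() ? new String[0] : key.split("\\.");
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // '*' or a literal word needs one word, none left
        }
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        Map<String, String> bindings = new LinkedHashMap<>();
        bindings.put("queue_topic1", "com.echo.*");
        bindings.put("queue_topic2", "com.echo.level2");
        bindings.put("queue_topic3", "#"); // assumed name for the third queue

        System.out.println(route(bindings, "com.echo.level2")); // [queue_topic1, queue_topic2, queue_topic3]
        System.out.println(route(bindings, "com.echo.level1")); // [queue_topic1, queue_topic3]
        System.out.println(route(bindings, "com.echo"));        // [queue_topic3]
    }
}
```

Note that com.echo.* does not match com.echo on its own, because * needs exactly one word, while # still matches it.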

Code example

package com.example.demo;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author echo
 * @date 2021-01-14
 */
public class TopicProductTest {

    private static final String EXCHANGE_NAME = "exchange_topic";
    private static final String ROUTING_KEY = "com.echo.level2";
    private static final String IP_ADDRESS = "192.168.230.131";
    private static final int PORT = 5672;


    public static void main(String[] args) throws IOException, TimeoutException {

        // Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // Set the connection parameters for RabbitMQ
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT);
        factory.setUsername("echo");
        factory.setPassword("123456");
        // Create a connection to RabbitMQ
        Connection connection = factory.newConnection();
        // Create a channel
        Channel channel = connection.createChannel();
        // Declare a durable, non-auto-delete exchange of type "topic"
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true, false, null);
        // Send a persistent message: topic hello world !
        String message = "topic hello world !";
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
        // Close the resources
        channel.close();
        connection.close();
    }
}

Given the routing keys described above, the message published by this example (routing key com.echo.level2) should be delivered to all three queues. Let's run it and check the result.

The result matches the matching rules described above.

  • We can also use a consumer to verify this

Consumer code

package com.example.demo;

import com.rabbitmq.client.*;
import lombok.SneakyThrows;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * @author tang.sl
 * @date 2021-01-14
 */
public class TopicConsumerTest {

    private static final String EXCHANGE_NAME = "exchange_topic";
    private static final String QUEUE_NAME = "queue_topic1";
    private static final String IP_ADDRESS = "192.168.230.131";
    private static final int PORT = 5672;

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        // Set the connection parameters for RabbitMQ
        factory.setUsername("echo");
        factory.setPassword("123456");
        factory.setPort(PORT);
        factory.setHost(IP_ADDRESS);

        // Create a connection to RabbitMQ
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        // Declare the exchange with type "topic" (durable, non-auto-delete)
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true, false, null);
        // Bind the queue to consume from, using the same key this queue was bound with earlier.
        // The original code passed an empty key, which on a topic exchange matches only an empty routing key.
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "com.echo.*", null);
        Consumer consumer = new DefaultConsumer(channel) {
            @SneakyThrows
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                System.out.println("recv message: " + new String(body));
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // Acknowledge this single message manually
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE_NAME, consumer);
        // Wait for the callback to finish, then close the resources
        TimeUnit.SECONDS.sleep(5);
        channel.close();
        connection.close();
    }
}

Running the consumer gives a consistent result.

Conclusion

A Topic Exchange can deliver a message to one specific queue or to a set of queues by matching the * and # wildcards in the binding key against the message's routing key.