I. Mode description

The Topic exchange type, like Direct, routes messages to different queues based on a routing key. The difference is that a Topic exchange lets queues use wildcards in the routing keys they bind with.

A routing key is typically made up of one or more words, separated by "." (a dot), for example item.insert

Wildcard rules:

  • #: matches zero or more words
  • *: matches exactly one word, no more and no less

For example:

item.# : matches item.insert.abc and item.insert

item.* : matches item.insert, but not item.insert.abc
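The wildcard rules can be checked with a small word-by-word matcher. This is only a sketch of the rules as described here, not the broker's actual implementation; the class name TopicMatcher and its matches method are made up for illustration.

```java
class TopicMatcher {
    /** Returns true if the routing key satisfies the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        // Split both sides into words on "." (keep empty trailing words).
        return matches(pattern.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: it is a match only if the key is exhausted too.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // "#" may absorb zero or more words: try every possible split point.
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Pattern still expects a word but the key has none left.
            return false;
        }
        // "*" accepts any single word; a literal must be equal to the word.
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("item.#", "item.insert.abc")); // true
        System.out.println(matches("item.#", "item.insert"));     // true
        System.out.println(matches("item.*", "item.insert"));     // true
        System.out.println(matches("item.*", "item.insert.abc")); // false
    }
}
```

The recursion keeps the sketch short; for long keys with several "#" wildcards a real broker would use a more efficient structure.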

Illustration:

  • Red Queue: bound with usa.#, so any routing key beginning with usa. will be matched
  • Yellow Queue: bound with #.news, so any routing key ending in .news will be matched
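As a rough worked example of the illustration, the sketch below routes a few sample keys to the two queues. It assumes the bindings are written usa.# and #.news in lowercase; the sample keys, the class name RedYellowDemo, and the queue labels "red" and "yellow" are invented for this example. The helper only understands the two binding shapes used here and is not a general matcher.

```java
import java.util.ArrayList;
import java.util.List;

class RedYellowDemo {
    // Simplified check for the two shapes in the illustration only:
    // "prefix.#" (key starts with prefix) and "#.suffix" (key ends with suffix).
    static boolean matches(String binding, String routingKey) {
        if (binding.endsWith(".#")) {
            String prefix = binding.substring(0, binding.length() - 2);
            return routingKey.equals(prefix) || routingKey.startsWith(prefix + ".");
        }
        if (binding.startsWith("#.")) {
            String suffix = binding.substring(2);
            return routingKey.equals(suffix) || routingKey.endsWith("." + suffix);
        }
        // No wildcard: the key must be identical to the binding.
        return binding.equals(routingKey);
    }

    // Returns the labels of the queues that would receive the routing key.
    static List<String> route(String routingKey) {
        List<String> queues = new ArrayList<>();
        if (matches("usa.#", routingKey)) {
            queues.add("red");
        }
        if (matches("#.news", routingKey)) {
            queues.add("yellow");
        }
        return queues;
    }

    public static void main(String[] args) {
        String[] keys = {"usa.news", "usa.weather", "europe.news", "europe.weather"};
        for (String key : keys) {
            System.out.println(key + " -> " + route(key));
        }
        // usa.news -> [red, yellow]
        // usa.weather -> [red]
        // europe.news -> [yellow]
        // europe.weather -> []
    }
}
```

Note that usa.news lands in both queues: one message is delivered to every queue that has a matching binding, and a key that matches nothing is not delivered to either.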

II. Code

① Producer

Sends messages with three routing keys: item.insert, item.update, and item.delete.

package com.itheima.rabbitmq.topic; 

import com.itheima.rabbitmq.util.ConnectionUtil; 
import com.rabbitmq.client.BuiltinExchangeType; 
import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.Connection; 

/**
 * Wildcard (Topic) mode: the exchange type is topic
 */
public class Producer {
    // Exchange name
    static final String TOPIC_EXCHAGE = "topic_exchange";

    // Queue name
    static final String TOPIC_QUEUE_1 = "topic_queue_1";

    // Queue name
    static final String TOPIC_QUEUE_2 = "topic_queue_2";

    public static void main(String[] args) throws Exception {
        // Create a connection
        Connection connection = ConnectionUtil.getConnection();
        // Create a channel
        Channel channel = connection.createChannel();
        /**
         * Declare the exchange
         * Parameter 1: exchange name
         * Parameter 2: exchange type: fanout, topic, direct, headers
         */
        channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);

        // Send a message
        String message = "Added an item. Topic mode; routing key is item.insert";
        channel.basicPublish(TOPIC_EXCHAGE, "item.insert", null, message.getBytes());
        System.out.println("Message sent:" + message);

        // Send a message
        message = "Updated an item. Topic mode; routing key is item.update";
        channel.basicPublish(TOPIC_EXCHAGE, "item.update", null, message.getBytes());
        System.out.println("Message sent:" + message);

        // Send a message
        message = "Deleted an item. Topic mode; routing key is item.delete";
        channel.basicPublish(TOPIC_EXCHAGE, "item.delete", null, message.getBytes());
        System.out.println("Message sent:" + message);

        // Close the resources
        channel.close();
        connection.close();
    }
}

② Consumer 1

Receives two types of messages: updated items and deleted items.

package com.itheima.rabbitmq.topic; 

import com.itheima.rabbitmq.util.ConnectionUtil; 
import com.rabbitmq.client.*; 
import java.io.IOException; 

public class Consumer1 { 
    public static void main(String[] args) throws Exception { 
        Connection connection = ConnectionUtil.getConnection(); 
        
        // Create channel
        Channel channel = connection.createChannel();
         
        // Declare the exchange
        channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);

        // Declare (create) the queue
        /**
         * Parameter 1: queue name
         * Parameter 2: whether the queue is durable
         * Parameter 3: whether the queue is exclusive to this connection
         * Parameter 4: whether to delete the queue automatically when it is no longer in use
         * Parameter 5: other queue arguments
         */
        channel.queueDeclare(Producer.TOPIC_QUEUE_1, true, false, false, null);

        // Bind the queue to the exchange
        channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE, "item.update");
        channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE, "item.delete");

        // Create a consumer and set up message handling
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag: the consumer tag, which can be specified in channel.basicConsume
             * envelope: the message envelope, from which the message id, routing key, exchange,
             *           and redelivery flag (whether the message is being delivered again
             *           after an earlier failed delivery) can be obtained
             * properties: message properties
             * body: message body
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Routing key
                System.out.println("The routing key is:" + envelope.getRoutingKey());

                // Exchange
                System.out.println("The exchange is:" + envelope.getExchange());

                // Message id
                System.out.println("Message ID is:" + envelope.getDeliveryTag());

                // Received message
                System.out.println("Consumer 1- received the message:" + new String(body, "utf-8"));
            }
        };

        // Listen for messages
        /**
         * Parameter 1: queue name
         * Parameter 2: automatic acknowledgement. If true, the message is acknowledged to MQ
         *              automatically as soon as it is received and MQ deletes it;
         *              if false, manual acknowledgement is required
         * Parameter 3: the callback invoked when a message is received
         */
        channel.basicConsume(Producer.TOPIC_QUEUE_1, true, consumer);
    }
}

③ Consumer 2

Receives all three types of messages: added items, updated items, and deleted items.

package com.itheima.rabbitmq.topic; 

import com.itheima.rabbitmq.util.ConnectionUtil; 
import com.rabbitmq.client.*; 
import java.io.IOException; 

public class Consumer2 { 
    public static void main(String[] args) throws Exception { 
        Connection connection = ConnectionUtil.getConnection(); 
        
        // Create channel
        Channel channel = connection.createChannel(); 
        
        // Declare the exchange
        channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);

        // Declare (create) the queue
        /**
         * Parameter 1: queue name
         * Parameter 2: whether the queue is durable
         * Parameter 3: whether the queue is exclusive to this connection
         * Parameter 4: whether to delete the queue automatically when it is no longer in use
         * Parameter 5: other queue arguments
         */
        channel.queueDeclare(Producer.TOPIC_QUEUE_2, true, false, false, null);

        // Bind the queue to the exchange
        channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHAGE, "item.*");

        // Create a consumer and set up message handling
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag: the consumer tag, which can be specified in channel.basicConsume
             * envelope: the message envelope, from which the message id, routing key, exchange,
             *           and redelivery flag (whether the message is being delivered again
             *           after an earlier failed delivery) can be obtained
             * properties: message properties
             * body: message body
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Routing key
                System.out.println("The routing key is:" + envelope.getRoutingKey());

                // Exchange
                System.out.println("The exchange is:" + envelope.getExchange());

                // Message id
                System.out.println("Message ID is:" + envelope.getDeliveryTag());

                // Received message
                System.out.println("The message received by Consumer 2 is:" + new String(body, "utf-8"));
            }
        };

        // Listen for messages
        /**
         * Parameter 1: queue name
         * Parameter 2: automatic acknowledgement. If true, the message is acknowledged to MQ
         *              automatically as soon as it is received and MQ deletes it;
         *              if false, manual acknowledgement is required
         * Parameter 3: the callback invoked when a message is received
         */
        channel.basicConsume(Producer.TOPIC_QUEUE_2, true, consumer);
    }
}

III. Test

Start all the consumers, then run the producer to send the messages. Each consumer's console shows the messages whose routing keys match the bindings of its queue, which is the expected result; the binding keys can use wildcards.
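The expected distribution can be worked out without a broker. The sketch below replays the three routing keys against the bindings declared by the two consumers. It is a simulation only: the class name TopicDemoExpectation and its helpers are invented for this example, and the matcher handles literal words and "*" but deliberately not "#", because none of the bindings in this demo use it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TopicDemoExpectation {
    // Bindings as declared by Consumer1 and Consumer2 in this demo.
    static final Map<String, List<String>> BINDINGS = new LinkedHashMap<>();
    static {
        BINDINGS.put("topic_queue_1", List.of("item.update", "item.delete"));
        BINDINGS.put("topic_queue_2", List.of("item.*"));
    }

    // Word-by-word comparison: "*" accepts any single word, anything else
    // must be equal. "#" is not supported in this simplified helper.
    static boolean matches(String binding, String routingKey) {
        String[] b = binding.split("\\.", -1);
        String[] k = routingKey.split("\\.", -1);
        if (b.length != k.length) {
            return false;
        }
        for (int i = 0; i < b.length; i++) {
            if (!b[i].equals("*") && !b[i].equals(k[i])) {
                return false;
            }
        }
        return true;
    }

    // Returns the queues that have at least one binding matching the key.
    static List<String> route(String routingKey) {
        List<String> queues = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : BINDINGS.entrySet()) {
            if (entry.getValue().stream().anyMatch(b -> matches(b, routingKey))) {
                queues.add(entry.getKey());
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        for (String key : List.of("item.insert", "item.update", "item.delete")) {
            System.out.println(key + " -> " + route(key));
        }
        // item.insert -> [topic_queue_2]
        // item.update -> [topic_queue_1, topic_queue_2]
        // item.delete -> [topic_queue_1, topic_queue_2]
    }
}
```

So Consumer 1 should print the update and delete messages, and Consumer 2 should print all three.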

After running the test code, open the Exchanges tab of the RabbitMQ management console and click the topic_exchange exchange to view its bindings.

Conclusion

Topic mode can do what the Publish/Subscribe and Routing modes do. In addition, a Topic exchange allows wildcards in the routing keys used for binding, which makes it more flexible.