I. Mode description
Like the Direct type, the Topic exchange type routes messages to different queues based on the routing key. The difference is that a Topic exchange lets a queue use wildcards in the routing key it binds with.
A routing key typically consists of one or more words separated by ".", for example item.insert
Wildcard rules:
- #: matches zero or more words
- *: matches exactly one word, no more and no less
For example:
item.# : matches item.insert.abc or item.insert
item.* : matches item.insert but not item.insert.abc
Illustration:
- Red Queue: bound to usa.#, so any routing key beginning with "usa." will be matched
- Yellow Queue: bound to #.news, so any routing key ending in ".news" will be matched
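The wildcard rules above can be sketched in plain Java. This is only an illustration of the matching semantics, not RabbitMQ's actual implementation; the class `TopicMatcher` and its methods are hypothetical names introduced here, and the routing keys `usa.news` and `usa.weather` are made-up sample values.

```java
// Hypothetical helper for illustration only; not part of the RabbitMQ client.
class TopicMatcher {

    /** Splits a key into dot-separated words; an empty key has no words. */
    private static String[] words(String key) {
        return key.isEmpty() ? new String[0] : key.split("\\.");
    }

    /** Returns true if routingKey matches bindingKey under the topic wildcard rules. */
    static boolean matches(String bindingKey, String routingKey) {
        return matchFrom(words(bindingKey), 0, words(routingKey), 0);
    }

    private static boolean matchFrom(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            // Pattern exhausted: match only if the routing key is exhausted too
            return k == key.length;
        }
        if (pattern[p].equals("#")) {
            // '#' absorbs zero words, or one word and then tries again
            return matchFrom(pattern, p + 1, key, k)
                    || (k < key.length && matchFrom(pattern, p, key, k + 1));
        }
        if (k == key.length) {
            return false; // pattern still expects a word, but none is left
        }
        // '*' accepts any single word; otherwise the words must be equal
        boolean wordMatches = pattern[p].equals("*") || pattern[p].equals(key[k]);
        return wordMatches && matchFrom(pattern, p + 1, key, k + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("item.#", "item.insert.abc")); // true
        System.out.println(matches("item.#", "item.insert"));     // true
        System.out.println(matches("item.*", "item.insert"));     // true
        System.out.println(matches("item.*", "item.insert.abc")); // false
        System.out.println(matches("usa.#", "usa.news"));         // true
        System.out.println(matches("#.news", "usa.news"));        // true
        System.out.println(matches("#.news", "usa.weather"));     // false
    }
}
```

Because # can stand for zero words, a binding such as item.# would also accept the bare routing key item in this sketch.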
II. Code
① Producer
The producer sends messages with three routing keys: item.insert, item.update, and item.delete.
package com.itheima.rabbitmq.topic;

import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * Wildcard (Topic) mode: the exchange type is topic
 */
public class Producer {
    // Exchange name
    static final String TOPIC_EXCHAGE = "topic_exchange";
    // Queue name
    static final String TOPIC_QUEUE_1 = "topic_queue_1";
    // Queue name
    static final String TOPIC_QUEUE_2 = "topic_queue_2";

    public static void main(String[] args) throws Exception {
        // Create a connection
        Connection connection = ConnectionUtil.getConnection();
        // Create a channel
        Channel channel = connection.createChannel();

        /**
         * Declare the exchange
         * Parameter 1: exchange name
         * Parameter 2: exchange type (fanout, direct, topic, headers)
         */
        channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);

        // Send a message
        String message = "New item added. Topic mode; routing key is item.insert";
        channel.basicPublish(TOPIC_EXCHAGE, "item.insert", null, message.getBytes());
        System.out.println("Message sent: " + message);

        // Send a message
        message = "Item updated. Topic mode; routing key is item.update";
        channel.basicPublish(TOPIC_EXCHAGE, "item.update", null, message.getBytes());
        System.out.println("Message sent: " + message);

        // Send a message
        message = "Item deleted. Topic mode; routing key is item.delete";
        channel.basicPublish(TOPIC_EXCHAGE, "item.delete", null, message.getBytes());
        System.out.println("Message sent: " + message);

        // Close resources
        channel.close();
        connection.close();
    }
}
② Consumer 1
Receives two types of messages: item updates and item deletions.
package com.itheima.rabbitmq.topic;

import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        // Create a channel
        Channel channel = connection.createChannel();
        // Declare the exchange
        channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);

        // Declare (create) the queue
        /**
         * Parameter 1: queue name
         * Parameter 2: whether the queue is durable
         * Parameter 3: whether the queue is exclusive to this connection
         * Parameter 4: whether to delete the queue automatically when it is no longer in use
         * Parameter 5: other queue arguments
         */
        channel.queueDeclare(Producer.TOPIC_QUEUE_1, true, false, false, null);

        // Bind the queue to the exchange
        channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE, "item.update");
        channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE, "item.delete");

        // Create a consumer and set up message handling
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag: the consumer tag, which can be specified in channel.basicConsume
             * envelope: the message envelope, from which the message ID, routing key, exchange,
             *           and redelivery flag (whether the message is being redelivered after an
             *           earlier delivery failed) can be obtained
             * properties: message properties
             * body: message body
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Routing key
                System.out.println("Routing key: " + envelope.getRoutingKey());
                // Exchange
                System.out.println("Exchange: " + envelope.getExchange());
                // Message ID
                System.out.println("Message ID: " + envelope.getDeliveryTag());
                // Received message
                System.out.println("Consumer 1 received the message: " + new String(body, "utf-8"));
            }
        };

        // Listen for messages
        /**
         * Parameter 1: queue name
         * Parameter 2: automatic acknowledgement. If true, the message is acknowledged as soon as
         *              it is delivered and MQ deletes it; if false, it must be acknowledged manually
         * Parameter 3: callback that handles received messages
         */
        channel.basicConsume(Producer.TOPIC_QUEUE_1, true, consumer);
    }
}
③ Consumer 2
Receives all three types of messages: new items, updated items, and deleted items.
package com.itheima.rabbitmq.topic;

import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer2 {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        // Create a channel
        Channel channel = connection.createChannel();
        // Declare the exchange
        channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);

        // Declare (create) the queue
        /**
         * Parameter 1: queue name
         * Parameter 2: whether the queue is durable
         * Parameter 3: whether the queue is exclusive to this connection
         * Parameter 4: whether to delete the queue automatically when it is no longer in use
         * Parameter 5: other queue arguments
         */
        channel.queueDeclare(Producer.TOPIC_QUEUE_2, true, false, false, null);

        // Bind the queue to the exchange
        channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHAGE, "item.*");

        // Create a consumer and set up message handling
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag: the consumer tag, which can be specified in channel.basicConsume
             * envelope: the message envelope, from which the message ID, routing key, exchange,
             *           and redelivery flag (whether the message is being redelivered after an
             *           earlier delivery failed) can be obtained
             * properties: message properties
             * body: message body
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Routing key
                System.out.println("Routing key: " + envelope.getRoutingKey());
                // Exchange
                System.out.println("Exchange: " + envelope.getExchange());
                // Message ID
                System.out.println("Message ID: " + envelope.getDeliveryTag());
                // Received message
                System.out.println("Consumer 2 received the message: " + new String(body, "utf-8"));
            }
        };

        // Listen for messages
        /**
         * Parameter 1: queue name
         * Parameter 2: automatic acknowledgement. If true, the message is acknowledged as soon as
         *              it is delivered and MQ deletes it; if false, it must be acknowledged manually
         * Parameter 3: callback that handles received messages
         */
        channel.basicConsume(Producer.TOPIC_QUEUE_2, true, consumer);
    }
}
III. Test
Start all the consumers, then run the producer to send messages. Each consumer's console shows the messages whose routing keys match the bindings of its queue, which is the intended result; the routing keys used for binding can contain wildcards.
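As a rough check of what each console should show, the bindings from the two consumers can be simulated without a broker. The sketch below is an assumption-laden illustration: `TopicRoutingDemo` and its methods are hypothetical names, and the regex conversion handles only literal words and *, which is all that the bindings in this example use (# is deliberately not handled).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical broker-free simulation; not part of the RabbitMQ client.
class TopicRoutingDemo {

    /** Converts a binding key made of literal words and '*' into a regex. */
    static Pattern toRegex(String bindingKey) {
        StringBuilder regex = new StringBuilder();
        for (String word : bindingKey.split("\\.")) {
            if (regex.length() > 0) {
                regex.append("\\.");
            }
            // '*' stands for exactly one word, i.e. one run of non-dot characters
            regex.append(word.equals("*") ? "[^.]+" : Pattern.quote(word));
        }
        return Pattern.compile(regex.toString());
    }

    /** The bindings declared by Consumer1 and Consumer2 in this article. */
    static Map<String, List<String>> exampleBindings() {
        Map<String, List<String>> bindings = new LinkedHashMap<>();
        bindings.put("topic_queue_1", Arrays.asList("item.update", "item.delete"));
        bindings.put("topic_queue_2", Arrays.asList("item.*"));
        return bindings;
    }

    /** Returns the queues that would receive a message with the given routing key. */
    static List<String> queuesFor(String routingKey, Map<String, List<String>> bindings) {
        List<String> queues = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : bindings.entrySet()) {
            boolean matched = entry.getValue().stream()
                    .anyMatch(key -> toRegex(key).matcher(routingKey).matches());
            if (matched) {
                queues.add(entry.getKey()); // a queue is listed once, however many bindings match
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        for (String key : Arrays.asList("item.insert", "item.update", "item.delete")) {
            System.out.println(key + " -> " + queuesFor(key, exampleBindings()));
        }
        // item.insert -> [topic_queue_2]
        // item.update -> [topic_queue_1, topic_queue_2]
        // item.delete -> [topic_queue_1, topic_queue_2]
    }
}
```

So Consumer 1 should print the update and delete messages, while Consumer 2 should print all three.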
After running the test code, open the Exchanges tab of the RabbitMQ management console and click the topic_exchange exchange to see the following bindings:
Conclusion
Topic mode can provide the functionality of both the Publish/Subscribe mode and the Routing mode. In addition, a Topic exchange allows wildcards in the routing keys used for binding, which makes it more flexible.