I. Mode description
Publish and subscribe model:
- Each consumer listens to its own queue.
- The producer sends the message to the broker, and the switch forwards the message to each queue bound to the switch, which receives the message
Second, the code
1) producers
package com.itheima.rabbitmq.ps;
import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/** * The switch type used for publishing and subscribing is fanout */
public class Producer {
// Switch name
static final String FANOUT_EXCHAGE = "fanout_exchange";
// Queue name
static final String FANOUT_QUEUE_1 = "fanout_queue_1";
// Queue name
static final String FANOUT_QUEUE_2 = "fanout_queue_2";
public static void main(String[] args) throws Exception {
// Create a connection
Connection connection = ConnectionUtil.getConnection();
// Create channel
Channel channel = connection.createChannel();
Parameter 1: switch name * Parameter 2: switch type, fanout, topic, direct, headers */
channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
// Declare (create) a queue
/** * Parameter 1: queue name * Parameter 2: Define persistent queue * Parameter 3: exclusive connection * Parameter 4: automatically delete queue when not in use * parameter 5: other queue parameters */
channel.queueDeclare(FANOUT_QUEUE_1, true.false.false.null);
channel.queueDeclare(FANOUT_QUEUE_2, true.false.false.null);
// The queue binds the switch
channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHAGE, "");
channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHAGE, "");
for (int i = 1; i <= 10; i++) {
// Send a message
String message = "Hello, Little rabbit! Publish and subscribe --" + i;
/** * Parameter 1: switch name, if not specified use the Default Default Exchage * parameter 2: route key, simple mode can pass queue name * parameter 3: other message properties * parameter 4: message content */
channel.basicPublish(FANOUT_EXCHAGE, "".null, message.getBytes());
System.out.println("Message sent:" + message);
}
// Close the resourcechannel.close(); connection.close(); }}Copy the code
② Consumer 1
package com.itheima.rabbitmq.ps;
import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// Create channel
Channel channel = connection.createChannel();
// Declare a switch
channel.exchangeDeclare(Producer.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
// Declare (create) a queue
/** * Parameter 1: queue name * Parameter 2: Define persistent queue * Parameter 3: exclusive connection * Parameter 4: automatically delete queue when not in use * parameter 5: other queue parameters */
channel.queueDeclare(Producer.FANOUT_QUEUE_1, true.false.false.null);
// The queue binds the switch
channel.queueBind(Producer.FANOUT_QUEUE_1, Producer.FANOUT_EXCHAGE, "");
// Create a consumer; And set up message handling
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
/** * consumerTag specifies the contents of the envelope message packet while channel.basicConsume, from which the message ID, message routingKey, and switch are obtained. Message and retransmission flags (whether a message needs to be resent if it fails to be received) * Properties property information * Body message */
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
/ / routing key
System.out.println("The route key is:" + envelope.getRoutingKey());
/ / switches
System.out.println("Switch is:" + envelope.getExchange());
/ / message id
System.out.println("Message ID is:" + envelope.getDeliveryTag());
// Received message
System.out.println("Consumer 1- received the message:" + new String(body, "utf-8")); }};// Listen for messages
/** * Parameter 1: queue name * Parameter 2: automatic acknowledgement. If this parameter is set to true, the message is automatically sent to MQ and will be deleted when the message is received. If this parameter is set to false, manual confirmation is required
channel.basicConsume(Producer.FANOUT_QUEUE_1, true, consumer); }}Copy the code
③ Consumer 2
package com.itheima.rabbitmq.ps;
import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// Create channel
Channel channel = connection.createChannel();
// Declare a switch
channel.exchangeDeclare(Producer.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
// Declare (create) a queue
/** * Parameter 1: queue name * Parameter 2: Define persistent queue * Parameter 3: exclusive connection * Parameter 4: automatically delete queue when not in use * parameter 5: other queue parameters */
channel.queueDeclare(Producer.FANOUT_QUEUE_2, true.false.false.null);
// The queue binds the switch
channel.queueBind(Producer.FANOUT_QUEUE_2, Producer.FANOUT_EXCHAGE, "");
// Create a consumer; And set up message handling
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
/** * consumerTag specifies the contents of the envelope message packet while channel.basicConsume, from which the message ID, message routingKey, and switch are obtained. Message and retransmission flags (whether a message needs to be resent if it fails to be received) * Properties property information * Body message */
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
/ / routing key
System.out.println("The route key is:" + envelope.getRoutingKey());
/ / switches
System.out.println("Switch is:" + envelope.getExchange());
/ / message id
System.out.println("Message ID is:" + envelope.getDeliveryTag());
// Received message
System.out.println("The message received by Consumer 2 is:" + new String(body, "utf-8")); }};// Listen for messages
/** * Parameter 1: queue name * Parameter 2: automatic acknowledgement. If this parameter is set to true, the message is automatically sent to MQ and will be deleted when the message is received. If this parameter is set to false, manual confirmation is required
channel.basicConsume(Producer.FANOUT_QUEUE_2, true, consumer); }}Copy the code
Three, test,
Start all consumers, then use producers to send messages. In the console corresponding to each consumer, you can view all the messages sent by the producer and the effect of the broadcast.
After executing the test code, actually go to the Exchanges TAB in the RabbitMQ administration background and click on the Fanout_exchange switch to see the following bindings:
conclusion
The switch needs to bind to queues so that a message can be received by multiple consumers.
The difference between the publish and subscribe mode and the work queue mode
- Work queue mode does not define switches, whereas publish/subscribe mode does.
- The producer of the publish/subscribe mode sends messages to the switch, and the producer of the work queue mode sends messages to the queue (the default switch is used underneath).
- In publish/subscribe mode, you need to bind the queue to the switch. In work queue mode, you do not need to bind the queue to the default switch.