This is the sixth day of my participation in Gwen Challenge
Our previous rabbitMQ model was switchless, sent directly to the queue, followed by the subscription model, sending messages to multiple consumers at once
A producer sends a message to the switch, the switch sends a message to the queue bound to it, and the consumer gets the message from the queue, X(Exchange) : The switch accepts messages sent by producers and, depending on how it is configured, knows how to process them, whether to send them to one queue, to all queues, or to discard them.
Switches fall into several categories
Publish/Subscribe: sends a message to all queues bound to the switch Routing: sends a message to a queue matching the specified Routing key Topic: wildcard: sends a message to a queue matching the Routing patternCopy the code
Subscription model –Publish/Subscribe
In broadcast mode, the message sending process looks like this:
- 1) You can have multiple consumers
- 2) Each consumer has its own queue
- 3) Every queue should be bound to Exchange
- 4) The message sent by the producer can only be sent to the switch. The switch decides which queue to send the message to, but the producer cannot decide.
- 5) The switch sends messages to all bound queues
- 6) All consumers in the queue can get the message. Implement a message to be consumed by multiple consumers
producers
The producer declares the switch, does not declare the queue, and the message is sent to the switch, rather than to the queue
public class p1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1. Create connection factories
ConnectionFactory connectionFactory = new ConnectionFactory();
//2. Set parameters
connectionFactory.setHost("192.168.145.3");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/zhaojin");
connectionFactory.setUsername("zhaojin");
connectionFactory.setPassword("zhaojin");
3. Create a connection
Connection connection = connectionFactory.newConnection();
// Get the channel
Channel channel = connection.createChannel();
// Declare exchange and specify type fanout
channel.exchangeDeclare("Subscribe_exchange"."fanout");
// Message content
String message = "Hello_Subscribe";
// Publish messages to Exchange
channel.basicPublish("Subscribe_exchange"."".null, message.getBytes());
System.out.println("Producer sends message = : '" + message + "'"); channel.close(); connection.close(); }}Copy the code
Consumer 1
public class c1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1. Create connection factories
ConnectionFactory connectionFactory = new ConnectionFactory();
//2. Set parameters
connectionFactory.setHost("192.168.145.3");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/zhaojin");
connectionFactory.setUsername("zhaojin");
connectionFactory.setPassword("zhaojin");
3. Create a connection
Connection connection = connectionFactory.newConnection();
// Get the channel
Channel channel = connection.createChannel();
// Declare a queue
channel.queueDeclare("Subscribe_queue_1".false.false.false.null);
// Bind queues to switches
channel.queueBind("Subscribe_queue_1"."Subscribe_exchange"."");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("C1 consumption News:"+new String(body));
/ / ack manually
channel.basicAck(envelope.getDeliveryTag(),false); }};// Listen on the queue and automatically return done
channel.basicConsume("Subscribe_queue_1".false, consumer); }}Copy the code
Consumer 2
public class c2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1. Create connection factories
ConnectionFactory connectionFactory = new ConnectionFactory();
//2. Set parameters
connectionFactory.setHost("192.168.145.3");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/zhaojin");
connectionFactory.setUsername("zhaojin");
connectionFactory.setPassword("zhaojin");
3. Create a connection
Connection connection = connectionFactory.newConnection();
// Get the channel
Channel channel = connection.createChannel();
// Declare a queue
channel.queueDeclare("Subscribe_queue_2".false.false.false.null);
// Bind queues to switches
channel.queueBind("Subscribe_queue_2"."Subscribe_exchange"."");
// Define the consumers of the queue
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("C2 consumer News:"+new String(body));
/ / ack manually
channel.basicAck(envelope.getDeliveryTag(),false); }};// Listen on the queue and automatically return done
channel.basicConsume("Subscribe_queue_2".false, consumer); }}Copy the code
Qidong consumer, producer sends a message and looks at the output
Routing- Selectively send the message
Subscription mode, where different queues receive different messages, the queue binding to the switch must be specified, and the routingKey to send the message must be specified when the message is sent
As shown in the figure above, the producer produces messages that are sent to the switch, which sends messages through a queue matching Rontingkley’s.
Producer – Sends different messages three times, matching different routingkeys
public class p {
public static void main(String[] args) throws IOException, TimeoutException {
//1. Create connection factories
ConnectionFactory connectionFactory = new ConnectionFactory();
//2. Set parameters
connectionFactory.setHost("192.168.145.3");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/zhaojin");
connectionFactory.setUsername("zhaojin");
connectionFactory.setPassword("zhaojin");
3. Create a connection
Connection connection = connectionFactory.newConnection();
// Get the channel
Channel channel = connection.createChannel();
// Declare exchange and specify type fanout
channel.exchangeDeclare("routing_exchange"."direct");
// Message content
//String message = "new ";
//String message = "delete ";
String message = "Update";
// Publish messages to Exchange
//channel.basicPublish("routing_exchange", "insert", null, message.getBytes());
//channel.basicPublish("routing_exchange", "delect", null, message.getBytes());
channel.basicPublish("routing_exchange"."update".null, message.getBytes());
System.out.println("Producer sends message = : start '" + message + "'"); channel.close(); connection.close(); }}Copy the code
Consumers to insert
public class insert {
public static void main(String[] args) throws IOException, TimeoutException {
//1. Create connection factories
ConnectionFactory connectionFactory = new ConnectionFactory();
//2. Set parameters
connectionFactory.setHost("192.168.145.3");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/zhaojin");
connectionFactory.setUsername("zhaojin");
connectionFactory.setPassword("zhaojin");
3. Create a connection
Connection connection = connectionFactory.newConnection();
// Get the channel
Channel channel = connection.createChannel();
// Declare a queue
channel.queueDeclare("routing_queue_insert".false.false.false.null);
// Bind queues to switches
channel.queueBind("routing_queue_insert"."routing_exchange"."insert");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Insert receives message:"+new String(body));
/ / ack manually
channel.basicAck(envelope.getDeliveryTag(),false); }};// Listen on the queue and automatically return done
channel.basicConsume("routing_queue_insert".false, consumer); }}Copy the code
Consumers to delect
public class delect {
public static void main(String[] args) throws IOException, TimeoutException {
//1. Create connection factories
ConnectionFactory connectionFactory = new ConnectionFactory();
//2. Set parameters
connectionFactory.setHost("192.168.145.3");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/zhaojin");
connectionFactory.setUsername("zhaojin");
connectionFactory.setPassword("zhaojin");
3. Create a connection
Connection connection = connectionFactory.newConnection();
// Get the channel
Channel channel = connection.createChannel();
// Declare a queue
channel.queueDeclare("routing_queue_delect".false.false.false.null);
// Bind queues to switches
channel.queueBind("routing_queue_delect"."routing_exchange"."delect");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Delect receives message:"+new String(body));
/ / ack manually
channel.basicAck(envelope.getDeliveryTag(),false); }};// Listen on the queue and automatically return done
channel.basicConsume("routing_queue_delect".false, consumer); }}Copy the code
Consumers update
public class update {
public static void main(String[] args) throws IOException, TimeoutException {
//1. Create connection factories
ConnectionFactory connectionFactory = new ConnectionFactory();
//2. Set parameters
connectionFactory.setHost("192.168.145.3");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/zhaojin");
connectionFactory.setUsername("zhaojin");
connectionFactory.setPassword("zhaojin");
3. Create a connection
Connection connection = connectionFactory.newConnection();
// Get the channel
Channel channel = connection.createChannel();
// Declare a queue
channel.queueDeclare("routing_queue_update".false.false.false.null);
// Bind queues to switches
channel.queueBind("routing_queue_update"."routing_exchange"."update");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Delect receives message:"+new String(body));
/ / ack manually
channel.basicAck(envelope.getDeliveryTag(),false); }};// Listen on the queue and automatically return done
channel.basicConsume("routing_queue_update".false, consumer); }}Copy the code
Looking at the console output, you can see that different routingkeys bound to different routingkeys receive different messages, and each queue can have many Routingkeys
topic–
Unlike Direct switches, topic matches can be made with wildcards
A Routingkey is typically made up of one or more words, with “between” words. segmentation
* (asterisk) is a good substitute for a word. # (hash) can replace zero or more words.Copy the code
producers
public class p {
public static void main(String[] args) throws IOException, TimeoutException {
//1. Create connection factories
ConnectionFactory connectionFactory = new ConnectionFactory();
//2. Set parameters
connectionFactory.setHost("192.168.145.3");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/zhaojin");
connectionFactory.setUsername("zhaojin");
connectionFactory.setPassword("zhaojin");
3. Create a connection
Connection connection = connectionFactory.newConnection();
// Get the channel
Channel channel = connection.createChannel();
// Declare exchange and specify type fanout
channel.exchangeDeclare("topic_exchange"."topic");
// Message content
String message = "New";
//String message = "delete ";
//String message = "update ";
// Publish messages to Exchange
channel.basicPublish("topic_exchange"."goods.insert".null, message.getBytes());
//channel.basicPublish("topic_exchange", "goods.delect", null, message.getBytes());
// channel.basicPublish("topic_exchange", "goods.update", null, message.getBytes());
System.out.println("Producer sends message = : start '" + message + "'"); channel.close(); connection.close(); }}Copy the code
Consumer 1 only accepts inserts and delect
public class c1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1. Create connection factories
ConnectionFactory connectionFactory = new ConnectionFactory();
//2. Set parameters
connectionFactory.setHost("192.168.145.3");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/zhaojin");
connectionFactory.setUsername("zhaojin");
connectionFactory.setPassword("zhaojin");
3. Create a connection
Connection connection = connectionFactory.newConnection();
// Get the channel
Channel channel = connection.createChannel();
// Declare a queue
channel.queueDeclare("topic_queue_1".false.false.false.null);
// Bind queues to switches
channel.queueBind("topic_queue_1"."topic_exchange"."goods.insert");
channel.queueBind("topic_queue_1"."topic_exchange"."goods.delect");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Add delete received message:"+new String(body));
/ / ack manually
channel.basicAck(envelope.getDeliveryTag(),false); }};// Listen on the queue and automatically return done
channel.basicConsume("topic_queue_1".false, consumer); }}Copy the code
Consumer 2 takes all goods as long as they match
public class c2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1. Create connection factories
ConnectionFactory connectionFactory = new ConnectionFactory();
//2. Set parameters
connectionFactory.setHost("192.168.145.3");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/zhaojin");
connectionFactory.setUsername("zhaojin");
connectionFactory.setPassword("zhaojin");
3. Create a connection
Connection connection = connectionFactory.newConnection();
// Get the channel
Channel channel = connection.createChannel();
// Declare a queue
channel.queueDeclare("topic_queue_2".false.false.false.null);
// Bind queues to switches
channel.queueBind("topic_queue_2"."topic_exchange"."goods.*");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Commodity receive message:"+new String(body));
/ / ack manually
channel.basicAck(envelope.getDeliveryTag(),false); }};// Listen on the queue and automatically return done
channel.basicConsume("topic_queue_2".false, consumer); }}Copy the code
Run send promotion three messages and see console output
Perfect,