This is the sixth day of my participation in Gwen Challenge

Our previous rabbitMQ model was switchless, sent directly to the queue, followed by the subscription model, sending messages to multiple consumers at once

A producer sends a message to the switch, the switch sends a message to the queue bound to it, and the consumer gets the message from the queue, X(Exchange) : The switch accepts messages sent by producers and, depending on how it is configured, knows how to process them, whether to send them to one queue, to all queues, or to discard them.

Switches fall into several categories

Publish/Subscribe: sends a message to all queues bound to the switch Routing: sends a message to a queue matching the specified Routing key Topic: wildcard: sends a message to a queue matching the Routing patternCopy the code

Subscription model –Publish/Subscribe

In broadcast mode, the message sending process looks like this:

  • 1) You can have multiple consumers
  • 2) Each consumer has its own queue
  • 3) Every queue should be bound to Exchange
  • 4) The message sent by the producer can only be sent to the switch. The switch decides which queue to send the message to, but the producer cannot decide.
  • 5) The switch sends messages to all bound queues
  • 6) All consumers in the queue can get the message. Implement a message to be consumed by multiple consumers

producers

The producer declares the switch, does not declare the queue, and the message is sent to the switch, rather than to the queue

public class p1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. Create connection factories
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2. Set parameters
        connectionFactory.setHost("192.168.145.3");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/zhaojin");
        connectionFactory.setUsername("zhaojin");
        connectionFactory.setPassword("zhaojin");
        3. Create a connection
        Connection connection = connectionFactory.newConnection();
        // Get the channel
        Channel channel = connection.createChannel();
       // Declare exchange and specify type fanout
        channel.exchangeDeclare("Subscribe_exchange"."fanout");
      // Message content
        String message = "Hello_Subscribe";
        // Publish messages to Exchange
        channel.basicPublish("Subscribe_exchange"."".null, message.getBytes());
        System.out.println("Producer sends message = : '" + message + "'"); channel.close(); connection.close(); }}Copy the code

Consumer 1

public class c1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. Create connection factories
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2. Set parameters
        connectionFactory.setHost("192.168.145.3");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/zhaojin");
        connectionFactory.setUsername("zhaojin");
        connectionFactory.setPassword("zhaojin");
        3. Create a connection
        Connection connection = connectionFactory.newConnection();
        // Get the channel
        Channel channel = connection.createChannel();
        // Declare a queue
        channel.queueDeclare("Subscribe_queue_1".false.false.false.null);

        // Bind queues to switches
        channel.queueBind("Subscribe_queue_1"."Subscribe_exchange"."");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("C1 consumption News:"+new String(body));
                / / ack manually
                channel.basicAck(envelope.getDeliveryTag(),false); }};// Listen on the queue and automatically return done
        channel.basicConsume("Subscribe_queue_1".false, consumer); }}Copy the code

Consumer 2

public class c2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. Create connection factories
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2. Set parameters
        connectionFactory.setHost("192.168.145.3");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/zhaojin");
        connectionFactory.setUsername("zhaojin");
        connectionFactory.setPassword("zhaojin");
        3. Create a connection
        Connection connection = connectionFactory.newConnection();
        // Get the channel
        Channel channel = connection.createChannel();
        // Declare a queue
        channel.queueDeclare("Subscribe_queue_2".false.false.false.null);

        // Bind queues to switches
        channel.queueBind("Subscribe_queue_2"."Subscribe_exchange"."");

        // Define the consumers of the queue
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("C2 consumer News:"+new String(body));
                / / ack manually
                channel.basicAck(envelope.getDeliveryTag(),false); }};// Listen on the queue and automatically return done
        channel.basicConsume("Subscribe_queue_2".false, consumer); }}Copy the code

Qidong consumer, producer sends a message and looks at the output

Routing- Selectively send the message

Subscription mode, where different queues receive different messages, the queue binding to the switch must be specified, and the routingKey to send the message must be specified when the message is sent

As shown in the figure above, the producer produces messages that are sent to the switch, which sends messages through a queue matching Rontingkley’s.

Producer – Sends different messages three times, matching different routingkeys

public class p {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1. Create connection factories
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2. Set parameters
        connectionFactory.setHost("192.168.145.3");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/zhaojin");
        connectionFactory.setUsername("zhaojin");
        connectionFactory.setPassword("zhaojin");
        3. Create a connection
        Connection connection = connectionFactory.newConnection();
        // Get the channel
        Channel channel = connection.createChannel();
        // Declare exchange and specify type fanout
        channel.exchangeDeclare("routing_exchange"."direct");
        // Message content
        //String message = "new ";
        //String message = "delete ";
        String message = "Update";
         // Publish messages to Exchange
        //channel.basicPublish("routing_exchange", "insert", null, message.getBytes());
        //channel.basicPublish("routing_exchange", "delect", null, message.getBytes());
        channel.basicPublish("routing_exchange"."update".null, message.getBytes());
        System.out.println("Producer sends message = : start '" + message + "'"); channel.close(); connection.close(); }}Copy the code

Consumers to insert

public class insert {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. Create connection factories
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2. Set parameters
        connectionFactory.setHost("192.168.145.3");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/zhaojin");
        connectionFactory.setUsername("zhaojin");
        connectionFactory.setPassword("zhaojin");
        3. Create a connection
        Connection connection = connectionFactory.newConnection();
        // Get the channel
        Channel channel = connection.createChannel();
        // Declare a queue
        channel.queueDeclare("routing_queue_insert".false.false.false.null);

        // Bind queues to switches
        channel.queueBind("routing_queue_insert"."routing_exchange"."insert");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Insert receives message:"+new String(body));
                / / ack manually
                channel.basicAck(envelope.getDeliveryTag(),false); }};// Listen on the queue and automatically return done
        channel.basicConsume("routing_queue_insert".false, consumer); }}Copy the code

Consumers to delect

public class delect {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. Create connection factories
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2. Set parameters
        connectionFactory.setHost("192.168.145.3");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/zhaojin");
        connectionFactory.setUsername("zhaojin");
        connectionFactory.setPassword("zhaojin");
        3. Create a connection
        Connection connection = connectionFactory.newConnection();
        // Get the channel
        Channel channel = connection.createChannel();
        // Declare a queue
        channel.queueDeclare("routing_queue_delect".false.false.false.null);

        // Bind queues to switches
        channel.queueBind("routing_queue_delect"."routing_exchange"."delect");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Delect receives message:"+new String(body));
                / / ack manually
                channel.basicAck(envelope.getDeliveryTag(),false); }};// Listen on the queue and automatically return done
        channel.basicConsume("routing_queue_delect".false, consumer); }}Copy the code

Consumers update

public class update {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. Create connection factories
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2. Set parameters
        connectionFactory.setHost("192.168.145.3");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/zhaojin");
        connectionFactory.setUsername("zhaojin");
        connectionFactory.setPassword("zhaojin");
        3. Create a connection
        Connection connection = connectionFactory.newConnection();
        // Get the channel
        Channel channel = connection.createChannel();
        // Declare a queue
        channel.queueDeclare("routing_queue_update".false.false.false.null);

        // Bind queues to switches
        channel.queueBind("routing_queue_update"."routing_exchange"."update");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Delect receives message:"+new String(body));
                / / ack manually
                channel.basicAck(envelope.getDeliveryTag(),false); }};// Listen on the queue and automatically return done
        channel.basicConsume("routing_queue_update".false, consumer); }}Copy the code

Looking at the console output, you can see that different routingkeys bound to different routingkeys receive different messages, and each queue can have many Routingkeys

topic–

Unlike Direct switches, topic matches can be made with wildcards

A Routingkey is typically made up of one or more words, with “between” words. segmentation

* (asterisk) is a good substitute for a word. # (hash) can replace zero or more words.Copy the code

producers

public class p {
        public static void main(String[] args) throws IOException, TimeoutException {

            //1. Create connection factories
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //2. Set parameters
            connectionFactory.setHost("192.168.145.3");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/zhaojin");
            connectionFactory.setUsername("zhaojin");
            connectionFactory.setPassword("zhaojin");
            3. Create a connection
            Connection connection = connectionFactory.newConnection();
            // Get the channel
            Channel channel = connection.createChannel();
            // Declare exchange and specify type fanout
            channel.exchangeDeclare("topic_exchange"."topic");
            // Message content
            String message = "New";
            //String message = "delete ";
            //String message = "update ";
            // Publish messages to Exchange
            channel.basicPublish("topic_exchange"."goods.insert".null, message.getBytes());
            //channel.basicPublish("topic_exchange", "goods.delect", null, message.getBytes());
           // channel.basicPublish("topic_exchange", "goods.update", null, message.getBytes());
            System.out.println("Producer sends message = : start '" + message + "'"); channel.close(); connection.close(); }}Copy the code

Consumer 1 only accepts inserts and delect

public class c1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. Create connection factories
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2. Set parameters
        connectionFactory.setHost("192.168.145.3");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/zhaojin");
        connectionFactory.setUsername("zhaojin");
        connectionFactory.setPassword("zhaojin");
        3. Create a connection
        Connection connection = connectionFactory.newConnection();
        // Get the channel
        Channel channel = connection.createChannel();
        // Declare a queue
        channel.queueDeclare("topic_queue_1".false.false.false.null);

        // Bind queues to switches
        channel.queueBind("topic_queue_1"."topic_exchange"."goods.insert");
        channel.queueBind("topic_queue_1"."topic_exchange"."goods.delect");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Add delete received message:"+new String(body));
                / / ack manually
                channel.basicAck(envelope.getDeliveryTag(),false); }};// Listen on the queue and automatically return done
        channel.basicConsume("topic_queue_1".false, consumer); }}Copy the code

Consumer 2 takes all goods as long as they match

public class c2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. Create connection factories
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2. Set parameters
        connectionFactory.setHost("192.168.145.3");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/zhaojin");
        connectionFactory.setUsername("zhaojin");
        connectionFactory.setPassword("zhaojin");
        3. Create a connection
        Connection connection = connectionFactory.newConnection();
        // Get the channel
        Channel channel = connection.createChannel();
        // Declare a queue
        channel.queueDeclare("topic_queue_2".false.false.false.null);

        // Bind queues to switches
        channel.queueBind("topic_queue_2"."topic_exchange"."goods.*");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Commodity receive message:"+new String(body));
                / / ack manually
                channel.basicAck(envelope.getDeliveryTag(),false); }};// Listen on the queue and automatically return done
        channel.basicConsume("topic_queue_2".false, consumer); }}Copy the code

Run send promotion three messages and see console output

Perfect,