This section describes the seven RabbitMQ messaging modes and their application scenarios

Simple mode (Hello World)

The simplest setup: one producer and one consumer. RabbitMQ acts as a message broker, forwarding messages from producer A to consumer B

Application scenario: An email is placed on a message queue; the mail service takes the email from the queue and sends it to the recipient

Work Queues

Tasks are distributed among multiple consumers (the competing consumers pattern): one producer feeds one queue that several consumers read from, and each message is handled by only one of them. This suits resource-intensive tasks that a single consumer cannot keep up with

Application scenario: Processing one order takes 10 seconds. Many orders can be placed on the message queue at once and then processed by several consumers at the same time, so the work runs in parallel instead of serially on a single consumer

Publish/Subscribe

One message is delivered to many consumers at once: a message sent by the producer is picked up by multiple consumers, that is, the message is broadcast to all of them.

Application scenario: Multiple caches and databases need to be notified after commodity inventory is updated. The structure would be:

  • A fanout exchange is bound to two message queues: the cache message queue and the database message queue

  • The cache message queue is consumed by multiple cache consumers

  • The database message queue is consumed by multiple database consumers
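The structure above can be sketched with a small in-memory model. This is only an illustration built on the Java standard library, not the RabbitMQ client API: the class names `FanoutSketch`, `SketchQueue`, and `FanoutExchange` are hypothetical, and round-robin delivery inside each queue is an assumption made to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory illustration only; none of these types belong to the RabbitMQ client. */
class FanoutSketch {

    /** A queue that hands each message to exactly one of its consumers, round-robin. */
    static class SketchQueue {
        final String name;
        final int consumerCount;
        final List<String> deliveries = new ArrayList<>();
        private int next = 0;

        SketchQueue(String name, int consumerCount) {
            this.name = name;
            this.consumerCount = consumerCount;
        }

        void enqueue(String message) {
            // Record which consumer of this queue received the message
            deliveries.add(name + "-consumer-" + next + " <- " + message);
            next = (next + 1) % consumerCount;
        }
    }

    /** A fanout exchange copies every message to every bound queue. */
    static class FanoutExchange {
        private final List<SketchQueue> bound = new ArrayList<>();

        void bind(SketchQueue queue) {
            bound.add(queue);
        }

        void publish(String message) {
            for (SketchQueue queue : bound) {
                queue.enqueue(message);
            }
        }
    }

    public static void main(String[] args) {
        SketchQueue cache = new SketchQueue("cache", 2);
        SketchQueue database = new SketchQueue("database", 2);
        FanoutExchange exchange = new FanoutExchange();
        exchange.bind(cache);
        exchange.bind(database);

        exchange.publish("inventory updated #1");
        exchange.publish("inventory updated #2");

        cache.deliveries.forEach(System.out::println);
        database.deliveries.forEach(System.out::println);
    }
}
```

Every queue receives its own copy of each message, while inside one queue a message goes to only one consumer. That is why the cache side and the database side are both notified, yet no update is processed twice by the same group.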

Routing mode

Messages are received selectively by routing key. The producer sends each message to the exchange together with a routing key. A consumer specifies a routing key when binding its queue to the exchange and consumes only the messages that carry that routing key

Application scenario: when an iphone12 is added to the inventory, the iphone12 promotion consumer binds with the routing key iphone12. Only this promotion receives the message; other promotions do not care about this routing key and never consume the message
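The exact-match rule can be illustrated with a tiny in-memory lookup. This is a sketch using only the Java standard library, not the RabbitMQ client API; the class `DirectRoutingSketch` and the queue names are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** In-memory illustration of exact routing-key matching; not the RabbitMQ client. */
class DirectRoutingSketch {
    // routing key used at bind time -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new HashMap<>();

    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new LinkedHashSet<>()).add(queue);
    }

    /** Returns the queues whose binding key equals the routing key of the message. */
    Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptySet());
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        exchange.bind("iphone12-promotion-queue", "iphone12");
        exchange.bind("other-promotion-queue", "other-product");

        // Only the queue bound with "iphone12" is selected
        System.out.println(exchange.route("iphone12"));
        // A routing key that nobody bound selects no queue at all
        System.out.println(exchange.route("unbound-key"));
    }
}
```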

Topic mode (Topics)

Messages are received by topic: the routing key of each message is matched against the pattern a queue was bound with. A routing key is a list of words separated by dots; # matches zero or more words, and * matches exactly one word.

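Application scenario: As above, an iPhone promotion can receive every message on the iPhone topic. For example, a queue bound with a pattern such as iphone.* would receive messages with routing keys like iphone.12 and iphone.13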
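The two wildcards can be checked with a short matcher. This is a sketch of the matching rule as described above, written with the Java standard library only; `TopicMatcher` is a hypothetical helper and not how the broker implements matching internally.

```java
/** Illustrative matcher for topic patterns; not part of the RabbitMQ client. */
class TopicMatcher {

    /** Returns true when the dot-separated routing key fits the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            // Pattern exhausted: it matches only if the key is exhausted too
            return k == key.length;
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split
            for (int next = k; next <= key.length; next++) {
                if (match(pattern, p + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (k == key.length) {
            // Pattern still expects a word but the key has none left
            return false;
        }
        if (pattern[p].equals("*") || pattern[p].equals(key[k])) {
            // '*' consumes exactly one word; a literal must be equal
            return match(pattern, p + 1, key, k + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("key.*", "key.1"));
        System.out.println(matches("key.*", "key.1.2"));
        System.out.println(matches("*.#", "key.1.2"));
    }
}
```

With these rules, key.* accepts key.1 but rejects key.1.2, because * stands for exactly one word, while *.# accepts both, because # can absorb any number of trailing words.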

Remote Procedure Call (RPC)

If we need to run a function on a remote computer and wait for the result, we can use RPC.

Application scenario: The caller has to wait for an interface to return data, such as order payment
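The request/reply flow behind RPC over a message queue can be sketched in memory. The example below assumes the usual arrangement in which each request carries a correlation id and a reply queue, and the caller blocks until the matching reply arrives. It uses plain `BlockingQueue`s from the Java standard library rather than a broker, and `RpcSketch`, `Message`, and the "paid:" result are hypothetical names chosen for illustration.

```java
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** In-memory illustration of RPC over queues; not the RabbitMQ client. */
class RpcSketch {

    /** A message with the two properties an RPC exchange relies on. */
    static final class Message {
        final String correlationId;
        final String body;
        final BlockingQueue<Message> replyTo;

        Message(String correlationId, String body, BlockingQueue<Message> replyTo) {
            this.correlationId = correlationId;
            this.body = body;
            this.replyTo = replyTo;
        }
    }

    // Stands in for the queue on which the server waits for requests
    static final BlockingQueue<Message> REQUEST_QUEUE = new LinkedBlockingQueue<>();

    /** Starts a daemon thread that answers every request on its reply queue. */
    static Thread startServer() {
        Thread server = new Thread(() -> {
            try {
                while (true) {
                    Message request = REQUEST_QUEUE.take();
                    String result = "paid:" + request.body;
                    // Echo the correlation id so the caller can match the reply
                    request.replyTo.put(new Message(request.correlationId, result, null));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        server.setDaemon(true);
        server.start();
        return server;
    }

    /** Sends a request and blocks (up to two seconds) for the matching reply. */
    static String call(String body) {
        String correlationId = UUID.randomUUID().toString();
        BlockingQueue<Message> replyQueue = new ArrayBlockingQueue<>(1);
        try {
            REQUEST_QUEUE.put(new Message(correlationId, body, replyQueue));
            Message reply = replyQueue.poll(2, TimeUnit.SECONDS);
            if (reply == null) {
                throw new IllegalStateException("no reply within the timeout");
            }
            if (!correlationId.equals(reply.correlationId)) {
                throw new IllegalStateException("reply does not belong to this request");
            }
            return reply.body;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the reply", e);
        }
    }

    public static void main(String[] args) {
        startServer();
        System.out.println(call("order-1001"));
    }
}
```

The timeout is a design choice of this sketch: a caller that waits forever hides a dead server, so failing after a bounded wait is usually easier to operate.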

Publisher Confirms

Publisher confirms are a RabbitMQ extension for reliable publishing. With publisher confirms enabled on a channel, RabbitMQ asynchronously confirms the messages published by the sender, which means they have been processed on the server side.

Application scenario: Messages with high reliability requirements, such as wallet deductions
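Because confirms arrive asynchronously, the publisher has to remember which messages are still unconfirmed. The sketch below shows one way to do that bookkeeping with a sorted map keyed by sequence number. It is an in-memory illustration using only the Java standard library: `ConfirmTrackerSketch` and its methods are hypothetical, and the `multiple` flag is assumed to mean "this confirm covers every sequence number up to and including this one". With the real Java client, confirms are switched on per channel with `channel.confirmSelect()`.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** In-memory bookkeeping for outstanding confirms; not the RabbitMQ client. */
class ConfirmTrackerSketch {
    // sequence number -> message body that has not been confirmed yet
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();
    private long nextSeqNo = 1;

    /** Records a message as published and returns its sequence number. */
    long publish(String body) {
        long seqNo = nextSeqNo++;
        outstanding.put(seqNo, body);
        return seqNo;
    }

    /** Handles a confirm; 'multiple' clears everything up to and including seqNo. */
    void handleAck(long seqNo, boolean multiple) {
        if (multiple) {
            outstanding.headMap(seqNo, true).clear();
        } else {
            outstanding.remove(seqNo);
        }
    }

    /** Messages that would need to be retried if no confirm ever arrives. */
    int unconfirmedCount() {
        return outstanding.size();
    }

    public static void main(String[] args) {
        ConfirmTrackerSketch tracker = new ConfirmTrackerSketch();
        tracker.publish("deduct 10 from wallet A");
        tracker.publish("deduct 20 from wallet B");
        tracker.publish("deduct 30 from wallet C");

        // One confirm with multiple=true covers sequence numbers 1 and 2
        tracker.handleAck(2, true);
        System.out.println("unconfirmed: " + tracker.unconfirmedCount());
    }
}
```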

Code demo

The demos below cover the first five modes; the last two (RPC and publisher confirms) are left for interested readers to explore on their own

Simple mode

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Sender {
    // Must be the same queue name the Receiver uses
    private final static String QUEUE_NAME = "simplest_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Declare the queue
        // queue: the queue name
        // durable: whether the queue survives a broker restart
        // exclusive: whether the queue is reserved for this connection only;
        //            set it to true when a single consumer should own the queue
        // autoDelete: whether the queue is deleted once it is no longer in use
        // arguments: other attributes
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        String message = "CSS message";
        // The default exchange ("") routes by queue name
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("[x] Sent '" + message + "'");

        // Finally, close the channel and the connection
        channel.close();
        connection.close();
    }
}
```

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receiver {
    private final static String QUEUE_NAME = "simplest_queue";

    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        // Get the connection
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Declaring the queue again is safe as long as the arguments are the same
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        // autoAck = true: a message counts as acknowledged as soon as it is delivered
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}
```

Work queue mode

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receiver1 {
    private final static String QUEUE_NAME = "queue_work";

    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // The server sends only one unacknowledged message at a time to this consumer
        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            // Acknowledge manually; the prefetch limit has no effect with automatic acknowledgement
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        // autoAck = false so that basicQos(1) above actually applies
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }
}
```

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receiver2 {
    private final static String QUEUE_NAME = "queue_work";

    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // The server sends only one unacknowledged message at a time to this consumer
        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            // Acknowledge manually; the prefetch limit has no effect with automatic acknowledgement
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        // autoAck = false so that basicQos(1) above actually applies
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }
}
```

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Sender {
    private final static String QUEUE_NAME = "queue_work";

    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        for (int i = 0; i < 100; i++) {
            String message = "work mode message" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("[x] Sent '" + message + "'");
            // Pause a little longer after each message
            Thread.sleep(i * 10);
        }

        channel.close();
        connection.close();
    }
}
```

Publish/Subscribe mode

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Receive1 {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // A server-named, temporary queue for this receiver
        String queueName = channel.queueDeclare().getQueue();
        // A fanout exchange ignores the routing key, so it is left empty
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}
```

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Receive2 {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // A server-named, temporary queue for this receiver
        String queueName = channel.queueDeclare().getQueue();
        // A fanout exchange ignores the routing key, so it is left empty
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received2 '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}
```

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Sender {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = "publish subscribe message";
        // Publish to the exchange rather than to a queue
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}
```

Routing mode

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receiver1 {
    private final static String QUEUE_NAME = "queue_routing";
    private final static String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Declare the exchange here as well, so binding works even if the Sender has not run yet
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Specify the routing keys: this queue receives both key and key2
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");
        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            // Acknowledge manually; the prefetch limit has no effect with automatic acknowledgement
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }
}
```

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receiver2 {
    private final static String QUEUE_NAME = "queue_routing2";
    private final static String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Declare the exchange here as well, so binding works even if the Sender has not run yet
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // This queue receives only key2
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");
        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            // Acknowledge manually; the prefetch limit has no effect with automatic acknowledgement
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }
}
```

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Sender {
    private final static String EXCHANGE_NAME = "exchange_direct";
    private final static String EXCHANGE_TYPE = "direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Declare the exchange
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);

        // Only queues bound with the same routing key will receive the message
        String message = "routing mode message";
        channel.basicPublish(EXCHANGE_NAME, "key2", null, message.getBytes());
        System.out.println("[x] Sent '" + message + "'");

        // channel.basicPublish(EXCHANGE_NAME, "key", null, message.getBytes());
        // System.out.println("[x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}
```

Topic mode

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receiver1 {
    private final static String QUEUE_NAME = "queue_topic";
    private final static String EXCHANGE_NAME = "exchange_topic";

    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Declare the exchange here as well, so binding works even if the Sender has not run yet
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // key.* matches "key" followed by exactly one more word, e.g. key.1
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.*");
        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            // Acknowledge manually; the prefetch limit has no effect with automatic acknowledgement
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }
}
```

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receiver2 {
    private final static String QUEUE_NAME = "queue_topic2";
    private final static String EXCHANGE_NAME = "exchange_topic";
    private final static String EXCHANGE_TYPE = "topic";

    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Declare the exchange here as well, so binding works even if the Sender has not run yet
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // *.* matches routing keys of exactly two words, e.g. key.1
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");
        // *.# matches one word followed by zero or more words, e.g. key.1 and key.1.2
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.#");
        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            // Acknowledge manually; the prefetch limit has no effect with automatic acknowledgement
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }
}
```

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Sender {
    private final static String EXCHANGE_NAME = "exchange_topic";
    private final static String EXCHANGE_TYPE = "topic";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);

        // Two words: matched by key.*, *.* and *.#
        String message = "topics model message with key.1";
        channel.basicPublish(EXCHANGE_NAME, "key.1", null, message.getBytes());
        System.out.println("[x] Sent '" + message + "'");

        // Three words: matched only by *.#
        String message2 = "topics model message with key.1.2";
        channel.basicPublish(EXCHANGE_NAME, "key.1.2", null, message2.getBytes());
        System.out.println("[x] Sent '" + message2 + "'");

        channel.close();
        connection.close();
    }
}
```

This section describes the four types of exchanges

  • Direct exchange: an exchange that routes by exact key. A routing_key must be specified when a queue is bound to the exchange, and the exchange delivers each message to the queues whose routing_key equals the one on the message

  • Fanout exchange: broadcasts messages to all bound queues without looking at the routing_key, which makes it the fastest

  • Topic exchange: adds pattern matching to the direct exchange, that is, the routing_key is matched against a pattern in which * stands for exactly one word and # for zero or more words

  • Headers exchange: ignores the routing_key and matches on the message headers (a hash data structure) instead. The advantage is more flexible matching rules
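Of the four types, header matching is the least obvious, so here is a small sketch of it. It assumes the binding carries an `x-match` argument that is either `all` (every listed header must match, the default) or `any` (one match is enough). The code is an in-memory illustration with the Java standard library only; `HeadersMatchSketch` and the header names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** In-memory illustration of header matching; not the RabbitMQ client. */
class HeadersMatchSketch {

    /** Returns true when the message headers satisfy the binding arguments. */
    static boolean matches(Map<String, Object> bindingArgs, Map<String, Object> headers) {
        boolean matchAny = "any".equals(bindingArgs.get("x-match"));
        for (Map.Entry<String, Object> arg : bindingArgs.entrySet()) {
            if (arg.getKey().equals("x-match")) {
                continue; // the mode switch itself is not a header to compare
            }
            boolean hit = headers.containsKey(arg.getKey())
                    && Objects.equals(arg.getValue(), headers.get(arg.getKey()));
            if (matchAny && hit) {
                return true;  // "any": the first matching pair is enough
            }
            if (!matchAny && !hit) {
                return false; // "all": the first mismatch rejects the message
            }
        }
        // "all" with no mismatch matches; "any" with no hit does not
        return !matchAny;
    }

    public static void main(String[] args) {
        Map<String, Object> binding = new HashMap<>();
        binding.put("x-match", "all");
        binding.put("format", "pdf");
        binding.put("type", "report");

        Map<String, Object> headers = new HashMap<>();
        headers.put("format", "pdf");
        headers.put("type", "log");

        System.out.println("all: " + matches(binding, headers));
        binding.put("x-match", "any");
        System.out.println("any: " + matches(binding, headers));
    }
}
```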

Conclusion

Each of these queue modes has its own application scenarios, and the examples above can help you choose the one that fits your use case