Message Queuing (RabbitMQ) Primer

Prologue

Hello everyone! I'm Old Ape, one of the new generation of migrant workers, working hard in Shenzhen. I have been here for a year, and it has been my period of fastest growth, both in life and in study. But I do have to complain about Shenzhen's awful weather (it rains every day); I wrote this, my first article on Nuggets (Juejin), during a downpour. In fact, compared with the south, where every season feels like summer, I prefer the four distinct seasons of the north. Enough chatter, let's get started!

What is a message queue

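A message queue (MQ) is a method of communication between applications: the sender writes messages to a queue and the receiver reads them from it, so the two sides do not have to call each other directly.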

Application scenarios of message queues

1. Asynchronous processing

Non-essential business logic is written to the message queue and run asynchronously, which reduces the system's response time.

2. Service decoupling

In the traditional pattern, systems call each other directly, so a change often means modifying the code of both systems at the same time. With a message queue, the caller writes a message to the queue and the systems that need it subscribe to the queue, so adding or changing a subscriber requires no change to the caller's code.

3. Traffic peak shaving

Peak shaving is mostly used for flash sales in e-commerce systems or whenever traffic surges suddenly. The message queue stores the incoming messages, and the system consumes them from the queue at its own pace until the backlog is cleared.
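The buffering effect can be sketched without a broker. The following is a minimal illustration, assuming an in-memory queue as a stand-in for the message queue; the class `PeakShavingDemo`, the method `drain`, and the numbers are hypothetical and only show how a burst is absorbed and then worked off at a fixed rate.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical illustration: an in-memory queue stands in for the broker.
final class PeakShavingDemo {

    // Enqueues a burst of `burst` orders at once, then lets a consumer that can
    // handle at most `capacityPerTick` orders per tick work off the backlog.
    // Returns the backlog size remaining after each tick.
    static List<Integer> drain(int burst, int capacityPerTick) {
        if (capacityPerTick <= 0) {
            throw new IllegalArgumentException("capacityPerTick must be positive");
        }
        Queue<Integer> queue = new ArrayDeque<>();
        for (int order = 1; order <= burst; order++) {
            queue.add(order); // the burst is accepted immediately
        }
        List<Integer> backlogPerTick = new ArrayList<>();
        while (!queue.isEmpty()) {
            for (int i = 0; i < capacityPerTick && !queue.isEmpty(); i++) {
                queue.poll(); // consume at the consumer's own pace
            }
            backlogPerTick.add(queue.size());
        }
        return backlogPerTick;
    }

    public static void main(String[] args) {
        System.out.println(drain(10, 3)); // prints [7, 4, 1, 0]
    }
}
```

The sender never waits for the slow consumer; the backlog simply shrinks tick by tick until it reaches zero.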

3. Getting to know RabbitMQ

1. Introduction

Broker: The application that receives and distributes messages.

Virtual Host: Designed for multi-tenancy and security. The basic AMQP components are divided into virtual groups, similar to the namespace concept in networking. When multiple users share one RabbitMQ server, several vhosts can be created, and each user creates exchanges and queues in their own vhost.

Connection: The TCP connection between a provider or consumer and the broker.

Channel: Establishing a new Connection for every RabbitMQ access would be expensive and inefficient when message volume is high. A Channel is a logical connection established inside a Connection. If the application is multi-threaded, each thread usually creates its own Channel for communication. Every AMQP method carries a channel ID that lets the client and the broker identify the channel, so channels are completely isolated from each other. As a lightweight connection, a Channel greatly reduces the operating system overhead of establishing TCP connections.

Exchange: The first stop for a message inside the broker. Following its distribution rules, the exchange matches the message's routing key against its lookup table and dispatches the message to queues. Common types are direct (point-to-point), topic (publish-subscribe), and fanout (broadcast).

Queue: Where a message finally lands, waiting to be picked up by a consumer.

Binding: The virtual link between an exchange and a queue. A binding can contain a routing key. Binding information is stored in the exchange's lookup table and used as the basis for message distribution.

2. Five modes of RabbitMQ

2.1. Simple Mode (Getting started)

The simple mode is the RabbitMQ starter case and is straightforward: one provider and one consumer.

2.1.1 Dependencies

    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.9.0</version>
        </dependency>
    </dependencies>

2.1.2 Providers

    package com.zthl.producer;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    /**
     * Provider
     */
    public class Provider {
        public static void main(String[] args) throws Exception {
            // Create a connection factory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // Set parameters
            // Host address; default is localhost
            connectionFactory.setHost("localhost");
            // Port; default is 5672
            connectionFactory.setPort(5672);
            // Virtual host name; default is /
            connectionFactory.setVirtualHost("/");
            // User name; default is guest
            connectionFactory.setUsername("guest");
            // Password; default is guest
            connectionFactory.setPassword("guest");
            // Create a connection
            Connection connection = connectionFactory.newConnection();
            // Create a channel
            Channel channel = connection.createChannel();
            /*
             * Declare the queue.
             * Parameter 1: queue, the queue name
             * Parameter 2: durable, whether the queue is persistent (it still exists after MQ restarts)
             * Parameter 3: exclusive, whether the queue is exclusive to this connection
             * Parameter 4: autoDelete, whether the queue is deleted automatically
             * Parameter 5: arguments, other parameters
             */
            channel.queueDeclare("simple_queue", true, false, false, null);
            // Message to be sent
            String message = "dude! Are you home?";
            /*
             * Parameter 1: exchange name; "" selects the default exchange
             * Parameter 2: routing key; in simple mode it can be set to the queue name
             * Parameter 3: message properties
             * Parameter 4: message body
             */
            channel.basicPublish("", "simple_queue", null, message.getBytes());
            System.out.println("message sent: " + message);
            // Release resources
            channel.close();
            connection.close();
        }
    }

Result of running the provider: as the console shows, the queue has been created and holds the message.

2.1.3 Consumers

    package com.zthl.comsumer;

    import com.rabbitmq.client.*;
    import java.io.IOException;

    /**
     * Consumer
     */
    public class Consumer {
        public static void main(String[] args) throws Exception {
            // Create a connection factory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // Set parameters
            connectionFactory.setHost("localhost");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            // Create a connection
            Connection connection = connectionFactory.newConnection();
            // Create a channel
            Channel channel = connection.createChannel();
            /*
             * queueDeclare(String queue, boolean durable, boolean exclusive,
             *              boolean autoDelete, Map<String, Object> arguments)
             * Parameter 1: queue, the queue name
             * Parameter 2: durable, whether to persist the queue
             * Parameter 3: exclusive, whether only one consumer may listen to the queue
             * Parameter 4: autoDelete, whether the queue is deleted automatically
             * Parameter 5: arguments, other parameters
             */
            channel.queueDeclare("simple_queue", true, false, false, null);
            // Declared as DefaultConsumer because this class is itself named Consumer,
            // which shadows the com.rabbitmq.client.Consumer interface.
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /*
                 * Callback executed automatically when a message is received.
                 * Parameter 1: consumerTag, the consumer's identifier
                 * Parameter 2: envelope, delivery information such as exchange and routing key
                 * Parameter 3: properties, configuration information
                 * Parameter 4: body, the message data
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("consumerTag: " + consumerTag);
                    System.out.println("Exchange: " + envelope.getExchange());
                    System.out.println("RoutingKey: " + envelope.getRoutingKey());
                    System.out.println("properties: " + properties);
                    System.out.println("body: " + new String(body));
                }
            };
            /*
             * basicConsume(String queue, boolean autoAck, Consumer callback)
             * Parameter 1: queue, the queue name
             * Parameter 2: autoAck, whether to acknowledge automatically
             * Parameter 3: callback, the callback object
             */
            channel.basicConsume("simple_queue", true, consumer);
        }
    }

Consumer Running results:

After the consumer consumes, the console changes:

2.2. Work queue mode

In simple terms, a single provider provides messages that are consumed by multiple consumers.

2.2.1 Provider

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    public class WorkProvider {
        public static void main(String[] args) throws Exception {
            // Create a connection factory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("localhost");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            // Create a connection
            Connection connection = connectionFactory.newConnection();
            // Create a channel
            Channel channel = connection.createChannel();
            // Declare the queue
            channel.queueDeclare("work_queue", true, false, false, null);
            for (int i = 1; i <= 10; i++) {
                String message = i + "hello rabbitmq~~~";
                // Send the message
                channel.basicPublish("", "work_queue", null, message.getBytes());
            }
            // Release resources
            channel.close();
            connection.close();
        }
    }

The result of running the provider console:

2.2.2 Consumer 1

    import com.rabbitmq.client.*;
    import java.io.IOException;

    public class WorkConsumer1 {
        public static void main(String[] args) throws Exception {
            // Create a connection factory
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            factory.setUsername("guest");
            factory.setPassword("guest");
            // Create a connection
            Connection connection = factory.newConnection();
            // Create a channel
            Channel channel = connection.createChannel();
            // Declare the queue
            channel.queueDeclare("work_queue", true, false, false, null);
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Print only the consumed message, for easy observation
                    System.out.println("body: " + new String(body));
                }
            };
            channel.basicConsume("work_queue", true, consumer);
        }
    }

Consumer 1 Running results:

2.2.3 Consumer 2

    import com.rabbitmq.client.*;
    import java.io.IOException;

    public class WorkConsumer2 {
        public static void main(String[] args) throws Exception {
            // Create a connection factory
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            factory.setUsername("guest");
            factory.setPassword("guest");
            // Create a connection
            Connection connection = factory.newConnection();
            // Create a channel
            Channel channel = connection.createChannel();
            // Declare the queue
            channel.queueDeclare("work_queue", true, false, false, null);
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Print only the consumed message, for easy observation
                    System.out.println("body: " + new String(body));
                }
            };
            channel.basicConsume("work_queue", true, consumer);
        }
    }

Consumer 2 Running results:

Conclusion:

If multiple consumers consume from the same queue, they compete for its messages: each message is delivered to only one of them.
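By default RabbitMQ hands messages to the consumers of a queue in turn (round-robin). The sketch below models only that dispatch order in plain Java, without a broker; the class `RoundRobinDemo` and the method `dispatch` are hypothetical names, and real behavior also depends on settings such as prefetch and on when each consumer connects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of round-robin dispatch; not the RabbitMQ API.
final class RoundRobinDemo {

    // Hands messages 1..messageCount to consumerCount consumers in turn.
    // Each message goes to exactly one consumer.
    static List<List<Integer>> dispatch(int messageCount, int consumerCount) {
        if (consumerCount <= 0) {
            throw new IllegalArgumentException("consumerCount must be positive");
        }
        List<List<Integer>> received = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            received.add(new ArrayList<>());
        }
        for (int message = 1; message <= messageCount; message++) {
            received.get((message - 1) % consumerCount).add(message);
        }
        return received;
    }

    public static void main(String[] args) {
        // Ten messages and two consumers, as in the work queue example
        System.out.println(dispatch(10, 2)); // prints [[1, 3, 5, 7, 9], [2, 4, 6, 8, 10]]
    }
}
```

With both consumers started before the provider runs, one consumer would be expected to print the odd-numbered messages and the other the even-numbered ones.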

2.3. Publish/subscribe mode

Exchange type: fanout (broadcast)

The publish/subscribe mode adds one more role: the exchange.

Provider: the party that sends messages.

Consumer: the party that receives messages.

Exchange: has two jobs, receiving messages and routing them. How a message is routed depends on the exchange type:

Fanout: broadcast type; delivers the message to every queue bound to the exchange.

Direct: directed type; delivers the message to the queues bound with the specified routing key.

Topic: wildcard type; delivers the message to the queues whose binding pattern matches the routing key.

An exchange only forwards messages; it does not store them. If no queue is bound to the exchange, messages sent to it are lost.

Queue: the message queue.
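The fanout rule, including the loss of messages when nothing is bound, can be sketched as a small in-memory model. This is an illustration only and does not use the RabbitMQ client; the class `FanoutModel` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a fanout exchange; not the RabbitMQ API.
final class FanoutModel {
    // queue name -> messages currently held by that queue
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    // Copies the message into every bound queue and returns how many queues got it.
    // With no bindings the message is dropped: the exchange itself stores nothing.
    int publish(String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
        return boundQueues.size();
    }

    List<String> messagesIn(String queueName) {
        return boundQueues.getOrDefault(queueName, List.of());
    }

    public static void main(String[] args) {
        FanoutModel exchange = new FanoutModel();
        System.out.println(exchange.publish("lost"));             // prints 0
        exchange.bind("fanout_queue1");
        exchange.bind("fanout_queue2");
        System.out.println(exchange.publish("hello"));            // prints 2
        System.out.println(exchange.messagesIn("fanout_queue1")); // prints [hello]
        System.out.println(exchange.messagesIn("fanout_queue2")); // prints [hello]
    }
}
```

The first message never appears in any queue, because it was published before any queue was bound.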

2.3.1 Providers

    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    // Imports and class wrapper added so the snippet compiles; the class name is assumed
    public class FanoutProvider {
        public static void main(String[] args) throws Exception {
            // Create a connection factory
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            factory.setUsername("guest");
            factory.setPassword("guest");
            // Create a connection
            Connection connection = factory.newConnection();
            // Create a channel
            Channel channel = connection.createChannel();
            String exchangeName = "fanout_exchange";
            /*
             * Create the exchange.
             * Parameter 1: exchange, the exchange name
             * Parameter 2: type, the exchange type:
             *     FANOUT("fanout"), broadcast type
             *     DIRECT("direct"), directed type
             *     TOPIC("topic"), wildcard type
             * Parameter 3: durable, whether to persist
             * Parameter 4: autoDelete, whether to delete automatically
             * Parameter 5: internal, for internal use
             * Parameter 6: arguments, other parameters
             */
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);
            // Create the queues
            String queue1Name = "fanout_queue1";
            String queue2Name = "fanout_queue2";
            channel.queueDeclare(queue1Name, true, false, false, null);
            channel.queueDeclare(queue2Name, true, false, false, null);
            /*
             * Bind the queues to the exchange.
             * Parameter 1: queue, the queue name
             * Parameter 2: exchange, the exchange name
             * Parameter 3: routingKey, the binding rule;
             *     for a fanout exchange, set it to ""
             */
            channel.queueBind(queue1Name, exchangeName, "");
            channel.queueBind(queue2Name, exchangeName, "");
            String body = "Pain, the sooner you experience it, the better, for example: love";
            // Send the message
            channel.basicPublish(exchangeName, "", null, body.getBytes());
            // Release resources
            channel.close();
            connection.close();
        }
    }

Console result after program running:

You can see the queues bound to the exchange

2.3.2 Consumer 1

    import com.rabbitmq.client.*;

    public class FanoutConsumer1 {
        public static void main(String[] args) throws Exception {
            // Create a connection factory
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            factory.setUsername("guest");
            factory.setPassword("guest");
            // Create a connection
            Connection connection = factory.newConnection();
            // Create a channel
            Channel channel = connection.createChannel();
            String queue1Name = "fanout_queue1";
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) {
                    System.out.println("body: " + new String(body));
                }
            };
            channel.basicConsume(queue1Name, true, consumer);
        }
    }

The results

2.3.3 Consumer 2

    import com.rabbitmq.client.*;

    public class FanoutConsumer2 {
        public static void main(String[] args) throws Exception {
            // Create a connection factory
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            factory.setUsername("guest");
            factory.setPassword("guest");
            // Create a connection
            Connection connection = factory.newConnection();
            // Create a channel
            Channel channel = connection.createChannel();
            String queue2Name = "fanout_queue2";
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) {
                    System.out.println("body: " + new String(body));
                }
            };
            channel.basicConsume(queue2Name, true, consumer);
        }
    }

The results

Start all the consumers, then send a message with the provider: every consumer receives the message. That is fanout.

Console changes: You can see that the message has been consumed

2.4. Routing mode

Exchange type: direct (directed)

Note:

1. Binding a queue to the exchange requires specifying a routing key.

2. When a provider sends a message to the exchange, it must specify a routing key for the message.

3. The exchange compares the two keys: a queue receives the message only if the routing key of its binding is the same as the message's routing key.
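The exact-match rule can be sketched as a lookup table from binding key to queues. This is a minimal in-memory model rather than the RabbitMQ client; the class `DirectModel` and its methods are hypothetical, and the bindings mirror the ones used in the provider of section 2.4.1.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of a direct exchange; not the RabbitMQ API.
final class DirectModel {
    // binding key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new LinkedHashMap<>();

    void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, key -> new LinkedHashSet<>()).add(queueName);
    }

    // Returns the queues whose binding key equals the message's routing key exactly.
    Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Set.of());
    }

    public static void main(String[] args) {
        DirectModel exchange = new DirectModel();
        exchange.bind("direct_queue1", "one");
        exchange.bind("direct_queue2", "one");
        exchange.bind("direct_queue2", "two");
        exchange.bind("direct_queue2", "three");
        System.out.println(exchange.route("three")); // prints [direct_queue2]
        System.out.println(exchange.route("one"));   // prints [direct_queue1, direct_queue2]
        System.out.println(exchange.route("four"));  // prints []
    }
}
```

A message sent with the routing key "three" reaches only direct_queue2, while a key that no binding uses reaches no queue at all.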

2.4.1 Provider

    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    public class DirectProvider {
        public static void main(String[] args) throws Exception {
            // Create a connection factory
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            factory.setUsername("guest");
            factory.setPassword("guest");
            // Create a connection
            Connection connection = factory.newConnection();
            // Create a channel
            Channel channel = connection.createChannel();
            String exchangeName = "direct_exchange";
            // Create the exchange
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);
            // Create the queues
            String queue1Name = "direct_queue1";
            String queue2Name = "direct_queue2";
            channel.queueDeclare(queue1Name, true, false, false, null);
            channel.queueDeclare(queue2Name, true, false, false, null);
            // Bind queue 1 with the routing key "one"
            channel.queueBind(queue1Name, exchangeName, "one");
            // Bind queue 2 with the routing keys "one", "two" and "three"
            channel.queueBind(queue2Name, exchangeName, "one");
            channel.queueBind(queue2Name, exchangeName, "two");
            channel.queueBind(queue2Name, exchangeName, "three");
            String message = "Three people, there must be a teacher!";
            // Send the message with the routing key "three"
            channel.basicPublish(exchangeName, "three", null, message.getBytes());
            System.out.println("sent successfully: " + message);
            // Release resources
            channel.close();
            connection.close();
        }
    }

Run the provider console information:

In the following two images, you can see the queues bound to the exchange, and that only direct_queue2, which is bound with the routing key used when sending, holds a message waiting to be consumed.

2.4.2 Consumer 1

    import com.rabbitmq.client.*;

    public class DirectConsumer1 {
        public static void main(String[] args) throws Exception {
            // Create a connection factory
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            factory.setUsername("guest");
            factory.setPassword("guest");
            // Create a connection
            Connection connection = factory.newConnection();
            // Create a channel
            Channel channel = connection.createChannel();
            String queue1Name = "direct_queue1";
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) {
                    System.out.println("body: " + new String(body));
                }
            };
            channel.basicConsume(queue1Name, true, consumer);
        }
    }

2.4.3 Consumer 2

    import com.rabbitmq.client.*;

    public class DirectConsumer2 {
        public static void main(String[] args) throws Exception {
            // Create a connection factory
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            factory.setUsername("guest");
            factory.setPassword("guest");
            // Create a connection
            Connection connection = factory.newConnection();
            // Create a channel
            Channel channel = connection.createChannel();
            String queue2Name = "direct_queue2";
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) {
                    System.out.println("body: " + new String(body));
                }
            };
            channel.basicConsume(queue2Name, true, consumer);
        }
    }

Consumer 1 Running results:

Consumer 2 Running results:

We can see that only consumer 2, whose queue is bound with the matching routing key, consumes the message. This is the direct type: a message reaches a queue only if the routing key of the queue's binding is the same as the routing key of the message.

2.5. Wildcard mode

Exchange type: topic (wildcard)

Note:

The topic and direct types work the same way: both use the routing key to send a message to the matching queues. The difference is that with a topic exchange the routing key of a binding may contain wildcards. A routing key consists of one or more words separated by ".".

Wildcards have rules:

1. # : matches zero or more words.

Example 1: one.# matches every routing key starting with one, such as one.two or one.two.three

Example 2: #.one matches every routing key ending in one, such as two.one or three.two.one

2. * : matches exactly one word.

Example 1: one.* matches one.two or one.three

Example 2: *.one matches two.one or three.one

I think that explains it pretty well. Hey hey hey!!
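The wildcard rules can be checked with a small matcher. This is a sketch written for illustration, not RabbitMQ's own implementation; it assumes that # may stand for zero or more words and * for exactly one, and the class `TopicMatcher` and the method `matches` are hypothetical.

```java
// Hypothetical matcher for topic patterns; not RabbitMQ's implementation.
final class TopicMatcher {

    // Returns true if the routing key matches the binding pattern.
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean matches(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            return k == key.length; // pattern used up: the key must be used up too
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point
            for (int next = k; next <= key.length; next++) {
                if (matches(pattern, p + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (k == key.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        boolean wordMatches = pattern[p].equals("*") || pattern[p].equals(key[k]);
        return wordMatches && matches(pattern, p + 1, key, k + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("one.#", "one.two.three")); // prints true
        System.out.println(matches("#.one", "two.one"));       // prints true
        System.out.println(matches("one.*", "one.two"));       // prints true
        System.out.println(matches("one.*", "one.two.three")); // prints false
        System.out.println(matches("*.*", "two.one"));         // prints true
    }
}
```

The fourth call returns false because * stands for exactly one word, so one.* cannot cover the two words that follow one.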

2.5.1 Provider

    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    public class TopicProvider {
        public static void main(String[] args) throws Exception {
            // Create a connection factory
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            factory.setUsername("guest");
            factory.setPassword("guest");
            // Create a connection and a channel
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            String exchangeName = "topic_exchange";
            // Create the exchange
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);
            // Create the queues
            String queue1Name = "topic_queue1";
            String queue2Name = "topic_queue2";
            channel.queueDeclare(queue1Name, true, false, false, null);
            channel.queueDeclare(queue2Name, true, false, false, null);
            // Bind the queues to the exchange
            channel.queueBind(queue1Name, exchangeName, "two.*");
            channel.queueBind(queue1Name, exchangeName, "#.one");
            channel.queueBind(queue2Name, exchangeName, "*.*");
            String body = "Did you fail?";
            // Send the message with the routing key "two.one"
            channel.basicPublish(exchangeName, "two.one", null, body.getBytes());
            // Release resources
            channel.close();
            connection.close();
        }
    }

Result of running console:

2.5.2 Consumer 1

    import com.rabbitmq.client.*;

    public class TopicConsumer1 {
        public static void main(String[] args) throws Exception {
            // Create a connection factory
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            factory.setUsername("guest");
            factory.setPassword("guest");
            // Create a connection
            Connection connection = factory.newConnection();
            // Create a channel
            Channel channel = connection.createChannel();
            String queue1Name = "topic_queue1";
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) {
                    System.out.println("body: " + new String(body));
                }
            };
            channel.basicConsume(queue1Name, true, consumer);
        }
    }

Running results:

2.5.3 Consumer 2

    import com.rabbitmq.client.*;

    public class TopicConsumer2 {
        public static void main(String[] args) throws Exception {
            // Create a connection factory
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            factory.setUsername("guest");
            factory.setPassword("guest");
            // Create a connection
            Connection connection = factory.newConnection();
            // Create a channel
            Channel channel = connection.createChannel();
            String queue2Name = "topic_queue2";
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) {
                    System.out.println("body: " + new String(body));
                }
            };
            channel.basicConsume(queue2Name, true, consumer);
        }
    }

Running results:

Topic and direct are similar, except that with topic the routing key of a binding can contain wildcards, which makes topic more flexible to use than direct.

The end

That's the end of this RabbitMQ primer. I hope we can learn and make progress together! I will keep posting more advanced articles; if you are interested, let's learn together!