AMQP

RabbitMQ is the implementation of AMQP protocol, which is a high level abstraction layer message communication protocol. It mainly consists of the following components:

1.Server(Broker): a process that accepts client connections and implements AMQP message queuing and routing functions.

2.Virtual Host: It is actually a Virtual concept, similar to permission control group. A Virtual Host can have several exchanges and queues, but the minimum granularity of permission control is Virtual Host

3.Exchange: Receives the message sent by the producer and routes it to the queue in the server according to the Binding rule. Exchangetypes determine the behavior of Exchange routing messages. For example, in RabbitMQ, exchangeTypes are direct, Fanout, and Topic. Different types of Exchange routes behave differently.

4.Message Queue: A Message Queue used to store messages that have not yet been consumed by consumers.

5.Message: Consists of a Header and a Body. A Header is a collection of attributes added by the producer, including whether a Message is persisted, which Message Queue receives it, and what priority it has. The Body is the actual APP data that needs to be transferred.

6.Binding:Binding binds the Exchange and Message Queue. Exchange generates a routing table after Binding multiple Message queues. The routing table stores Binding keys for the messages required by Message queues. When Exchange receives a Message, it parses its Header to obtain a Routing Key. Exchange routes the Message to the Message Queue based on the Routing Key and Exchange Type. The Binding Key is specified by the Consumer when Binding the Exchange and Message Queue, and the Routing Key is specified by the Producer when sending the Message. The Exchange Type determines how the two match.

RabbitMQ is a TCP Connection between the client and Broker.

8.Channel: a client cannot send a message after only creating a connection between the client and Broker. A Channel must be created for each Connection. The AMQP protocol specifies that AMQP commands can be executed only through a Channel. A Connection can contain multiple channels. A Channel is needed because TCP connections are expensive to establish and release. If each thread of a client needs to interact with the Broker, and if each thread establishes a TCP connection, the operating system cannot afford to establish so many TCP connections per second, regardless of whether TCP connections are wasted. RabbitMQ recommends that client threads do not share channels, or at least that the threads that share channels send messages serially, but that connections be shared as much as possible.

9.Com Command:AMQP Command. The client realizes its own logic by interacting with the AMQP server through Command. In RabbitMQ, for example, the client can publish a message, txSelect starts a transaction, and txCommit commits a transaction.

After understanding the AMQP model, it is necessary to briefly introduce the protocol stack of AMQP. AMQP protocol itself consists of three layers:

1.Module Layer, located at the highest Layer of the protocol, mainly defines some commands for clients to call. Clients can use these commands to realize their business logic.

2.Session Layer, which is mainly responsible for sending commands from the client to the server and returning replies from the server to the client, provides reliability, synchronization mechanism and error handling for communication between the client and the server.

3.Transport Layer, which mainly transmits binary data streams and provides frame processing, channel multiplexing, error detection and data representation.

RabbitMQ usage scenarios

Learn the RabbitMQ usage scenarios, from official tutorial: www.rabbitmq.com/getstarted….

Scenario 1: Single sending and single receiving

Usage scenario: simple send and receive, no special processing.

Producer:

import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class Send {        private final static String QUEUE_NAME = "hello";   public static void main(String[] argv) throws Exception {                      ConnectionFactory factory = new ConnectionFactory();     factory.setHost("localhost");     Connection connection = factory.newConnection();     Channel channel = connection.createChannel();     channel.queueDeclare(QUEUE_NAME, false, false, false, null);     String message = "Hello World!";     channel.basicPublish("", QUEUE_NAME, null, message.getBytes());     System.out.println(" [x] Sent '" + message + "'");          channel.close();     connection.close();   } }Copy the code

Consumer:

import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); }}}Copy the code

Scenario 2: Single sending and multiple receiving

Usage scenario: One sender and multiple receivers, for example, distributed task distribution. In order to ensure the reliability of message sending and not to lose the message, the message is persistent. In addition, to prevent the receiving end from going down during the processing of the message, the receiving end sends an ACK message only after the message processing is complete.

Producer:

import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; public class NewTask {      private static final String TASK_QUEUE_NAME = "task_queue";   public static void main(String[] argv) throws Exception {     ConnectionFactory factory = new ConnectionFactory();     factory.setHost("localhost");     Connection connection = factory.newConnection();     Channel channel = connection.createChannel();          channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);          String message = getMessage(argv);          channel.basicPublish( "", TASK_QUEUE_NAME,                  MessageProperties.PERSISTENT_TEXT_PLAIN,                 message.getBytes());     System.out.println(" [x] Sent '" + message + "'");          channel.close();     connection.close();   }        private static String getMessage(String[] strings){     if (strings.length < 1)       return "Hello World!";     return joinStrings(strings, " ");   }        private static String joinStrings(String[] strings, String delimiter) {     int length = strings.length;     if (length == 0) return "";     StringBuilder words = new StringBuilder(strings[0]);     for (int i = 1; i < length; i++) {       words.append(delimiter).append(strings[i]);     }     return words.toString();   } }Copy the code

Differences between the sender and scenario 1:

1. Declare another Queue with “task_queue” because RabbitMQ will not allow two queues with the same name and different configurations

2. Set the durable property of the Queue of “task_queue” to true, even if the message Queue is durable

3, use MessageProperties. Durable PERSISTENT_TEXT_PLAIN make news

When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren’t lost: we need to mark both the queue and messages as durable.

Consumer:

import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(TASK_QUEUE_NAME, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); doWork(message); System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == '.') Thread.sleep(1000); }}}Copy the code

Differences between the receiver and scenario 1:

1. Use task_queue to declare message queues and make message queues durable

2. Set autoAck to false when channel.basicAck() is used to receive messages; that is, channel.basicack () sends messages after the message processing is complete.

3. Channel. BasicQos is used. In this case the sender tries to send the message to the next not busy receiver.

Note:

1) It’s a common mistake to miss the basicAck. but the consequences are serious. Messages will be redelivered when your client quits (which may look like random redelivery), but RabbitMQ will eat more and more memory as it won’t be able to release any unacked messages.

2) Note on message persistence

Marking messages as persistent doesn’t fully guarantee that a message won’t be lost. Although it tells RabbitMQ to save the message to disk, there is still a short time window when RabbitMQ has accepted a message and hasn’t saved it yet. Also, RabbitMQ doesn’t do fsync(2) for every message — it may be just saved to cache and not really written to the disk. The persistence guarantees aren’t strong, but it’s more than enough for our simple task queue. If you need a stronger guarantee you can wrap the publishing code in atransaction.

3) Note about queue size

If all the workers are busy, your queue can fill up. You will want to keep an eye on that, and maybe add more workers, or have some other strategy.

Scenario 3: Publish/Subscribe

Usage scenario: In publish and subscribe mode, the sender sends broadcast messages and multiple receivers receive them.

Producer:

import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class EmitLog {   private static final String EXCHANGE_NAME = "logs";   public static void main(String[] argv) throws Exception {     ConnectionFactory factory = new ConnectionFactory();     factory.setHost("localhost");     Connection connection = factory.newConnection();     Channel channel = connection.createChannel();     channel.exchangeDeclare(EXCHANGE_NAME, "fanout");     String message = getMessage(argv);     channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());     System.out.println(" [x] Sent '" + message + "'");     channel.close();     connection.close();   }      private static String getMessage(String[] strings){     if (strings.length < 1)             return "info: Hello World!";     return joinStrings(strings, " ");   }      private static String joinStrings(String[] strings, String delimiter) {     int length = strings.length;     if (length == 0) return "";     StringBuilder words = new StringBuilder(strings[0]);     for (int i = 1; i < length; i++) {         words.append(delimiter).append(strings[i]);     }     return words.toString();   } }Copy the code

The sender:

Messages are sent to an exchange named logs in fanout mode, that is, broadcast messages without queue, and the sender does not care who receives them.

Consumer:

import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); }}}Copy the code

The receiver:

1, Declare exchange logs as fanout, same as sender.

2, channel. QueueDeclare (.) getQueue (); This statement obtains a Queue with a random name, non-durable, exclusive, and auto-delete. The Queue is bound to the Exchange to receive messages.

Routing key of channel.queuebind () is null, i.e. all messages are received. If this value is not null, it is ignored in exchange type “fanout” mode!

Scenario 4: Routing

Usage scenario: The sender uses routing keys to send messages. Different receivers use different routing keys to receive messages.

Producer:

import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class EmitLogDirect {   private static final String EXCHANGE_NAME = "direct_logs";   public static void main(String[] argv) throws Exception {     ConnectionFactory factory = new ConnectionFactory();     factory.setHost("localhost");     Connection connection = factory.newConnection();     Channel channel = connection.createChannel();     channel.exchangeDeclare(EXCHANGE_NAME, "direct");     String severity = getSeverity(argv);     String message = getMessage(argv);     channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());     System.out.println(" [x] Sent '" + severity + "':'" + message + "'");     channel.close();     connection.close();   }      private static String getSeverity(String[] strings){     if (strings.length < 1)             return "info";     return strings[0];   }   private static String getMessage(String[] strings){      if (strings.length < 2)             return "Hello World!";     return joinStrings(strings, " ", 1);   }      private static String joinStrings(String[] strings, String delimiter, int startIndex) {     int length = strings.length;     if (length == 0 ) return "";     if (length < startIndex ) return "";     StringBuilder words = new StringBuilder(strings[startIndex]);     for (int i = startIndex + 1; i < length; i++) {         words.append(delimiter).append(strings[i]);     }     return words.toString();   } }Copy the code

Differences between the sender and Scenario 3:

1. The exchange type is direct

2. Add the routing key when sending the message

Consumer:

import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1){ System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]"); System.exit(1); } for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); }}}Copy the code

Differences between the receiver and Scenario 3:

A routing key is used when binding a queue to an exchange. That is, only messages specified by the routing key are received from the exchange.

Scenario 5: Topics (send and receive by topic)

Usage scenario: The sender sends the message not by the fixed routing key, but by the string “match”, and the receiver does the same.

Producer:

import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) { Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String routingKey = getRouting(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); } catch (Exception e) { e.printStackTrace(); } finally { if (connection ! = null) { try { connection.close(); } catch (Exception ignore) {} } } } private static String getRouting(String[] strings){ if (strings.length < 1) return "anonymous.info"; return strings[0]; } private static String getMessage(String[] strings){ if (strings.length < 2) return "Hello World!" ; return joinStrings(strings, " ", 1); } private static String joinStrings(String[] strings, String delimiter, int startIndex) { int length = strings.length; if (length == 0 ) return ""; if (length < startIndex ) return ""; StringBuilder words = new StringBuilder(strings[startIndex]); for (int i = startIndex + 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); }}Copy the code

Differences between the sender and Scenario 4:

1. Exchange is of type topic

2. The routing key to send the message is not a fixed word, but a matching string, such as “*.lu.#”, * matches one word, and # matches zero or more words.

Consumer:

import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) { Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1){ System.err.println("Usage: ReceiveLogsTopic [binding_key]..." ); System.exit(1); } for(String bindingKey : argv){ channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); } } catch (Exception e) { e.printStackTrace(); } finally { if (connection ! = null) { try { connection.close(); } catch (Exception ignore) {} } } } }Copy the code

Differences between the receiver and Scenario 4:

1. Exchange is of type topic

2. The routing key for receiving the message is not a fixed word, but a matching string.

Note:

Topic exchange

Topic exchange is powerful and can behave like other exchanges. When a queue is bound with “#” (hash) binding key – it will receive all the messages, regardless of the routing key – like in fanout exchange. When special characters “*” (star) and “#” (hash) aren’t used in bindings, the topic exchange will behave just like a direct one.