This is the first day of my participation in the More text Challenge. For details, see more text Challenge
By Threedayman Hang Seng LIGHT Cloud Community
What is the RabbitMQ
RabbitMQ is the most widely deployed open source messaging broker. RabbitMQ is lightweight and easy to deploy. Supports multiple messaging protocols.
Why use RabbitMQ
Common use scenarios include decoupling, asynchronous, and peak load shifting. Let’s use examples to see the benefits of using MQ in our respective scenarios.
The decoupling
Suppose we have system A, which depends on system B, system C and system D, and the dependencies have been written dead in the code. The structure is as follows:
Suppose A new requirement comes along, and system A needs to call system E for some new business operation, then the programmer of System A has to have A hard time dealing with the need to access system E. Similarly, if you want to remove the dependency of A system, such as system C, it also needs to be handled by the development of system A.
So let’s see what happens if we introduce MQ.
System A sends messages to MQ, and systems B, C, and D subscribe to the corresponding messages for service processing. Let’s take a look at the previous scenario. Suppose that a dependency needs to be added to system E, only the developers of system E need to subscribe accordingly. Similarly, to cancel the dependency on system C, only system C needs to unsubscribe the corresponding message.
asynchronous
Assuming system A takes 30ms to operate, and system A also calls system B(300ms), System C(600ms), and system D(200ms) synchronously, the response time for this request will reach 1130ms. Long response times can lead to a bad user experience for customers.
What happens when MQ is introduced
System A sends the message to MQ (7ms) and then returns it. System B, C, and D listen to MQ for business processing respectively. So we see that the overall response time dropped from 1130ms to 37ms when MQ was introduced for asynchronous processing in response to the long synchronous dependency we just had.
Peak peel
Suppose we have a peak traffic volume of 7000 /s and a peak traffic volume of 100/s, but our mysql database can only handle 2000/s.
In this case, the highest load capacity of MQSQL is exceeded during peak periods and the MQSQL resources are not properly utilized during peak periods.
What happens when MQ is introduced
At this time, the system can pull messages according to its maximum consumption capacity of 2000/s, which can smoothly pass the business peak, and at the same time, some messages will be delayed to the business trough period for processing. The database will not be suspended due to high traffic, and the whole service will not be available.
How do I use RabbitMQ
This section focuses on a few common examples written for the Java client of RabbitMQ. If you are already familiar with using RabbitMQ, you may skip this section. For complete RabbitMQ instructions, see the official documentation.
Hello world
Let’s take a look at RabbitMQ with a Hello World example. First, the terms used in this example
- Producer: The Producer is used to send messages.
- Queue: a Queue in which messages are stored. Messages are delivered to a Queue by a producer and then to a consumer for consumption. The Queue is subject to machine memory and disk resource constraints.
- Consumer: a Consumer who receives and processes messages.
In this example we will produce the Hello World message, receive it through the consumer and print it out.
The key steps of producer Send are described in the comments
public class Send {
// Queue name
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
// Create a connection between the server and the server
ConnectionFactory factory = new ConnectionFactory();
// Set the IP address of the deployed node
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// Declare a queue to send messages
channel.queueDeclare(QUEUE_NAME, false.false.false.null);
String message = "Hello World!";
// Publish the message
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'"); }}}Copy the code
Complete Send code review
The key steps of consumer Recv are described in the notes
public class Recv {
// Queue name
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
// Create a connection between the server and the server
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// Declare the queue to consume
channel.queueDeclare(QUEUE_NAME, false.false.false.null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// This class is used to process messages
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); }}Copy the code
Complete Recv code review
Work Queues
In this example we will show how to distribute time-consuming tasks to multiple workers using RabbitMQ. RabbitMQ delivers messages to consumers in a round-robin manner, which makes it easy to extend the spending power.
The key steps of producer NewTask are described in the comments
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// Make the queue persistent
channel.queueDeclare(TASK_QUEUE_NAME, true.false.false.null);
String message = String.join("", argv);
// Make the message persistent
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'"); }}}Copy the code
Complete NewTask code review
Consumer Woker key steps are described in the comments
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// Make the queue persistent
channel.queueDeclare(TASK_QUEUE_NAME, true.false.false.null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// A consumer can process a maximum of one unconfirmed message simultaneously
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }}; channel.basicConsume(TASK_QUEUE_NAME,false, deliverCallback, consumerTag -> { });
}
// Simulate time consuming tasks, one. It takes 1 second.
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '. ') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
Copy the code
Complete Worker code review
Publish/Subscribe
We’ve already looked at RabbitMQ’s core message model, producer, consumer, queue, and in this section we’ll look at another message model, Exchange **, which is responsible for receiving messages from producers and delivering them to queues. There are mainly the following types of exchage **
- direct
- topic
- headers
- fanout
In this example we are looking at the fanout type, which we can probably guess from the name will broadcast the received message to its bound queue.
The key steps of producer EmitLog are described in the comments
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// Create an Exchange and specify the type
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = argv.length < 1 ? "info: Hello World!" :
String.join("", argv);
// This is different from the previous message, specifying a specific exchange does not specify a specific queue
channel.basicPublish(EXCHANGE_NAME, "".null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'"); }}}Copy the code
EmitLog complete code check
The key steps for consumer ReceiveLogs are described in the comments
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// Create an Exchange of type Fanout
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// Get a unique, nonpersistent, automatically deleted queue
String queueName = channel.queueDeclare().getQueue();
// Link exchage to queue by binding
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); }}Copy the code
ReceiveLogs complete code check
Routing
In the previous example exchange broadcast the received message to the bound queue. In this example we will add some specificity to the binding to give Exchange the ability to deliver different messages to different queues using a routingKey (full match). For example, daily logs distinguish error logs into separate queues.
The key steps of producer EmitLogDirect are described in the comments
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// Declare an Exchange of type Direct
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String severity = getSeverity(argv);
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + severity + "' : '" + message + "'"); }}private static String getSeverity(String[] strings) {
if (strings.length < 1)
return "info";
return strings[0];
}
private static String getMessage(String[] strings) {
if (strings.length < 2)
return "Hello World!";
return joinStrings(strings, "".1);
}
private static String joinStrings(String[] strings, String delimiter, int startIndex) {
int length = strings.length;
if (length == 0) return "";
if (length <= startIndex) return "";
StringBuilder words = new StringBuilder(strings[startIndex]);
for (int i = startIndex + 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
returnwords.toString(); }}Copy the code
EmitLogDirect complete code review
Consumer ReceiveLogsDirect Key steps are described in the comments
public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1) {
System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
System.exit(1);
}
for (String severity : argv) {
// Establish the relationship between the exchange and queue and set the routingKey
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "' : '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); }}Copy the code
ReceiveLogsDirect for complete code review
Topics
Provides richer rules for routing exchange to queue. Rules are separated by a. RoutingKey with a maximum limit of 255bytes. Unlike the previous fully matched routingKey, the Topic-type Exchange routingKey adds two major features.
- * stands for a word **. 天安门事件
- **#** represents zero or one word.
The key steps for producer EmitLogTopic are described in the comments
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String routingKey = getRouting(argv);
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + routingKey + "' : '" + message + "'"); }}private static String getRouting(String[] strings) {
if (strings.length < 1)
return "anonymous.info";
return strings[0];
}
private static String getMessage(String[] strings) {
if (strings.length < 2)
return "Hello World!";
return joinStrings(strings, "".1);
}
private static String joinStrings(String[] strings, String delimiter, int startIndex) {
int length = strings.length;
if (length == 0) return "";
if (length < startIndex) return "";
StringBuilder words = new StringBuilder(strings[startIndex]);
for (int i = startIndex + 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
returnwords.toString(); }}Copy the code
EmitLogTopic complete code review
Consumer ReceiveLogsTopic Key steps are described in the comments
public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1) {
System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
System.exit(1);
}
for (String bindingKey : argv) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "' : '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); }}Copy the code
ReceiveLogsTopic Complete code check
What are the challenges of introducing RabbitMQ
With this in mind, you might be tempted to consider adding RabbitMQ to the project to optimize the current usage scenario. Should we deploy a RabbitMQ service and send messages without worrying about it? In fact, when introducing a piece of middleware, there are some problems, and if we don’t know enough about these problems, then congratulations, you are in the pit player sequence. In order to be a reliable programmer, we need to fully understand the challenges of introducing middleware to our projects so that we can handle them in the future. The following are some common types of problems in message-oriented middleware
- Message loss
- The message to repeat
- Messages are stacked
- Availability of RabbitMQ is guaranteed
In subsequent articles, we will explore solutions to each of these problems. RabbitMQ Message Reliability Transmission (2)
Reference documentation
https://www.rabbitmq.com/ the RabbitMQ official documentation
Tips: The author’s personal experience is limited, so please correct the shortcomings.