This is my 28th day of the August Challenge. Check out the details: August Challenge @TOC
Concepts related to MQ
What is the MQ
MQ (Message Queue) stands for queue. FIFO is a first in, first out (FIFO), but it’s just messages in the queue and it’s a cross-process communication mechanism for sending messages up and down. Communication services of “logical decoupling + object understanding decoupling”. Using MQ, messages are sent only by MQ and not by any other service.
What is the RabbitMq
RabbitMq is a message-oriented middleware. He receives and forwards the message. Can be used as a delivery stop. When you want to send a package, drop it off at the express delivery station and the Courier at the express delivery station delivers it for you. RabbitMq differs from a Courier station in that it receives, stores and forwards messages rather than processing them.
Basic concepts of RabbitMq
Exchange
The message sent by the producer is accepted and routed to the queue in the server according to the Binding rule. An ExchangeType determines the behavior of an Exchange routing message. Exchangetypes are commonly used in RabbitMQ as direct, Fanout and Topic
Message Queue
Message queues. Any message we send to RabbitMQ will end up in queues where it is stored (data will be lost if the route does not find a queue), waiting to be picked up by the consumer.
Binding Key
It indicates that the Exchange and Message Queue are linked by a binding key, and the relationship is fixed.
Routing Key
When sending a message to an Exchange, producers specify a routing key that specifies the routing rules for the message. This routing key needs to be used in conjunction with the Exchange Type and binding key. Our producer only needs to specify the routing key to determine where the message will go.
RabbitMq scenarios
The service of decoupling
Suppose that service A now generates the data and service BCD needs the data so let’s have service A call the BCD service directly. Just pass the data along. However, with the continuous expansion of our application scale, there will be more and more services that need A’s data. If there are dozens or even hundreds of downstream services, it will be difficult to maintain the calling code in A service by considering whether the downstream services will make mistakes. Such services are too tightly coupled.
As shown in the figure below
In the case of RabbitMq decoupling, service A simply sends messages to the server and doesn’t have to worry about how the data is needed or who needs it.As is shown in
Peak flow away
Let’s say our app has an average traffic of every 300 seconds and we can easily handle that with one server
At peak times the number of visits increases tenfold to 3000 or more per second and there’s no way a single server can handle that. You have to think about multiple servers to spread the load around but you can’t have that many peaks forever so multiple servers is a bit of a waste. We could also consider RabbitMq Flow peak, peak, instantly appear a large number of request data, first sent to the message queue server, waiting to be processed, and our application, can slowly from the message queue receives the request data processing, so that the data processing time stretched, in order to reduce the instantaneous pressure This is a very typical application scenario for the message queue serverAs is shown in
The asynchronous call
Consider the success of ordering takeout payment
After the payment, we need to send the notification of successful payment, and then look for the delivery boy to deliver the goods. The process of looking for the delivery boy is very time-consuming, especially in the peak hours, and may need to wait for dozens of seconds or even longer
This results in very slow response of the entire invocation link
If we introduce a RabbitMQ message queue, the order data can be sent to the message queue server, the link can be called and the order system can respond immediately, with a total link response time of around 200 milliseconds
The delivery guy finder application can receive the order message from the message queue in an asynchronous manner and then perform the time-consuming search operation
Hello world
The producer and receiver that send a single message and print out the consumer
Add the dependent
<! -- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<! -- https://mvnrepository.com/artifact/commons-io/commons-io -->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
Copy the code
Producer code
As is shown inWrite the producer code first and then write the consumer code
// Producer code
public class Producer {
// Prepare the queue name
public static final String QUEUE_NAME = "fjj";
public static void main(String[] args) throws Exception {
// Create a linked object
ConnectionFactory factory = new ConnectionFactory();
// Set factory information
factory.setHost("localhost");
/ / user name
factory.setUsername("guest");
/ / password
factory.setPassword("guest");
// Create a link
Connection connection = factory.newConnection();
// Get the channel
Channel channel = connection.createChannel();
// Generate a queue
Parameter 1 The name of our message queue
// Parameter 2 Persistent (disk) The default value is stored in memory. True: the value is persistent and will be saved even after the server restarts. False: Non-persistent
// Whether 2 is available for consumption by one consumer or whether messages are shared true can be consumed by multiple consumers
// Parameter 4 is automatically deleted after the last consumer is gone
// Other parameters
channel.queueDeclare(QUEUE_NAME, true.false.true.null);
// Ready to send messages I write 100 messages here
String message = " Hi,My name is fjj";
ArrayList<String> list = new ArrayList<>();
for (int i = 0; i < 100; i++) {
list.add(message+i);
}
for (String messages : list) {
// Parameter 1 is sent to that switch
// Parameter 2 specifies the queue name of the route's Key value
// Parameter 3 Other parameters
// The message sent by parameter 4 must be converted here
channel.basicPublish("",QUEUE_NAME,null,messages.getBytes());
}
System.out.println("Send over"); }}Copy the code
Check out our Web interface
Consumer code
public class Consumer {
// Prepare the name of the queue to receive
public static final String QUEUE_NAME = "fjj";
// Receive the message
public static void main(String[] args) throws Exception {
// Create a linked object
ConnectionFactory factory = new ConnectionFactory();
// Set the queue for the factory
factory.setHost("localhost");
/ / user name
factory.setUsername("guest");
/ / password
factory.setPassword("guest");
// Create a link
Connection connection = factory.newConnection();
// Receive the message
Channel channel = connection.createChannel();
// Parameter 1 consumes that queue
// Parameter 2 should be automatically processed after successful consumption
// Parameter 3 a callback where the consumer did not successfully consume
// Parameter 4 the consumer cancels the consumer callback
// Declare to receive the message
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("message = " +new String( message.getBody()));
};
// Cancel the callback of the message
CancelCallback cancelCallback = consumerTag -> {
System.out.println("Message break");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); }}Copy the code
Console view Viewing information on the Web UI
Work queue Principle
The main idea of work queues (task queues) is to avoid performing a resource-intensive task immediately and having to wait for it to complete. Instead, we schedule it for later. Encapsulate the task as a message and send it to a queue. A worker process running in the background pops up the task and eventually executes the job. When there are multiple worker threads, these threads will work together on these tasks.
The rotation distributes the message
In this case we’re going to start two worker threads one message sending thread and see how it works between two worker threads.
Encapsulated utility class
This duplication of code should be isolated. As long as it’s easy to call
public class RabbitmqUtil {
public static Channel getChannel (a) throws Exception {
// Create a linked object
ConnectionFactory factory = new ConnectionFactory();
// Set the queue for the factory
factory.setHost("localhost");
/ / user name
factory.setUsername("guest");
/ / password
factory.setPassword("guest");
// Create a link
Connection connection = factory.newConnection();
// Receive the message
Channel channel = connection.createChannel();
returnchannel; }}Copy the code
Start two working threads
Consumer code
// This is the first worker thread
public class Consumer01 {
// Prepare the name of the queue to receive
public static final String QUEUE_NAME = "fjj";
public static void main(String[] args) throws Exception {
Channel channel = RabbitmqUtil.getChannel();
// Receive the message
// Parameter 1 consumes that queue
// Parameter 2 should be automatically processed after successful consumption
// Parameter 3 a callback where the consumer did not successfully consume
// Parameter 4 the consumer cancels the consumer callback
// Declare to receive the message
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("message = " +new String( message.getBody()));
};
// Cancel the callback of the message
CancelCallback cancelCallback = consumerTag -> {
System.out.println("Message break");
};
System.out.println("C1 waiting thread !!!!!!!!!");
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); }}Copy the code
Another consumer code just like this one will do
Create the producer’s code
public class Producer01 {
// Specify the queue name
public static final String QUEUE_NAME = "fjj";
// Send a large number of messages
public static void main(String[] args) throws Exception {
// Create a link
Channel channel = RabbitmqUtil.getChannel();
channel.queueDeclare(QUEUE_NAME,false.false.false.null);
// Send a message
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String msg = scanner.next();
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
System.out.println("Producer over"); }}}Copy the code
Work queue results
You poll me one by one
Message response
It may take a while for a consumer to complete a task, and if one of the consumers is working on a long task and only partially completed it suddenly dies, What happens when RabbitMq sends a message to a consumer and immediately removes that message flag in which case a consumer suddenly hangs up and we lose data and all subsequent data sent to that consumer disappears
To ensure that messages are not lost in the process of being sent RabbitMq has introduced a message reply mechanism which means that the consumer has received the message and processed it. RabbitMq has processed it and can delete the message
Automatic reply
This mode requires a trade-off between high throughput and data transmission security because in this mode, if a link or Cannel closes on the consumer side before the message is received, the message is lost. On the other hand, of course, the consumer side may be overloaded with messages. There is no limit on the number of messages delivered and it is also possible that consumers receive too many messages to process. This causes message backlogs and eventually runs out of memory. Eventually these consumer threads are killed by the operating system so this default is appropriate for situations where consumers can process these messages efficiently and at some rate.
Manual response
- Cannel.basicack (for affirmative confirmation) RabbitMq knows the message and successfully processed it, and can discard it
- Cannel.basicnack (used to deny confirmation)
- Cannel.basicReject has one less parameter than cannel. basicNack to reject the message and discard it.
Messages are automatically requeued
If the consumer loses the connection for some reason (the channel is closed the connection is closed or the Tcp connection is lost), ACK confirmation RabbitMq will know that the message is not fully processed and will requeue it if it can be processed by another consumer at this point and will redistribute it to another consumer so that even if a consumer dies occasionally no messages will be lost
Message manual reply code
Producer code
public class Producer02 {
// Set the queue name
public final static String QUEUE_NAME ="ack_Hello";
public static void main(String[] args) throws Exception {
// Get the channel
Channel channel = RabbitmqUtil.getChannel();
// Set send
channel.queueDeclare(QUEUE_NAME,false.false.false.null);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String msg = scanner.next();
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes("UTF-8"));
System.out.println("Producer over"); }}}Copy the code
Consumer03
public class Consumer03 {
public final static String QUEUE_NAME ="ack_Hello";
public static void main(String[] args) throws Exception{
Channel channel = RabbitmqUtil.getChannel();
DeliverCallback deliverCallback = (consumerTag, message) -> {
/ / into a deep sleep
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("C3 received message =" +new String( message.getBody()));
// 1 Message tag
// 2 Specifies whether to reply in batches
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
// Cancel the callback of the message
CancelCallback cancelCallback = consumerTag -> {
System.out.println("Message break");
};
boolean authAck = false; channel.basicConsume(QUEUE_NAME, authAck, deliverCallback, cancelCallback); }}Copy the code
The other consumer is just like this one except for a different amount of sleep
Message response result
Normally the sender of a message sends two messages C1 and C2 each receive the message and process it but at the point of sending a message when C2 consumer suddenly dies in the sense of polling the message is going to be C2 and when C2 dies the message will come back Message queues are re-enqueued and then allocated to C1 consumers for consumption.
The RabbitMq persistence
concept
You have just seen how to handle this without losing a task but how to ensure that messages sent by a message sender are not lost when the RabbitMq service is down. By default RabbitMq exits or crashes for some reason, it ignores queues and messages unless it is told not to do so. To ensure that messages are not lost, two things need to be done: We need to mark both the queue and the message as persistent.
How do queues persist
The queues we created are non-persistent and will be removed if rabbitMQ restarts. If you want to persist a queue, you need to set burable to persist when you declare the queue
However, it is important to note that if the previously declared queue is not persistent, you need to delete the original queue or create a new persistent queue otherwise an error will occur.
The message is persistent
Messages to make persistent need producers to modify the code, MessageProperties. PERSISTENT_TEXT_PLAIN add this attribute.
Marking messages as persistent is not a complete guarantee against message loss although he tells RabbitMq to save the message to disk, it is still there when the message is ready to be stored on disk, But the message is still in the cache at a break point where it’s not actually written to disk at this point and the persistence guarantee is not strong but it’s more than enough for our simple task queue.