This is my 28th day of the August Challenge. Check out the details: August Challenge @TOC

Concepts related to MQ

What is the MQ

MQ (Message Queue) stands for queue. FIFO is a first in, first out (FIFO), but it’s just messages in the queue and it’s a cross-process communication mechanism for sending messages up and down. Communication services of “logical decoupling + object understanding decoupling”. Using MQ, messages are sent only by MQ and not by any other service.

What is the RabbitMq

RabbitMq is a message-oriented middleware. He receives and forwards the message. Can be used as a delivery stop. When you want to send a package, drop it off at the express delivery station and the Courier at the express delivery station delivers it for you. RabbitMq differs from a Courier station in that it receives, stores and forwards messages rather than processing them.

Basic concepts of RabbitMq

Exchange

The message sent by the producer is accepted and routed to the queue in the server according to the Binding rule. An ExchangeType determines the behavior of an Exchange routing message. Exchangetypes are commonly used in RabbitMQ as direct, Fanout and Topic

Message Queue

Message queues. Any message we send to RabbitMQ will end up in queues where it is stored (data will be lost if the route does not find a queue), waiting to be picked up by the consumer.

Binding Key

It indicates that the Exchange and Message Queue are linked by a binding key, and the relationship is fixed.

Routing Key

When sending a message to an Exchange, producers specify a routing key that specifies the routing rules for the message. This routing key needs to be used in conjunction with the Exchange Type and binding key. Our producer only needs to specify the routing key to determine where the message will go.

RabbitMq scenarios

The service of decoupling

Suppose that service A now generates the data and service BCD needs the data so let’s have service A call the BCD service directly. Just pass the data along. However, with the continuous expansion of our application scale, there will be more and more services that need A’s data. If there are dozens or even hundreds of downstream services, it will be difficult to maintain the calling code in A service by considering whether the downstream services will make mistakes. Such services are too tightly coupled.

As shown in the figure below

In the case of RabbitMq decoupling, service A simply sends messages to the server and doesn’t have to worry about how the data is needed or who needs it.As is shown in

Peak flow away

Let’s say our app has an average traffic of every 300 seconds and we can easily handle that with one server

At peak times the number of visits increases tenfold to 3000 or more per second and there’s no way a single server can handle that. You have to think about multiple servers to spread the load around but you can’t have that many peaks forever so multiple servers is a bit of a waste. We could also consider RabbitMq Flow peak, peak, instantly appear a large number of request data, first sent to the message queue server, waiting to be processed, and our application, can slowly from the message queue receives the request data processing, so that the data processing time stretched, in order to reduce the instantaneous pressure This is a very typical application scenario for the message queue serverAs is shown in

The asynchronous call

Consider the success of ordering takeout payment

After the payment, we need to send the notification of successful payment, and then look for the delivery boy to deliver the goods. The process of looking for the delivery boy is very time-consuming, especially in the peak hours, and may need to wait for dozens of seconds or even longer

This results in very slow response of the entire invocation link

If we introduce a RabbitMQ message queue, the order data can be sent to the message queue server, the link can be called and the order system can respond immediately, with a total link response time of around 200 milliseconds

The delivery guy finder application can receive the order message from the message queue in an asynchronous manner and then perform the time-consuming search operation

Hello world

The producer and receiver that send a single message and print out the consumer

Add the dependent

        <! -- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>
        <! -- https://mvnrepository.com/artifact/commons-io/commons-io -->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>
Copy the code

Producer code

As is shown inWrite the producer code first and then write the consumer code

// Producer code
public class Producer {
    // Prepare the queue name
    public static final String QUEUE_NAME = "fjj";
    public static void main(String[] args) throws Exception {
        // Create a linked object
        ConnectionFactory factory = new ConnectionFactory();
        // Set factory information
        factory.setHost("localhost");
        / / user name
        factory.setUsername("guest");
        / / password
        factory.setPassword("guest");
        // Create a link
        Connection connection = factory.newConnection();
        // Get the channel
        Channel channel = connection.createChannel();
        // Generate a queue
        Parameter 1 The name of our message queue
        // Parameter 2 Persistent (disk) The default value is stored in memory. True: the value is persistent and will be saved even after the server restarts. False: Non-persistent
        // Whether 2 is available for consumption by one consumer or whether messages are shared true can be consumed by multiple consumers
        // Parameter 4 is automatically deleted after the last consumer is gone
        // Other parameters
        channel.queueDeclare(QUEUE_NAME, true.false.true.null);
        // Ready to send messages I write 100 messages here
        String message = " Hi,My name is fjj";
        ArrayList<String> list = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            list.add(message+i);
        }
        for (String messages : list) {
            // Parameter 1 is sent to that switch
            // Parameter 2 specifies the queue name of the route's Key value
            // Parameter 3 Other parameters
            // The message sent by parameter 4 must be converted here
            channel.basicPublish("",QUEUE_NAME,null,messages.getBytes());
        }
        System.out.println("Send over"); }}Copy the code

Check out our Web interface

Consumer code



public class Consumer {
    // Prepare the name of the queue to receive
    public static final String QUEUE_NAME = "fjj";

    // Receive the message
    public static void main(String[] args) throws Exception {
        // Create a linked object
        ConnectionFactory factory = new ConnectionFactory();
        // Set the queue for the factory
        factory.setHost("localhost");
        / / user name
        factory.setUsername("guest");
        / / password
        factory.setPassword("guest");
        // Create a link
        Connection connection = factory.newConnection();
        // Receive the message
        Channel channel = connection.createChannel();
        // Parameter 1 consumes that queue
        // Parameter 2 should be automatically processed after successful consumption
        // Parameter 3 a callback where the consumer did not successfully consume
        // Parameter 4 the consumer cancels the consumer callback
        // Declare to receive the message
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("message = " +new String( message.getBody()));
        };
        // Cancel the callback of the message
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("Message break");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); }}Copy the code

Console view Viewing information on the Web UI

Work queue Principle

The main idea of work queues (task queues) is to avoid performing a resource-intensive task immediately and having to wait for it to complete. Instead, we schedule it for later. Encapsulate the task as a message and send it to a queue. A worker process running in the background pops up the task and eventually executes the job. When there are multiple worker threads, these threads will work together on these tasks.

The rotation distributes the message

In this case we’re going to start two worker threads one message sending thread and see how it works between two worker threads.

Encapsulated utility class

This duplication of code should be isolated. As long as it’s easy to call

public class RabbitmqUtil {
    public static Channel getChannel (a) throws Exception {
        // Create a linked object
        ConnectionFactory factory = new ConnectionFactory();
        // Set the queue for the factory
        factory.setHost("localhost");
        / / user name
        factory.setUsername("guest");
        / / password
        factory.setPassword("guest");
        // Create a link
        Connection connection = factory.newConnection();
        // Receive the message
        Channel channel = connection.createChannel();
        returnchannel; }}Copy the code

Start two working threads

Consumer code

// This is the first worker thread
public class Consumer01 {
    // Prepare the name of the queue to receive
    public static final String QUEUE_NAME = "fjj";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();
        // Receive the message
        // Parameter 1 consumes that queue
        // Parameter 2 should be automatically processed after successful consumption
        // Parameter 3 a callback where the consumer did not successfully consume
        // Parameter 4 the consumer cancels the consumer callback
        // Declare to receive the message
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("message = " +new String( message.getBody()));
        };
        // Cancel the callback of the message
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("Message break");
        };
        System.out.println("C1 waiting thread !!!!!!!!!");
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); }}Copy the code

Another consumer code just like this one will do

Create the producer’s code


public class Producer01 {
    // Specify the queue name
    public static final String QUEUE_NAME = "fjj";
// Send a large number of messages
public static void main(String[] args) throws Exception {
    // Create a link
    Channel channel = RabbitmqUtil.getChannel();
    channel.queueDeclare(QUEUE_NAME,false.false.false.null);
    // Send a message
    Scanner scanner = new Scanner(System.in);
    while (scanner.hasNext()) {
        String msg = scanner.next();
        channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
        System.out.println("Producer over"); }}}Copy the code

Work queue results

You poll me one by one

Message response

It may take a while for a consumer to complete a task, and if one of the consumers is working on a long task and only partially completed it suddenly dies, What happens when RabbitMq sends a message to a consumer and immediately removes that message flag in which case a consumer suddenly hangs up and we lose data and all subsequent data sent to that consumer disappears

To ensure that messages are not lost in the process of being sent RabbitMq has introduced a message reply mechanism which means that the consumer has received the message and processed it. RabbitMq has processed it and can delete the message

Automatic reply

This mode requires a trade-off between high throughput and data transmission security because in this mode, if a link or Cannel closes on the consumer side before the message is received, the message is lost. On the other hand, of course, the consumer side may be overloaded with messages. There is no limit on the number of messages delivered and it is also possible that consumers receive too many messages to process. This causes message backlogs and eventually runs out of memory. Eventually these consumer threads are killed by the operating system so this default is appropriate for situations where consumers can process these messages efficiently and at some rate.

Manual response

  1. Cannel.basicack (for affirmative confirmation) RabbitMq knows the message and successfully processed it, and can discard it
  2. Cannel.basicnack (used to deny confirmation)
  3. Cannel.basicReject has one less parameter than cannel. basicNack to reject the message and discard it.

Messages are automatically requeued

If the consumer loses the connection for some reason (the channel is closed the connection is closed or the Tcp connection is lost), ACK confirmation RabbitMq will know that the message is not fully processed and will requeue it if it can be processed by another consumer at this point and will redistribute it to another consumer so that even if a consumer dies occasionally no messages will be lost

Message manual reply code

Producer code

public class Producer02 {
    // Set the queue name
    public final static String QUEUE_NAME ="ack_Hello";

    public static void main(String[] args) throws Exception {
        // Get the channel
        Channel channel = RabbitmqUtil.getChannel();
        // Set send
        channel.queueDeclare(QUEUE_NAME,false.false.false.null);
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String msg = scanner.next();
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes("UTF-8"));
            System.out.println("Producer over"); }}}Copy the code

Consumer03

public class Consumer03 {
    public final static String QUEUE_NAME ="ack_Hello";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitmqUtil.getChannel();
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            / / into a deep sleep
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("C3 received message =" +new String( message.getBody()));
            // 1 Message tag
            // 2 Specifies whether to reply in batches
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };
        // Cancel the callback of the message
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("Message break");
        };
        boolean authAck = false; channel.basicConsume(QUEUE_NAME, authAck, deliverCallback, cancelCallback); }}Copy the code

The other consumer is just like this one except for a different amount of sleep

Message response result

Normally the sender of a message sends two messages C1 and C2 each receive the message and process it but at the point of sending a message when C2 consumer suddenly dies in the sense of polling the message is going to be C2 and when C2 dies the message will come back Message queues are re-enqueued and then allocated to C1 consumers for consumption.

The RabbitMq persistence

concept

You have just seen how to handle this without losing a task but how to ensure that messages sent by a message sender are not lost when the RabbitMq service is down. By default RabbitMq exits or crashes for some reason, it ignores queues and messages unless it is told not to do so. To ensure that messages are not lost, two things need to be done: We need to mark both the queue and the message as persistent.

How do queues persist

The queues we created are non-persistent and will be removed if rabbitMQ restarts. If you want to persist a queue, you need to set burable to persist when you declare the queue

However, it is important to note that if the previously declared queue is not persistent, you need to delete the original queue or create a new persistent queue otherwise an error will occur.

The message is persistent

Messages to make persistent need producers to modify the code, MessageProperties. PERSISTENT_TEXT_PLAIN add this attribute.

Marking messages as persistent is not a complete guarantee against message loss although he tells RabbitMq to save the message to disk, it is still there when the message is ready to be stored on disk, But the message is still in the cache at a break point where it’s not actually written to disk at this point and the persistence guarantee is not strong but it’s more than enough for our simple task queue.