Basic introduction

RabbitMQ is open-source message broker software (also known as message-oriented middleware) that implements the Advanced Message Queuing Protocol (AMQP), an open application-layer standard designed for message-oriented middleware. Message-oriented middleware is mainly used to decouple components: the sender of a message does not need to know that a consumer exists.

What problems can be solved

Asynchronous processing, application decoupling, traffic peak shaving, log processing…

What similar products are commonly used

RocketMQ, ActiveMQ, Kafka

Environment Installation and Configuration

Installation tutorial

This section describes the client interface

Add a user

A virtual host is comparable to a database in MySQL. Only users who have been granted permission can access a virtual host.

Authorize a user

Five kinds of queue

Simple queue

  • P is the producer of the message
  • The red ones are message queues
  • C stands for consumer

In the simple queue model, one consumer is bound to one queue: the producer sends a message to the queue, and the consumer retrieves the message from the queue.

Create a Maven project

<dependency>
   <groupId>com.rabbitmq</groupId>
   <artifactId>amqp-client</artifactId>
   <version>3.4.1</version>
</dependency>

The connection information

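    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.io.IOException;

    public class ConnUtil {
        // Opens a connection to a broker on localhost; the credentials and
        // virtual host must match a user configured on that broker
        public static Connection getConn() throws IOException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("127.0.0.1");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            factory.setUsername("root");
            factory.setPassword("root");
            return factory.newConnection();
        }
    }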

Producer

    public final static String QUEUE_NAME = "q_test_01";
    public static void main(String[] args) throws IOException {
        Connection conn = ConnUtil.getConn();
        Channel channel = conn.createChannel();// Create channels from the connection
        // Declare a queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Message content definition
        String message = "HelloWorld2!!!";
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("message" + message);
        // Close channels and connections
        channel.close();
        conn.close();
    }

Consumer

    public static void main(String[] args) throws IOException, InterruptedException {
        Connection conn = ConnUtil.getConn();
        Channel channel = conn.createChannel();
        channel.queueDeclare(Provider.QUEUE_NAME, false, false, false, null);
        // Define queue consumers
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // Listen to the queue
        channel.basicConsume(Provider.QUEUE_NAME, true, consumer);
        // Get the message
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println(" [x] Received '" + msg + "'");
        }
    }

Start the producer and the consumer: the messages sent by the producer are consumed by the consumer and printed on the console. Disadvantage: tight coupling. The producer and the consumer map one-to-one, so this model does not work if you want multiple consumers to consume the messages in the queue.

The work queue

The producer sends 100 messages

        Connection conn = ConnUtil.getConn();
        Channel channel = conn.createChannel();
        // Declare a queue
        channel.queueDeclare(WorkConsumer1.QUEUE_NAME, false, false, false, null);
        for (int i = 0; i < 100; i++) {
            // Message content
            String message = "" + i;
            channel.basicPublish("", WorkConsumer1.QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");

            Thread.sleep( 10);
        }
        channel.close();
        conn.close();

Consumer 1

    public final static String QUEUE_NAME = "test_queue_work4";
    public static void main(String[] args) throws IOException, InterruptedException {
        Connection conn = ConnUtil.getConn();
        Channel channel = conn.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // At most one unacknowledged message per consumer (only enforced with manual acknowledgement)
        channel.basicQos(1);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // Listen on the queue; second argument: true = automatic acknowledgement, false = manual
        channel.basicConsume(QUEUE_NAME, true, consumer);
        while (true) { // Get the message
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println(" [1] Received '" + msg + "'");
            //Thread.sleep(10);
            // Manual acknowledgement; left commented out because automatic mode is used here
            // channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }

Consumer 2

    public static void main(String[] args) throws IOException, InterruptedException {
        Connection conn = ConnUtil.getConn();
        Channel channel = conn.createChannel();
        channel.queueDeclare(WorkConsumer1.QUEUE_NAME, false, false, false, null);
        // At most one unacknowledged message per consumer (only enforced with manual acknowledgement)
        channel.basicQos(1);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // Listen on the queue; second argument: true = automatic acknowledgement, false = manual
        channel.basicConsume(WorkConsumer1.QUEUE_NAME, true, consumer);
        while (true) { // Get the message
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println(" [2] Received '" + msg + "'");
            Thread.sleep(1000);
            // Manual acknowledgement; left commented out because automatic mode is used here
            // channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }

You can then test the results: even though the two consumers take different amounts of time to process a message, they end up consuming the same number of messages. One consumer receives the odd-numbered messages and the other the even-numbered ones. This default approach, known as round-robin dispatch, makes it easy to parallelize work. If a backlog of work builds up, we can add more workers (consumers), which makes the system easy to scale. By default RabbitMQ sends each message to the next consumer in sequence, regardless of how long each task takes; messages are assigned up front rather than one at a time as consumers become free. On average, every consumer gets the same number of messages.
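The round-robin behaviour described above can be sketched without a broker. The following is a minimal in-memory simulation, not RabbitMQ client code; the class name `RoundRobinDemo` and the `dispatch` method are hypothetical names chosen for illustration. Which consumer ends up with the even-numbered messages depends on subscription order in a real setup; here consumer 1 is simply assumed to be first.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory sketch of round-robin dispatch; this is not RabbitMQ client code.
class RoundRobinDemo {

    // Hands messages 0..messageCount-1 to the consumers in strict rotation.
    static List<List<Integer>> dispatch(int messageCount, int consumerCount) {
        List<List<Integer>> received = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            received.add(new ArrayList<Integer>());
        }
        for (int msg = 0; msg < messageCount; msg++) {
            // The next consumer in the rotation gets the message, however busy it is.
            received.get(msg % consumerCount).add(msg);
        }
        return received;
    }

    public static void main(String[] args) {
        List<List<Integer>> received = dispatch(100, 2);
        for (int c = 0; c < received.size(); c++) {
            List<Integer> msgs = received.get(c);
            System.out.println("consumer " + (c + 1) + ": " + msgs.size()
                    + " messages, starting with " + msgs.subList(0, 3));
        }
    }
}
```

Processing time never appears in `dispatch`, which is exactly why a slow consumer receives as many messages as a fast one.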

But this is clearly not ideal in practice: consumer 1 spends far less time on each message than consumer 2, so consumer 1 should process more messages than consumer 2. In other words, consumers that process messages faster should receive more of them. Fair dispatch solves this problem.

The message acknowledgement mode needs to be changed

  1. Automatic acknowledgement: once a message has been taken from the queue, the broker considers it successfully consumed, regardless of whether the consumer actually processed it successfully.
  2. Manual acknowledgement: after the consumer receives a message from the queue, the broker marks the message as unavailable and waits for feedback from the consumer. Until the consumer acknowledges it, the message stays unavailable to other consumers.
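The difference between the two modes shows up when a consumer dies mid-processing. The sketch below is a small in-memory model of a queue, not RabbitMQ client code; `AckDemo` and its methods are hypothetical names. It assumes, as RabbitMQ does, that unacknowledged messages return to the queue when their consumer goes away.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory sketch of the two acknowledgement modes; this is not RabbitMQ client code.
class AckDemo {
    private final Deque<String> ready = new ArrayDeque<>();           // waiting for delivery
    private final Map<Long, String> unacked = new LinkedHashMap<>();  // delivered, not yet acked
    private long nextTag = 1;

    void publish(String message) {
        ready.addLast(message);
    }

    // Delivers the next ready message and returns its delivery tag, or -1 if none is ready.
    long deliver(boolean autoAck) {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        long tag = nextTag++;
        if (!autoAck) {
            unacked.put(tag, message); // kept until the consumer acknowledges it
        }
        return tag;
    }

    void ack(long deliveryTag) {
        unacked.remove(deliveryTag);
    }

    // The consumer died: everything it had not acknowledged goes back to the queue.
    void consumerLost() {
        List<String> pending = new ArrayList<>(unacked.values());
        for (int i = pending.size() - 1; i >= 0; i--) {
            ready.addFirst(pending.get(i));
        }
        unacked.clear();
    }

    int readyCount() {
        return ready.size();
    }

    int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        AckDemo auto = new AckDemo();
        auto.publish("a");
        auto.deliver(true);
        auto.consumerLost();
        System.out.println("automatic: messages left after crash = " + auto.readyCount());

        AckDemo manual = new AckDemo();
        manual.publish("b");
        manual.deliver(false);
        manual.consumerLost();
        System.out.println("manual: messages left after crash = " + manual.readyCount());
    }
}
```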

Modify the above code

// The server sends at most one unacknowledged message to the consumer at a time
channel.basicQos(1);
// Listen on the queue: false = manual acknowledgement, true = automatic.
// autoAck = false also protects against message loss: if the consumer dies while
// processing a message, the message is redelivered to another consumer.
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
// Inside the receive loop, acknowledge each message after it has been processed successfully
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

Restart the consumers and run the test again: the more capable consumer now does more of the work.
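The effect of fair dispatch can be estimated with a small virtual-time simulation. This is a rough sketch, not RabbitMQ client code: `FairDispatchDemo` is a hypothetical name, and it assumes that all messages are already waiting in the queue, that the prefetch limit is 1, and that each consumer acknowledges a message as soon as its processing time has elapsed.

```java
// In-memory sketch of fair dispatch with a prefetch limit of 1; not RabbitMQ client code.
class FairDispatchDemo {

    // costMillis[c] is the assumed time consumer c needs to process one message.
    // Returns how many messages each consumer ends up processing.
    static int[] dispatch(int messageCount, long[] costMillis) {
        int[] counts = new int[costMillis.length];
        long[] freeAt = new long[costMillis.length]; // virtual time of each consumer's next ack
        for (int msg = 0; msg < messageCount; msg++) {
            // The next message goes to whichever consumer acknowledges first.
            int next = 0;
            for (int c = 1; c < freeAt.length; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c;
                }
            }
            counts[next]++;
            freeAt[next] += costMillis[next];
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = dispatch(100, new long[] {10, 1000});
        System.out.println("fast consumer: " + counts[0] + ", slow consumer: " + counts[1]);
    }
}
```

With equal processing times the same function falls back to an even split, so the prefetch limit only changes the outcome when consumers differ in speed.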

Message persistence

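boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

Declaring the queue as durable makes the queue itself survive a broker restart. For the messages in it to survive as well, they must also be published as persistent (delivery mode 2). Note that an existing queue cannot be redeclared with a different durable value, so use a new queue name or delete the old queue first.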

Publish/subscribe mode (publish_subscribe)

  1. One producer, many consumers
  2. Each consumer has its own queue
  3. The producer does not send messages directly to a queue; it sends them to an exchange
  4. Each queue is bound to the exchange
  5. A message sent by the producer passes through the exchange to every bound queue, so it can be consumed by multiple consumers

Producer

    public final static String EXCHANGE_NAME = "test_exchange_fanout";
    public static void main(String[] argv) throws Exception {
        // Get the connection and mq channel
        Connection connection = ConnUtil.getConn();
        Channel channel = connection.createChannel();
        // Declare the exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // Message content
        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        channel.close();
        connection.close();
    }

The test shows the result: one message can be consumed by multiple consumers. Only consumers that subscribed before the producer sent the message receive it. The bindings between the queues and the exchange can also be seen in the client tool.
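Both observations, one copy per bound queue and nothing for late subscribers, follow from how a fanout exchange works. Here is a minimal in-memory sketch, not RabbitMQ client code; `FanoutDemo` and the queue names `queue_a`, `queue_b` and `queue_late` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory sketch of a fanout exchange; this is not RabbitMQ client code.
class FanoutDemo {
    // queue name -> messages currently held by that queue
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<String>());
    }

    // A fanout exchange ignores the routing key and copies the message
    // into every queue that is bound at publish time.
    void publish(String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
    }

    List<String> messagesIn(String queueName) {
        List<String> queue = boundQueues.get(queueName);
        return queue == null ? Collections.<String>emptyList() : queue;
    }

    public static void main(String[] args) {
        FanoutDemo exchange = new FanoutDemo();
        exchange.bind("queue_a");
        exchange.bind("queue_b");
        exchange.publish("Hello World!");
        exchange.bind("queue_late"); // bound after the message was published
        System.out.println("queue_a: " + exchange.messagesIn("queue_a"));
        System.out.println("queue_b: " + exchange.messagesIn("queue_b"));
        System.out.println("queue_late: " + exchange.messagesIn("queue_late"));
    }
}
```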

Routing mode

Processing routing keys

Each queue is bound to the exchange with a specified routing key (routingKey). When the producer publishes a message with one of those routing keys, the exchange delivers it to the queues bound with that key, and their consumers can consume it.

Producer

    public final static String EXCHANGE_NAME = "test_exchange_direct";
    public static void main(String[] argv) throws Exception {
        // Get the connection and mq channel
        Connection connection = ConnUtil.getConn();
        Channel channel = connection.createChannel();
        // Declare the exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // Specify the exchange type
        // Message content
        String message = "Hello World!";
        String routingKey = "delete";
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        channel.close();
        connection.close();
    }

Consumer 1

    private final static String QUEUE_NAME = "queue_direct_1";
    private final static String EXCHANGE_NAME = "test_exchange_direct";
    public static void main(String[] argv) throws Exception {
        Connection connection = ConnUtil.getConn();
        Channel channel = connection.createChannel();
        // Declare a queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Bind the queue to the exchange
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
        // The server sends only one message to the consumer at a time
        channel.basicQos(1);
        // Define the consumers of the queue
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // Listen on the queue and return done manually
        channel.basicConsume(QUEUE_NAME, false, consumer);
        // Get the message
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [Recv1] Received '" + message + "'");
            Thread.sleep(10);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }

Consumer 2

    private final static String QUEUE_NAME = "queue_direct_2";
    private final static String EXCHANGE_NAME = "test_exchange_direct";
    public static void main(String[] argv) throws Exception {
        Connection connection = ConnUtil.getConn();
        Channel channel = connection.createChannel();
        // Declare a queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Bind the queue to the exchange with both routing keys
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
        // The server sends only one message to the consumer at a time
        channel.basicQos(1);
        // Define the consumers of the queue
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // Listen on the queue and return done manually
        channel.basicConsume(QUEUE_NAME, false, consumer);
        // Get the message
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [Recv2] Received '" + message + "'");
            Thread.sleep(10);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }

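As the code above shows, consumer 2's queue is bound to the exchange with both the insert and delete routing keys, while consumer 1's queue is bound only with insert. When a message is sent with the insert key, both consumers can consume it; a message sent with the delete key, as in the producer above, reaches only consumer 2.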
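The routing decision of a direct exchange is an exact match between the message's routing key and the binding keys. The following in-memory sketch reproduces the bindings from the two consumers above; it is not RabbitMQ client code, and `DirectRoutingDemo` with its `bind` and `route` methods are hypothetical names.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// In-memory sketch of direct-exchange routing; this is not RabbitMQ client code.
class DirectRoutingDemo {
    // binding key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new HashMap<>();

    void bind(String queueName, String bindingKey) {
        Set<String> queues = bindings.get(bindingKey);
        if (queues == null) {
            queues = new TreeSet<>();
            bindings.put(bindingKey, queues);
        }
        queues.add(queueName);
    }

    // Returns the queues that would receive a message published with this routing key.
    Set<String> route(String routingKey) {
        Set<String> queues = bindings.get(routingKey);
        return queues == null ? Collections.<String>emptySet() : queues;
    }

    public static void main(String[] args) {
        DirectRoutingDemo exchange = new DirectRoutingDemo();
        exchange.bind("queue_direct_1", "insert");
        exchange.bind("queue_direct_2", "insert");
        exchange.bind("queue_direct_2", "delete");
        System.out.println("insert -> " + exchange.route("insert"));
        System.out.println("delete -> " + exchange.route("delete"));
        System.out.println("update -> " + exchange.route("update"));
    }
}
```

A routing key with no matching binding, such as `update` here, routes to no queue at all.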

Topic mode (topic)

Matches a routing key against a pattern

Words in a routing key are separated by dots. '#' matches zero or more words; '*' matches exactly one word. The same message can be received by multiple consumers, one per matching queue. A single queue can have multiple consumer instances, but only one of them consumes any given message.
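The wildcard rules can be made concrete with a small matcher. This is a simplified sketch of the matching rule, not the broker's implementation; `TopicMatcher` is a hypothetical name, and edge cases such as empty words or trailing dots are not handled.

```java
// Simplified sketch of topic-pattern matching; this is not the broker's implementation.
class TopicMatcher {

    // Returns true if the dot-separated routing key matches the binding pattern.
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int pi, String[] k, int ki) {
        if (pi == p.length) {
            return ki == k.length; // pattern used up: the key must be used up too
        }
        if (p[pi].equals("#")) {
            // '#' may absorb zero or more of the remaining words
            for (int next = ki; next <= k.length; next++) {
                if (matches(p, pi + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (ki == k.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        if (p[pi].equals("*") || p[pi].equals(k[ki])) {
            return matches(p, pi + 1, k, ki + 1); // '*' or a literal consumes exactly one word
        }
        return false;
    }

    public static void main(String[] args) {
        String[] patterns = {"goods.add", "goods.*", "goods.#"};
        String[] keys = {"goods.add", "goods.update", "goods.update.price"};
        for (String pattern : patterns) {
            for (String key : keys) {
                System.out.println(pattern + " vs " + key + ": " + matches(pattern, key));
            }
        }
    }
}
```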

channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // Topic exchange
String routingKey = "goods.update";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());

// This binding matches only the add operation
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add");
// This binding matches every routing key that begins with goods
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");

How to ensure the reliability of messages

Transactions

  1. channel.txSelect() puts the channel into transaction mode;
  2. channel.txCommit() commits the transaction;
  3. channel.txRollback() rolls back the transaction.

Producer
    public static final String QUEUE_NAME = "test_queue_tx";
    public static void main(String[] args) throws IOException {
        Connection conn = ConnUtil.getConn();
        Channel channel = conn.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String msg = "hello tx";
        try {
            channel.txSelect();
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
            channel.txCommit();
        } catch (Exception e) {
            channel.txRollback();
            System.out.println("Transaction Rollback");
        } finally {
            channel.close();
            conn.close();
        }
    }

Consumer
   public static void main(String[] args) throws Exception {
        Connection conn = ConnUtil.getConn();
        Channel channel = conn.createChannel();
        channel.queueDeclare(TxSend.QUEUE_NAME, false, false, false, null);
        // Define queue consumers
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // Listen to the queue
        channel.basicConsume(TxSend.QUEUE_NAME, true, consumer);
        // Get the message
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println(" [tx] Received '" + msg + "'");
        }
    }

Testing shows that if no exception occurs while sending, the message reaches the queue and the consumer receives it as usual. If an exception is raised while sending, for example by adding int i = 10 / 0 to the sending code before the commit, the transaction is rolled back and the message is not delivered.

This approach ensures message reliability, but it also reduces throughput.
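The commit and rollback behaviour can be modelled in a few lines. The sketch below is an in-memory stand-in for a transactional channel, not RabbitMQ client code; `TxDemo` and its methods are hypothetical names, and the `fail` flag stands in for the division by zero used in the test above.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory sketch of transactional publishing; this is not RabbitMQ client code.
class TxDemo {
    private final List<String> queue = new ArrayList<>();   // messages visible to consumers
    private final List<String> pending = new ArrayList<>(); // published but not yet committed

    void publish(String message) {
        pending.add(message);
    }

    void commit() {
        queue.addAll(pending);
        pending.clear();
    }

    void rollback() {
        pending.clear();
    }

    List<String> delivered() {
        return queue;
    }

    // Publishes one message inside a transaction; fail simulates an exception before the commit.
    static List<String> send(String message, boolean fail) {
        TxDemo channel = new TxDemo();
        try {
            channel.publish(message);
            if (fail) {
                throw new ArithmeticException("simulated failure");
            }
            channel.commit();
        } catch (Exception e) {
            channel.rollback();
        }
        return channel.delivered();
    }

    public static void main(String[] args) {
        System.out.println("without exception: " + send("hello tx", false));
        System.out.println("with exception: " + send("hello tx", true));
    }
}
```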

Confirm mode

The producer puts the channel into confirm mode. Once the channel is in confirm mode, every message published on it is assigned a unique ID. After the message has been delivered to all matching queues, the broker sends the producer an acknowledgement containing that ID, so the producer knows the message reached its destination queue. If the message and the queue are persistent, the acknowledgement is sent only after the message has been written to disk. The main benefit of confirm mode is that it can be asynchronous: the producer can keep publishing while it waits for acknowledgements. Confirm mode can be used in three ways:

  1. Normal mode: call waitForConfirms() after every message sent.
  2. Batch mode: send a batch of messages, then call waitForConfirms() once to check whether the whole batch has been handled.
  3. Asynchronous mode: the Channel accepts a ConfirmListener whose callbacks carry only a deliveryTag and a multiple flag. We need to maintain a set of unconfirmed message numbers for each Channel: add the sequence number to the set each time a message is published, and each time handleAck is called remove one entry (multiple=false) or every entry up to and including the deliveryTag (multiple=true). For efficiency, the unconfirmed set is best stored in an ordered SortedSet.
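The bookkeeping for the asynchronous mode can be tried out in isolation. The sketch below keeps only the set handling and leaves out the broker; `ConfirmTracker` and its `published` method are hypothetical names, and unlike a real listener it is not thread-safe, because no callback thread is involved here.

```java
import java.util.SortedSet;
import java.util.TreeSet;

// Sketch of the unconfirmed-set bookkeeping only; no broker is involved.
class ConfirmTracker {
    private final SortedSet<Long> unconfirmed = new TreeSet<>();

    // Record the sequence number of a message that has just been published.
    void published(long seqNo) {
        unconfirmed.add(seqNo);
    }

    // Mirrors the handleAck(deliveryTag, multiple) callback.
    void handleAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            // Everything up to and including deliveryTag is confirmed at once.
            unconfirmed.headSet(deliveryTag + 1).clear();
        } else {
            unconfirmed.remove(deliveryTag);
        }
    }

    SortedSet<Long> unconfirmed() {
        return unconfirmed;
    }

    public static void main(String[] args) {
        ConfirmTracker tracker = new ConfirmTracker();
        for (long seqNo = 1; seqNo <= 5; seqNo++) {
            tracker.published(seqNo);
        }
        tracker.handleAck(3, true);  // confirms 1, 2 and 3 together
        tracker.handleAck(5, false); // confirms 5 only
        System.out.println("still unconfirmed: " + tracker.unconfirmed());
    }
}
```

A sorted set makes the multiple=true case a single `headSet` range operation instead of a scan over every outstanding number.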

Normal:

    // The queue name
    private static final String QUEUE_NAME = "test_simple_confirm";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection conn = ConnUtil.getConn();
        // Get a channel from the connection
        Channel channel = conn.createChannel();
        // Create a queue declaration
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // The producer calls confirmSelect() to set channel to confirm mode
        channel.confirmSelect();
        // The message sent
        String msg = "hello test_simple_confirm";
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        System.out.println("send msg:" + msg);

        // Waiting for a confirm after every single message greatly reduces throughput
        if (!channel.waitForConfirms()) {
            System.out.println("send message failed");
        } else {
            System.out.println("send message ok");
        }
        // Close the channel and the connection
        channel.close();
        conn.close();
    }

Batch:

// Calling basicPublish() several times before waitForConfirms() sends a batch of messages
String msg = "hello test_simple_confirm batch";
for (int i = 0; i < 10; i++) { // Send messages in batches
    channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
}

Asynchronous:

// The queue name
private static final String QUEUE_NAME = "test_simple_confirm3";

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    Connection conn = ConnUtil.getConn();
    // Get a channel from the connection
    Channel channel = conn.createChannel();
    // Create a queue declaration
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // The producer calls confirmSelect() to set channel to confirm mode
    channel.confirmSelect();

    // Sequence numbers of published messages that the broker has not confirmed yet
    final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());

    channel.addConfirmListener(new ConfirmListener() {
        // Called when the broker confirms one message (multiple = false)
        // or every message up to and including deliveryTag (multiple = true)
        @Override
        public void handleAck(long deliveryTag, boolean multiple) throws IOException {
            if (multiple) {
                System.out.println("handleAck true");
                confirmSet.headSet(deliveryTag + 1).clear();
            } else {
                System.out.println("handleAck false");
                confirmSet.remove(deliveryTag);
            }
        }

        // Called when the broker reports that it could not handle the message(s)
        @Override
        public void handleNack(long deliveryTag, boolean multiple) throws IOException {
            if (multiple) {
                System.out.println("handleNack true");
                confirmSet.headSet(deliveryTag + 1).clear();
            } else {
                System.out.println("handleNack false");
                confirmSet.remove(deliveryTag);
            }
        }
    });

    // The message sent
    String msg = "hello test_simple_confirm batch async";

    while (true) {
        long seqNo = channel.getNextPublishSeqNo();
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        confirmSet.add(seqNo);
    }
}