This article describes common ways of using RabbitMQ.

Basic usage

Shared helper code

Connection factory helper:

import com.rabbitmq.client.ConnectionFactory;

public class RabbitUtil {
    public static ConnectionFactory getConnectionFactory() {
        // Create a connection factory; the settings below are the defaults used in this article
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");
        return factory;
    }
}

Producer helper:

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class MsgProducer {
    public static void publishMsg(String exchange, BuiltinExchangeType exchangeType, String routingKey, String message) throws IOException, TimeoutException {
        ConnectionFactory factory = RabbitUtil.getConnectionFactory();
        // Create a connection
        Connection connection = factory.newConnection();
        // Create a channel
        Channel channel = connection.createChannel();
        // Declare a durable exchange that is not auto-deleted
        channel.exchangeDeclare(exchange, exchangeType, true, false, null);
        // Publish the message, encoded as UTF-8 to match the consumer
        channel.basicPublish(exchange, routingKey, null, message.getBytes("UTF-8"));
        System.out.println("Sent '" + message + "'");
        channel.close();
        connection.close();
    }
}

Consumer helper:

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class MsgConsumer {
    public static void consumerMsg(String exchange, String queue, String routingKey)
            throws IOException, TimeoutException {
        ConnectionFactory factory = RabbitUtil.getConnectionFactory();
        // Create a connection
        Connection connection = factory.newConnection();
        // Create a channel
        final Channel channel = connection.createChannel();
        // Declare a durable, non-exclusive queue that is not auto-deleted
        channel.queueDeclare(queue, true, false, false, null);
        // Bind the queue to the exchange
        channel.queueBind(queue, exchange, routingKey);
        System.out.println("[*] Waiting for message. To exist press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                try {
                    System.out.println(" [x] Received '" + message);
                } finally {
                    System.out.println(" [x] Done");
                    // Acknowledge this single delivery manually
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        // Disable automatic ack (autoAck = false)
        channel.basicConsume(queue, false, consumer);
    }
}

Direct mode

Direct example

Consumer:

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DirectConsumer {
    private static final String EXCHANGE_NAME = "direct.exchange";

    public void msgConsumer(String queueName, String routingKey) {
        try {
            MsgConsumer.consumerMsg(EXCHANGE_NAME, queueName, routingKey);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DirectConsumer consumer = new DirectConsumer();
        String[] routingKey = new String[]{"aaa", "bbb", "ccc"};
        String[] queueNames = new String[]{"qa", "qb", "qc"};

        for (int i = 0; i < 3; i++) {
            consumer.msgConsumer(queueNames[i], routingKey[i]);
        }
        // Keep the main thread alive so the consumers keep receiving
        Thread.sleep(1000 * 60 * 100);
    }
}

Running the producer publishes 10 messages with the routing keys "aaa", "bbb", and "ccc", which are routed to the qa, qb, and qc queues respectively:

Here is the qa queue information:

Producer:

import com.rabbitmq.client.BuiltinExchangeType;

public class DirectProducer {
    private static final String EXCHANGE_NAME = "direct.exchange";

    public void publishMsg(String routingKey, String msg) {
        try {
            MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, routingKey, msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DirectProducer directProducer = new DirectProducer();
        String[] routingKey = new String[]{"aaa", "bbb", "ccc"};
        String msg = "hello >>> ";
        for (int i = 0; i < 10; i++) {
            // Rotate through the three routing keys
            directProducer.publishMsg(routingKey[i % 3], msg + i);
        }
        System.out.println("----over-------");
        Thread.sleep(1000 * 60 * 100);
    }
}

Output after execution:

[*] Waiting for message. To exist press CTRL+C
 [x] Received 'hello >>> 0
 [x] Done
 [x] Received 'hello >>> 3
 [x] Done
 [x] Received 'hello >>> 6
 [x] Done
 [x] Received 'hello >>> 9
 [x] Done
[*] Waiting for message. To exist press CTRL+C
 [x] Received 'hello >>> 1
 [x] Done
 [x] Received 'hello >>> 4
 [x] Done
 [x] Received 'hello >>> 7
 [x] Done
[*] Waiting for message. To exist press CTRL+C
 [x] Received 'hello >>> 2
 [x] Done
 [x] Received 'hello >>> 5
 [x] Done
 [x] Received 'hello >>> 8
 [x] Done

As you can see, messages with different routing keys are consumed from qa, qb, and qc respectively.
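
The routing described here can be reproduced with a small in-memory model. This is only a sketch: `ToyDirectExchange` and its methods are hypothetical names made up for illustration, not part of the RabbitMQ client, and the model assumes that a direct exchange simply copies each message into every queue whose binding key equals the routing key.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of a direct exchange (hypothetical, not the RabbitMQ client API).
class ToyDirectExchange {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();
    // queue name -> messages currently stored in that queue
    private final Map<String, List<String>> queues = new HashMap<>();

    // Create the queue if needed and bind it to this exchange with the given key.
    void bind(String queue, String bindingKey) {
        queues.computeIfAbsent(queue, q -> new ArrayList<>());
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    // Copy the message into every queue whose binding key equals the routing key.
    void publish(String routingKey, String message) {
        for (String queue : bindings.getOrDefault(routingKey, Collections.<String>emptyList())) {
            queues.get(queue).add(message);
        }
    }

    List<String> messagesIn(String queue) {
        return queues.getOrDefault(queue, Collections.<String>emptyList());
    }

    // Replays the example: three queues, three keys, ten messages.
    static ToyDirectExchange runExample() {
        ToyDirectExchange exchange = new ToyDirectExchange();
        String[] routingKeys = {"aaa", "bbb", "ccc"};
        String[] queueNames = {"qa", "qb", "qc"};
        for (int i = 0; i < 3; i++) {
            exchange.bind(queueNames[i], routingKeys[i]);
        }
        for (int i = 0; i < 10; i++) {
            exchange.publish(routingKeys[i % 3], "hello >>> " + i);
        }
        return exchange;
    }

    public static void main(String[] args) {
        ToyDirectExchange exchange = runExample();
        System.out.println("qa: " + exchange.messagesIn("qa"));
        System.out.println("qb: " + exchange.messagesIn("qb"));
        System.out.println("qc: " + exchange.messagesIn("qc"));
    }
}
```

In this model qa ends up with messages 0, 3, 6, and 9, qb with 1, 4, and 7, and qc with 2, 5, and 8, which is the same split as in the consumer output.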

Discussion

Are the queue names qa, qb, and qc generated automatically by RabbitMQ, or can we specify them ourselves?

I ran a simple experiment and changed the consumer code:

public static void main(String[] args) throws InterruptedException {
    DirectConsumer consumer = new DirectConsumer();
    String[] routingKey = new String[]{"aaa", "bbb", "ccc"};
    String[] queueNames = new String[]{"qa", "qb", "qc1"}; // qc changed to qc1

    for (int i = 0; i < 3; i++) {
        consumer.msgConsumer(queueNames[i], routingKey[i]);
    }
    Thread.sleep(1000 * 60 * 100);
}

After running it, the result is as shown in the following figure:

So the queue names are ours to choose: when the consumer runs, the queue it specifies is bound to direct.exchange, and the binding is based on the routing key.

After consuming all the messages in the queues and running the producer again, both qc and qc1 hold 3 messages waiting to be consumed. Both queues are bound with the key "ccc", so they receive the same messages:

The binding relationship is as follows:

Note: if no queue is bound to an exchange when a message is published, that message is not delivered to queues that are bound later.
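
Both observations (two queues bound with the same key each get a copy, and a message published before any binding exists is gone) can be sketched with a toy model. `BindingTimingDemo` is a hypothetical class written for this illustration, not RabbitMQ client code, and it assumes one binding key per queue to keep things short.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical, not the RabbitMQ client API): the exchange itself stores nothing.
class BindingTimingDemo {
    // queue name -> binding key (one key per queue in this simplified model)
    private final Map<String, String> bindingKeys = new LinkedHashMap<>();
    // queue name -> stored messages
    private final Map<String, List<String>> queues = new LinkedHashMap<>();
    private int dropped = 0;

    void bind(String queue, String key) {
        bindingKeys.put(queue, key);
        queues.computeIfAbsent(queue, q -> new ArrayList<>());
    }

    // Deliver a copy to every matching queue; with no match the message is dropped.
    void publish(String routingKey, String message) {
        boolean routed = false;
        for (Map.Entry<String, String> binding : bindingKeys.entrySet()) {
            if (binding.getValue().equals(routingKey)) {
                queues.get(binding.getKey()).add(message);
                routed = true;
            }
        }
        if (!routed) {
            dropped++;
        }
    }

    int size(String queue) {
        return queues.get(queue).size();
    }

    int droppedCount() {
        return dropped;
    }

    static BindingTimingDemo runExample() {
        BindingTimingDemo exchange = new BindingTimingDemo();
        exchange.publish("ccc", "too early"); // nothing is bound yet, so this is dropped
        exchange.bind("qc", "ccc");
        exchange.bind("qc1", "ccc");          // second queue with the same key
        for (int i = 0; i < 3; i++) {
            exchange.publish("ccc", "hello >>> " + i);
        }
        return exchange;
    }

    public static void main(String[] args) {
        BindingTimingDemo exchange = runExample();
        System.out.println("qc=" + exchange.size("qc") + " qc1=" + exchange.size("qc1")
                + " dropped=" + exchange.droppedCount());
    }
}
```

Running `main` prints `qc=3 qc1=3 dropped=1`: the early message never reaches either queue, while every later message lands in both.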

A thought: until the consumer has run, nothing appears on the Queues page, because in this code it is the consumer that declares the queues. The page can therefore be read as a consumer-side view, even though queues are where RabbitMQ actually stores messages.

Fanout mode (named queues)

Producer:

import com.rabbitmq.client.BuiltinExchangeType;

public class FanoutProducer {
    private static final String EXCHANGE_NAME = "fanout.exchange";

    public void publishMsg(String routingKey, String msg) {
        try {
            MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, routingKey, msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        FanoutProducer fanoutProducer = new FanoutProducer();
        String msg = "hello >>> ";
        for (int i = 0; i < 10; i++) {
            fanoutProducer.publishMsg("", msg + i);
        }
    }
}

Consumer:

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class FanoutConsumer {
    private static final String EXCHANGE_NAME = "fanout.exchange";

    public void msgConsumer(String queueName, String routingKey) {
        try {
            MsgConsumer.consumerMsg(EXCHANGE_NAME, queueName, routingKey);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        FanoutConsumer consumer = new FanoutConsumer();
        String[] queueNames = new String[]{"qa-2", "qb-2", "qc-2"};
        for (int i = 0; i < 3; i++) {
            consumer.msgConsumer(queueNames[i], "");
        }
    }
}

Running the producer gives the following result:

Every consumer receives all 10 messages sent by the producer. This differs from direct mode, and there are a few points to note when using fanout:

  • The producer’s routing key can be empty: every message is placed on every bound queue, so no routing by key takes place.
  • The consumer must specify a queue, because it can only consume from a queue that is bound to the exchange.

This diagram shows the essence of fanout: the exchange delivers to all bound queues without distinguishing routing keys, and a consumer must be attached to a specific queue in order to consume.
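
The fanout behaviour can be sketched the same way. `ToyFanoutExchange` is a hypothetical in-memory class for illustration only, not the RabbitMQ client API; it assumes that a fanout exchange ignores both the binding key and the routing key and gives every bound queue its own copy.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of a fanout exchange (hypothetical, not the RabbitMQ client API).
class ToyFanoutExchange {
    // queue name -> stored messages, for every queue bound to this exchange
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    // The binding key is accepted but never consulted.
    void bind(String queue, String ignoredBindingKey) {
        boundQueues.computeIfAbsent(queue, q -> new ArrayList<>());
    }

    // The routing key is ignored: every bound queue receives its own copy.
    void publish(String ignoredRoutingKey, String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
    }

    int size(String queue) {
        return boundQueues.get(queue).size();
    }

    // Replays the example: three named queues, ten messages with an empty key.
    static ToyFanoutExchange runExample() {
        ToyFanoutExchange exchange = new ToyFanoutExchange();
        for (String queue : new String[]{"qa-2", "qb-2", "qc-2"}) {
            exchange.bind(queue, "");
        }
        for (int i = 0; i < 10; i++) {
            exchange.publish("", "hello >>> " + i);
        }
        return exchange;
    }

    public static void main(String[] args) {
        ToyFanoutExchange exchange = runExample();
        for (String queue : new String[]{"qa-2", "qb-2", "qc-2"}) {
            System.out.println(queue + ": " + exchange.size(queue) + " messages");
        }
    }
}
```

Each of the three toy queues ends up holding all 10 messages, in contrast to the direct model, where the messages are split by key.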

Note: you may not see the message count grow while you are publishing to the queue. A consumer is probably already running and consuming the new messages immediately.

Fanout mode (random queue)

With fanout, the queue name does not really matter, so specifying a named queue to consume from feels redundant. Instead, we let the server generate a random queue name. The following code is copied directly from the official website.

Producer helper:

public static void publishMsgV2(String exchange, BuiltinExchangeType exchangeType, String message) throws IOException, TimeoutException {
    ConnectionFactory factory = RabbitUtil.getConnectionFactory();
    // Create a connection
    Connection connection = factory.newConnection();
    // Create a message channel
    Channel channel = connection.createChannel();

    // Declare the exchange
    channel.exchangeDeclare(exchange, exchangeType);

    // Publish the message with an empty routing key
    channel.basicPublish(exchange, "", null, message.getBytes("UTF-8"));

    System.out.println("Sent '" + message + "'");
    channel.close();
    connection.close();
}

Consumer helper:

public static void consumerMsgV2(String exchange) throws IOException, TimeoutException {
    ConnectionFactory factory = RabbitUtil.getConnectionFactory();
    Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.exchangeDeclare(exchange, "fanout");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, exchange, "");

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
    };
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}

Producer:

public class FanoutProducer {
    private static final String EXCHANGE_NAME = "fanout.exchange.v2";

    public void publishMsg(String msg) {
        try {
            MsgProducer.publishMsgV2(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        FanoutProducer directProducer = new FanoutProducer();
        String msg = "hello >>> ";
        for (int i = 0; i < 10000; i++) {
            directProducer.publishMsg(msg + i);
        }
    }
}

Consumer:

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class FanoutConsumer {
    private static final String EXCHANGE_NAME = "fanout.exchange.v2";
    public void msgConsumer() {
        try {
            MsgConsumer.consumerMsgV2(EXCHANGE_NAME);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        FanoutConsumer consumer = new FanoutConsumer();
        for (int i = 0; i < 3; i++) {
            consumer.msgConsumer();
        }
    }
}

After the command is executed, the management page is as follows:

Topic mode

The code can be found on the website: www.rabbitmq.com/tutorials/t…

For more, see the website: www.rabbitmq.com/getstarted…

Advanced RabbitMQ

Reference article: liuyueyi.github.io/hexblog/201…

durable and autoDelete

When declaring an exchange or a queue, you can specify these two parameters:

/**
 * Declare an exchange.
 * @see com.rabbitmq.client.AMQP.Exchange.Declare
 * @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
 * @param exchange the name of the exchange
 * @param type the exchange type
 * @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
 * @param autoDelete true if the server should delete the exchange when it is no longer in use
 * @param arguments other properties (construction arguments) for the exchange
 * @return a declaration-confirm method to indicate the exchange was successfully declared
 * @throws java.io.IOException if an error is encountered
 */
Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete,
    Map<String, Object> arguments) throws IOException;
    
/**
* Declare a queue
* @see com.rabbitmq.client.AMQP.Queue.Declare
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
* @param queue the name of the queue
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* @param arguments other properties (construction arguments) for the queue
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
    Map<String, Object> arguments) throws IOException;

durable

To ensure that data is not lost when RabbitMQ exits or crashes, the queue, the exchange, and the messages all need to be persisted.

If a queue’s durable flag is set to true, it is a durable queue, and it is restored after the service restarts.

A durable queue does not make its messages durable. Whether a message that has not yet been delivered is still in the queue after a restart depends on the persistence setting of the message itself.
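
The interaction between queue durability and message persistence can be sketched with a toy restart simulation. `ToyBroker` and its members are hypothetical names for illustration, not RabbitMQ client code, and the model assumes the simple rule described here: a restart removes non-durable queues, and durable queues keep only their persistent messages. With the real Java client, a message is marked persistent through the properties argument of `basicPublish` (for example `MessageProperties.PERSISTENT_TEXT_PLAIN`); the helper in this article passes `null` there.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of restart semantics (hypothetical, not the RabbitMQ client API).
class ToyBroker {
    static final class Message {
        final String body;
        final boolean persistent;

        Message(String body, boolean persistent) {
            this.body = body;
            this.persistent = persistent;
        }
    }

    static final class ToyQueue {
        final boolean durable;
        final List<Message> messages = new ArrayList<>();

        ToyQueue(boolean durable) {
            this.durable = durable;
        }
    }

    private final Map<String, ToyQueue> queues = new LinkedHashMap<>();

    void declareQueue(String name, boolean durable) {
        queues.putIfAbsent(name, new ToyQueue(durable));
    }

    void publish(String queue, String body, boolean persistent) {
        queues.get(queue).messages.add(new Message(body, persistent));
    }

    // Simulated restart: non-durable queues vanish, durable queues keep only persistent messages.
    void restart() {
        queues.values().removeIf(q -> !q.durable);
        for (ToyQueue q : queues.values()) {
            q.messages.removeIf(m -> !m.persistent);
        }
    }

    boolean hasQueue(String name) {
        return queues.containsKey(name);
    }

    int size(String name) {
        return queues.get(name).messages.size();
    }

    static ToyBroker runExample() {
        ToyBroker broker = new ToyBroker();
        broker.declareQueue("durable.q", true);
        broker.declareQueue("temp.q", false);
        broker.publish("durable.q", "order-1", true);    // persistent message, durable queue
        broker.publish("durable.q", "heartbeat", false); // transient message, durable queue
        broker.publish("temp.q", "scratch", true);       // persistent message, transient queue
        broker.restart();
        return broker;
    }

    public static void main(String[] args) {
        ToyBroker broker = runExample();
        System.out.println("durable.q exists: " + broker.hasQueue("durable.q")
                + ", messages: " + broker.size("durable.q"));
        System.out.println("temp.q exists: " + broker.hasQueue("temp.q"));
    }
}
```

After the simulated restart only `order-1` survives: the transient message in the durable queue is gone, and so is the whole non-durable queue, including the persistent message it held.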

autoDelete

With automatic deletion, a queue is deleted automatically once it has no subscribed consumers. This suits temporary queues.

When a queue is set to delete automatically, it is deleted if its consumer goes down. This is meant for unimportant data where you do not want messages to accumulate.

Summary

  • Once a queue has been declared, its durable and autoDelete values cannot be changed. To change them, delete the queue and declare it again.
  • Every declaration of the same queue must use the same durable and autoDelete values; otherwise an error is reported.
  • For important data, declare the queue with durable=true and autoDelete=false.
  • A queue with autoDelete=true is deleted automatically when it has no consumers.
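
These rules can be sketched as a small registry. `ToyQueueRegistry` is a hypothetical class for illustration, not the RabbitMQ client API; the exception type is an assumption of the sketch (the real broker reports a mismatch as a channel error), and the model deletes an auto-delete queue at the moment its last consumer unsubscribes.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of queue declaration rules (hypothetical, not the RabbitMQ client API).
class ToyQueueRegistry {
    static final class QueueInfo {
        final boolean durable;
        final boolean autoDelete;
        int consumers = 0;

        QueueInfo(boolean durable, boolean autoDelete) {
            this.durable = durable;
            this.autoDelete = autoDelete;
        }
    }

    private final Map<String, QueueInfo> queues = new HashMap<>();

    // Redeclaring with identical properties is a no-op; a mismatch is rejected.
    void declare(String name, boolean durable, boolean autoDelete) {
        QueueInfo existing = queues.get(name);
        if (existing == null) {
            queues.put(name, new QueueInfo(durable, autoDelete));
        } else if (existing.durable != durable || existing.autoDelete != autoDelete) {
            throw new IllegalStateException(
                    "queue '" + name + "' already declared with different properties");
        }
    }

    void delete(String name) {
        queues.remove(name);
    }

    void subscribe(String name) {
        queues.get(name).consumers++;
    }

    // When the last consumer leaves an auto-delete queue, the queue is removed.
    void unsubscribe(String name) {
        QueueInfo q = queues.get(name);
        q.consumers--;
        if (q.autoDelete && q.consumers == 0) {
            queues.remove(name);
        }
    }

    boolean exists(String name) {
        return queues.containsKey(name);
    }

    public static void main(String[] args) {
        ToyQueueRegistry registry = new ToyQueueRegistry();
        registry.declare("orders", true, false);
        registry.declare("orders", true, false);      // same properties: accepted
        try {
            registry.declare("orders", false, false); // different durable value: rejected
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        registry.declare("tmp", false, true);
        registry.subscribe("tmp");
        registry.unsubscribe("tmp");
        System.out.println("tmp exists: " + registry.exists("tmp"));
    }
}
```

To change a property in this model, call `delete` and then `declare` again, which mirrors the delete-and-redeclare advice in the first bullet.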

ACK

A task may take several seconds to run, and you might wonder what happens if a consumer dies partway through. With automatic acknowledgement, RabbitMQ removes a message from memory as soon as it has been dispatched to the consumer. In that case, if a consumer goes down in the middle of a task, both the message being processed and any messages already dispatched to that consumer but not yet processed are lost.

We do not want to lose any tasks, though: if one consumer dies, the tasks assigned to it should be handed to another consumer.

To ensure that messages are not lost, RabbitMQ supports message acknowledgements. The consumer sends back an ack to tell RabbitMQ that a message has been received and processed, and RabbitMQ is then free to delete it.

Manual ack is therefore the common approach:

// After receiving a message, explicitly ack or nack it
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
            byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        try {
            System.out.println(" [ " + queue + " ] Received '" + message);
            channel.basicAck(envelope.getDeliveryTag(), false);
        } catch (Exception e) {
            channel.basicNack(envelope.getDeliveryTag(), false, true);
        }
    }
};
// Disable automatic ack
channel.basicConsume(queue, false, consumer);
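
What manual ack buys can be sketched with a toy queue that tracks unacknowledged deliveries. `ToyAckQueue` is a hypothetical in-memory class for illustration, not the RabbitMQ client API; it assumes a single consumer and that unacked messages go back to the front of the queue when that consumer dies.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of manual acknowledgements (hypothetical, not the RabbitMQ client API).
class ToyAckQueue {
    // messages waiting to be delivered
    private final Deque<String> ready = new ArrayDeque<>();
    // delivery tag -> message that was delivered but not yet acknowledged
    private final Map<Long, String> unacked = new LinkedHashMap<>();
    private long nextTag = 1;

    void publish(String message) {
        ready.addLast(message);
    }

    // Hand the next message to the consumer and track it until acked; returns -1 if empty.
    long deliver() {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        long tag = nextTag++;
        unacked.put(tag, message);
        return tag;
    }

    // The consumer finished processing, so the message can be forgotten.
    void ack(long deliveryTag) {
        unacked.remove(deliveryTag);
    }

    // The consumer died: everything it had not acked goes back to the front of the queue.
    void consumerDied() {
        Deque<String> requeued = new ArrayDeque<>(unacked.values());
        requeued.addAll(ready);
        ready.clear();
        ready.addAll(requeued);
        unacked.clear();
    }

    int readyCount() {
        return ready.size();
    }

    int unackedCount() {
        return unacked.size();
    }

    static ToyAckQueue runExample() {
        ToyAckQueue queue = new ToyAckQueue();
        queue.publish("task-1");
        queue.publish("task-2");
        queue.publish("task-3");
        long first = queue.deliver();
        queue.deliver();       // task-2 is delivered but never acked
        queue.ack(first);      // task-1 is done
        queue.consumerDied();  // task-2 is requeued ahead of task-3
        return queue;
    }

    public static void main(String[] args) {
        ToyAckQueue queue = runExample();
        System.out.println("ready=" + queue.readyCount() + " unacked=" + queue.unackedCount());
    }
}
```

In this run task-1 is acked and removed, while task-2 returns to the queue when the consumer dies, so two messages are ready for the next consumer and none are lost.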
