This is the ninth day of my participation in the August More text Challenge. For details, see: August More Text Challenge


Related articles

RabbitMQ series: RabbitMQ series


preface

  • We have just seen how to handle tasks without losing them, but how to ensure that messages sent by producers are not lost when the RabbitMQ service is down.
  • By default, when RabbitMQ exits or crashes for some reason, it ignores queues and messages unless it is told not to.
  • Two things need to be done to ensure that messages are not lost: we need to mark both the queue and the message as persistent.

1. Queue persistence

  • Queues are not durable. If rabbitMQ restarts, the queues will be deleted. The durable queues need to be declared as durable

  • Let’s restart RabbitMQ

  • The management page is refreshed

  • Ah, we lost our queue

  • Obviously, in a production environment, it is impossible to guarantee that the server will never restart or power down. So it’s easy to lose data if you don’t persist the queue.

  • Persisting data is something we have to do.

  • However, it is important to note that if the previously declared queue is not persistent, the original queue needs to be deleted or a new persistent queue needs to be created, otherwise an error will occur

  • The source code is as follows

  •     public com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException {
            validateQueueNameLength(queue);
            return (com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk)this.exnWrappingRpc((newcom.rabbitmq.client.AMQP.Queue.Declare.Builder()).queue(queue).durable(durable).exclusive(exclusive).autoDelete(autoDele te).arguments(arguments).build()).getMethod(); }Copy the code
  • Changing the producer code

  • /** * This is a test producer *@author DingYongJun
     *@date2021/8/1 * /
    public class DyProducerTest_xiaoxiyingda {
        /** * For convenience, we use the main function to test *@param args
         */
        public static void main(String[] args) throws Exception{
            // Use the utility class to create the channel
            Channel channel = RabbitMqUtils.getChannel();
    
            /** * generate a queue * 1. Queue name * 2. Whether messages in the queue are persistent * 3. Is the queue only for one consumer to consume is it shared true Multiple consumers can consume * 4. Indicates whether the queue is automatically deleted after the last consumer disconnect. True Indicates whether the queue is automatically deleted * 5. Other parameters */
            // This is the persistent parameter. False does not persist, true does
            boolean durable = true;
            channel.queueDeclare(QueueNameConstant.XIAOXIYINGDA_MODEL,durable,false.false.null);
    
            /** * send a message * 1. Send to the switch * 2. What is the key of the route * 3. Other parameter information * 4. Body of the sent message */
            Scanner sc = new Scanner(System.in);
            System.out.println("Please enter information");
            while (sc.hasNext()) {
                String message = sc.nextLine();
                channel.basicPublish("",QueueNameConstant.XIAOXIYINGDA_MODEL,null,message.getBytes());
                System.out.println("The producer sends the message"+ message); }}}Copy the code
  • As stated above, the existing queue must be deleted or an error will be reported.

  • Detailed error information

    • Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'xiaoxiyingda' in vhost '/': received 'true' but current is 'false'.class-id=50, method-id=10)
      Copy the code
  • Okay, delete the original queue, and do it again.

  • The D letter appears in Features, which proves that queue persistence succeeded!

  • Try restarting RabbitMQ again. Save space, don’t stick pictures.

  • Perfect! The queue is still there! Nice!

Message persistence

  • Modifying the producer code

    • /** * This is a test producer *@author DingYongJun
       *@date2021/8/1 * /
      public class DyProducerTest_xiaoxiyingda {
          /** * For convenience, we use the main function to test *@param args
           */
          public static void main(String[] args) throws Exception{
              // Use the utility class to create the channel
              Channel channel = RabbitMqUtils.getChannel();
      
              /** * generate a queue * 1. Queue name * 2. Whether messages in the queue are persistent * 3. Is the queue only for one consumer to consume is it shared true Multiple consumers can consume * 4. Indicates whether the queue is automatically deleted after the last consumer disconnect. True Indicates whether the queue is automatically deleted * 5. Other parameters */
              // This is the persistent parameter. False does not persist, true does
              boolean durable = true;
              channel.queueDeclare(QueueNameConstant.XIAOXIYINGDA_MODEL,durable,false.false.null);
      
              /** * send a message * 1. Send to the switch * 2. What is the key of the route * 3. Other parameter information * 4. Body of the sent message */
              Scanner sc = new Scanner(System.in);
              System.out.println("Please enter information");
              while (sc.hasNext()) {
                  String message = sc.nextLine();
                  //MessageProperties.PERSISTENT_TEXT_PLAIN; This represents message persistence to disk
                  channel.basicPublish("",QueueNameConstant.XIAOXIYINGDA_MODEL,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
                  System.out.println("The producer sends the message"+ message); }}}Copy the code
    • MessageProperties.PERSISTENT_TEXT_PLAIN

      • Look at the source code, what is this east east?

      •     public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException {
                this.basicPublish(exchange, routingKey, false, props, body);
            }
        Copy the code
      • public class MessageProperties {
            public static final BasicProperties MINIMAL_BASIC = new BasicProperties((String)null, (String)null, (Map)null, (Integer)null, (Integer)null, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
            public static final BasicProperties MINIMAL_PERSISTENT_BASIC = new BasicProperties((String)null, (String)null, (Map)null.2, (Integer)null, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
            public static final BasicProperties BASIC = new BasicProperties("application/octet-stream", (String)null, (Map)null.1.0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
            public static final BasicProperties PERSISTENT_BASIC = new BasicProperties("application/octet-stream", (String)null, (Map)null.2.0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
            public static final BasicProperties TEXT_PLAIN = new BasicProperties("text/plain", (String)null, (Map)null.1.0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
            public static final BasicProperties PERSISTENT_TEXT_PLAIN = new BasicProperties("text/plain", (String)null, (Map)null.2.0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
        
            public MessageProperties(a) {}}Copy the code
      • When the message is sent we need to write the parameter, and mq will persist it to disk.


I see no ending, but I will search high and low

If you think I blogger writes good! Writing is not easy, please like, follow, comment to encourage the blogger ~hahah