This is the ninth day of my participation in the August More Text Challenge.
Preface
- We have just seen how to process tasks without losing them when a consumer fails. But how do we make sure that messages sent by producers are not lost when the RabbitMQ service itself goes down?
- By default, when RabbitMQ exits or crashes for some reason, it forgets its queues and messages unless it is told not to.
- Two things are needed to ensure that messages are not lost: both the queue and the message must be marked as persistent.
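The two conditions above can be illustrated with a small in-memory model. This is only a sketch of the semantics, not the RabbitMQ client API: the class ToyBroker and all of its methods are hypothetical names invented for illustration. It assumes that a restart discards every non-durable queue (along with its messages) and, inside durable queues, every message whose delivery mode is not 2.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of broker persistence rules (hypothetical, not the RabbitMQ API). */
class ToyBroker {

    /** A message body together with its delivery mode (1 = transient, 2 = persistent). */
    static final class ToyMessage {
        final String body;
        final int deliveryMode;

        ToyMessage(String body, int deliveryMode) {
            this.body = body;
            this.deliveryMode = deliveryMode;
        }
    }

    /** A queue together with its durable flag. */
    static final class ToyQueue {
        final boolean durable;
        final List<ToyMessage> messages = new ArrayList<>();

        ToyQueue(boolean durable) {
            this.durable = durable;
        }
    }

    private final Map<String, ToyQueue> queues = new LinkedHashMap<>();

    /** Declares a queue if it does not exist yet. */
    void declare(String name, boolean durable) {
        queues.putIfAbsent(name, new ToyQueue(durable));
    }

    /** Appends a message to the named queue; messages for unknown queues are dropped. */
    void publish(String queueName, String body, int deliveryMode) {
        ToyQueue queue = queues.get(queueName);
        if (queue != null) {
            queue.messages.add(new ToyMessage(body, deliveryMode));
        }
    }

    /** Simulates a restart: only durable queues and persistent messages survive. */
    void restart() {
        queues.values().removeIf(q -> !q.durable);
        for (ToyQueue q : queues.values()) {
            q.messages.removeIf(m -> m.deliveryMode != 2);
        }
    }

    boolean hasQueue(String name) {
        return queues.containsKey(name);
    }

    /** Number of messages in the named queue, or 0 if the queue does not exist. */
    int depth(String name) {
        ToyQueue q = queues.get(name);
        return q == null ? 0 : q.messages.size();
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.declare("transient-queue", false);
        broker.declare("durable-queue", true);

        broker.publish("transient-queue", "a", 2); // persistent message, non-durable queue
        broker.publish("durable-queue", "b", 1);   // transient message, durable queue
        broker.publish("durable-queue", "c", 2);   // persistent message, durable queue

        broker.restart();

        System.out.println(broker.hasQueue("transient-queue")); // false
        System.out.println(broker.hasQueue("durable-queue"));   // true
        System.out.println(broker.depth("durable-queue"));      // 1
    }
}
```

Only message "c" survives in this model: it is the only one that is persistent and also sits in a durable queue, which is why both flags are needed together.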
1. Queue persistence
- By default, queues are not durable: if RabbitMQ restarts, they are deleted. For a queue to survive a restart, it must be declared as durable.
- Let's restart RabbitMQ.
- Then refresh the management page.
- Ah, we have lost our queue.
- Obviously, in a production environment it is impossible to guarantee that the server will never restart or lose power, so it is easy to lose data if the queue is not persisted.
- Persisting data is something we have to do.
- Note, however, that if the queue was previously declared as non-durable, you must either delete the original queue or create a new durable queue under a different name. Otherwise an error occurs.
- The source code of queueDeclare is as follows:

```java
public com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException {
    validateQueueNameLength(queue);
    return (com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk) this.exnWrappingRpc(
            (new com.rabbitmq.client.AMQP.Queue.Declare.Builder())
                    .queue(queue)
                    .durable(durable)
                    .exclusive(exclusive)
                    .autoDelete(autoDelete)
                    .arguments(arguments)
                    .build()).getMethod();
}
```
- Change the producer code:

```java
import com.rabbitmq.client.Channel;

import java.util.Scanner;

/**
 * A test producer.
 *
 * @author DingYongJun
 * @date 2021/8/1
 */
public class DyProducerTest_xiaoxiyingda {

    /**
     * For convenience, we test from the main method.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) throws Exception {
        // Use the utility class to create the channel
        Channel channel = RabbitMqUtils.getChannel();

        /*
         * Declare a queue:
         * 1. Queue name
         * 2. durable: whether the queue survives a broker restart
         * 3. exclusive: true restricts the queue to the declaring connection;
         *    false lets multiple consumers share it
         * 4. autoDelete: true deletes the queue automatically after the last consumer disconnects
         * 5. Other arguments
         */
        // This is the persistence flag: false does not persist the queue, true does
        boolean durable = true;
        channel.queueDeclare(QueueNameConstant.XIAOXIYINGDA_MODEL, durable, false, false, null);

        /*
         * Send a message:
         * 1. Exchange to publish to ("" is the default exchange)
         * 2. Routing key (here, the queue name)
         * 3. Other message properties
         * 4. Body of the message
         */
        Scanner sc = new Scanner(System.in);
        System.out.println("Please enter information");
        while (sc.hasNext()) {
            String message = sc.nextLine();
            channel.basicPublish("", QueueNameConstant.XIAOXIYINGDA_MODEL, null, message.getBytes());
            System.out.println("The producer sends the message: " + message);
        }
    }
}
```
- As stated above, if the existing non-durable queue is not deleted first, an error is reported.
- Detailed error information:

```
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'xiaoxiyingda' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
```
- Okay, delete the original queue and run the producer again.
- The letter D now appears in the Features column of the management page, which shows that the queue was declared durable.
- Try restarting RabbitMQ again (screenshots omitted to save space).
- Perfect! The queue is still there!
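The delete-and-redeclare step can also be done from code instead of the management page. The following is a minimal sketch, assuming the RabbitMQ Java client is on the classpath and a broker is reachable on localhost with default credentials. The class name RedeclareDurableQueue is hypothetical, and the queue name is copied from the error message above. It needs a running broker, so treat it as an illustration rather than a tested recipe.

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/** Hypothetical helper: replaces a non-durable queue with a durable one of the same name. */
public class RedeclareDurableQueue {

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // assumption: local broker, default port and credentials

        String queueName = "xiaoxiyingda"; // queue name taken from the error message

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // Remove the old non-durable queue. Any messages still in it are discarded.
            channel.queueDelete(queueName);

            // Re-create it: durable = true, exclusive = false, autoDelete = false, no arguments.
            channel.queueDeclare(queueName, true, false, false, null);
        }
    }
}
```

Because deleting a queue throws away its contents, this is only appropriate when the queue is empty or its messages are expendable. Otherwise, declaring a new durable queue under a different name is the safer route.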
2. Message persistence
- Modify the producer code:

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

import java.util.Scanner;

/**
 * A test producer.
 *
 * @author DingYongJun
 * @date 2021/8/1
 */
public class DyProducerTest_xiaoxiyingda {

    /**
     * For convenience, we test from the main method.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) throws Exception {
        // Use the utility class to create the channel
        Channel channel = RabbitMqUtils.getChannel();

        /*
         * Declare a queue:
         * 1. Queue name
         * 2. durable: whether the queue survives a broker restart
         * 3. exclusive: true restricts the queue to the declaring connection;
         *    false lets multiple consumers share it
         * 4. autoDelete: true deletes the queue automatically after the last consumer disconnects
         * 5. Other arguments
         */
        // This is the persistence flag: false does not persist the queue, true does
        boolean durable = true;
        channel.queueDeclare(QueueNameConstant.XIAOXIYINGDA_MODEL, durable, false, false, null);

        /*
         * Send a message:
         * 1. Exchange to publish to ("" is the default exchange)
         * 2. Routing key (here, the queue name)
         * 3. Other message properties
         * 4. Body of the message
         */
        Scanner sc = new Scanner(System.in);
        System.out.println("Please enter information");
        while (sc.hasNext()) {
            String message = sc.nextLine();
            // MessageProperties.PERSISTENT_TEXT_PLAIN asks the broker to persist the message to disk
            channel.basicPublish("", QueueNameConstant.XIAOXIYINGDA_MODEL, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            System.out.println("The producer sends the message: " + message);
        }
    }
}
```
- The new argument is MessageProperties.PERSISTENT_TEXT_PLAIN.
- Let's look at the source code to see what it actually is. First, basicPublish takes it as its props parameter:

```java
public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException {
    this.basicPublish(exchange, routingKey, false, props, body);
}
```

- MessageProperties itself is a set of predefined BasicProperties constants. The fourth constructor argument is the delivery mode: 1 means non-persistent, 2 means persistent.

```java
public class MessageProperties {
    public static final BasicProperties MINIMAL_BASIC = new BasicProperties((String)null, (String)null, (Map)null, (Integer)null, (Integer)null, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
    public static final BasicProperties MINIMAL_PERSISTENT_BASIC = new BasicProperties((String)null, (String)null, (Map)null, 2, (Integer)null, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
    public static final BasicProperties BASIC = new BasicProperties("application/octet-stream", (String)null, (Map)null, 1, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
    public static final BasicProperties PERSISTENT_BASIC = new BasicProperties("application/octet-stream", (String)null, (Map)null, 2, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
    public static final BasicProperties TEXT_PLAIN = new BasicProperties("text/plain", (String)null, (Map)null, 1, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
    public static final BasicProperties PERSISTENT_TEXT_PLAIN = new BasicProperties("text/plain", (String)null, (Map)null, 2, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);

    public MessageProperties() {
    }
}
```
- So when publishing a message, we pass a properties object whose delivery mode is 2 (such as PERSISTENT_TEXT_PLAIN), and RabbitMQ persists the message to disk.
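If a predefined constant does not fit, the same delivery mode can be set explicitly through the properties builder. The sketch below assumes the RabbitMQ Java client is on the classpath and a broker is reachable on localhost with default credentials. The class name PersistentPublishSketch, the queue name, and the message text are placeholders chosen for illustration. The properties it builds are roughly what PERSISTENT_TEXT_PLAIN contains, except that the constant also sets the priority to 0.

```java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

/** Hypothetical example: publishes one persistent message to a durable queue. */
public class PersistentPublishSketch {

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // assumption: local broker, default port and credentials

        String queueName = "xiaoxiyingda"; // placeholder queue name

        // deliveryMode 2 marks the message as persistent; 1 would mark it as transient.
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .contentType("text/plain")
                .deliveryMode(2)
                .build();

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // The queue must be durable as well, or it disappears on restart with its messages.
            channel.queueDeclare(queueName, true, false, false, null);

            // Publish through the default exchange, using the queue name as the routing key.
            channel.basicPublish("", queueName, props, "hello".getBytes(StandardCharsets.UTF_8));
        }
    }
}
```

The builder is mainly useful when other properties, such as headers or an expiration, have to be set on the same message alongside the delivery mode.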
The road ahead has no end in sight, but I will keep searching high and low.
If you think this blogger writes well, please like, follow, and comment to encourage me. Writing is not easy, haha.