This is day 8 of my participation in the August More Text Challenge.
Preface
- Consider a few questions before we get to message acknowledgement:
- A consumer may take a while to complete a task. What happens if a consumer dies partway through a long task?
- Once RabbitMQ delivers a message to a consumer, it immediately marks the message for deletion. If the consumer then dies suddenly, we lose the message it was working on, as well as any later messages already dispatched to that consumer that it never got to process.
- To make sure messages are not lost in transit, RabbitMQ provides a message acknowledgement mechanism.
- After the consumer receives a message and finishes processing it, it tells RabbitMQ that the message has been handled, and only then does RabbitMQ delete it.
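The difference between deleting on delivery and deleting on acknowledgement can be sketched with a toy in-memory queue. This is only an illustration under simplifying assumptions: it does not use the RabbitMQ client, and the class `AckModeSketch` and method `messagesLeft` are hypothetical names invented for this example.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy in-memory model, not the RabbitMQ client API. All names are hypothetical. */
class AckModeSketch {

    /** Returns how many messages the toy broker still holds after one delivery. */
    public static int messagesLeft(boolean autoAck, boolean consumerCrashes) {
        Deque<String> queue = new ArrayDeque<>();
        queue.add("task-1");

        // Delivery: the broker hands the head of the queue to the consumer.
        if (autoAck) {
            // Automatic mode: the message is dropped the moment it is delivered.
            queue.poll();
        }

        // Manual mode: the broker deletes the message only when the ack arrives,
        // and a consumer that crashes mid-task never sends one.
        if (!autoAck && !consumerCrashes) {
            queue.poll();
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println("auto ack, consumer crashes:    " + messagesLeft(true, true));
        System.out.println("manual ack, consumer crashes:  " + messagesLeft(false, true));
        System.out.println("manual ack, consumer finishes: " + messagesLeft(false, false));
    }
}
```

In this model a crash under automatic acknowledgement leaves nothing in the queue, so the task is gone, while under manual acknowledgement the message is still held and can be handed to another consumer.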
One, automatic acknowledgement
- A message is considered successfully delivered as soon as it is sent. This mode trades data safety for throughput: if the consumer's connection or channel closes before the message is received, the message is lost.
- On the other hand, nothing limits how many messages are pushed to the consumer in this mode, so the consumer can be overloaded. It may receive more messages than it can process, the backlog grows, memory runs out, and the consumer process is eventually killed by the operating system.
- So this mode is only suitable when the consumer can process messages efficiently and keep up with the delivery rate.
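The overload risk is simple arithmetic, and a rough sketch makes it concrete. The numbers and the names `BacklogSketch` and `backlogAfter` below are made up for illustration; real brokers and consumers are not tick-based, so treat this as a back-of-the-envelope model only.

```java
/** Toy arithmetic model of an unbounded push to a consumer. All names are hypothetical. */
class BacklogSketch {

    /** Returns how many messages sit unprocessed in the consumer after the given number of ticks. */
    public static int backlogAfter(int ticks, int pushedPerTick, int processedPerTick) {
        int buffered = 0;
        for (int t = 0; t < ticks; t++) {
            // With automatic acknowledgement nothing caps what gets pushed.
            buffered += pushedPerTick;
            // The consumer drains as much as it can handle in this tick.
            buffered -= Math.min(buffered, processedPerTick);
        }
        return buffered;
    }

    public static void main(String[] args) {
        System.out.println("slow consumer backlog: " + backlogAfter(10, 5, 2));
        System.out.println("fast consumer backlog: " + backlogAfter(10, 2, 5));
    }
}
```

When the consumer processes fewer messages per tick than it receives, the backlog grows without bound; when it keeps up, the backlog stays at zero. That is the condition the last bullet above describes.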
① Producer
```java
import com.rabbitmq.client.Channel;

/**
 * Test producer.
 *
 * @author DingYongJun
 * @date 2021/8/1
 */
public class DyProducerTest_xiaoxiyingda {

    /**
     * For convenience, we test from a main method.
     *
     * @param args unused
     */
    public static void main(String[] args) throws Exception {
        // Use the utility class to create the channel
        Channel channel = RabbitMqUtils.getChannel();

        /*
         * Declare a queue:
         * 1. Queue name
         * 2. durable: whether the queue survives a broker restart
         * 3. exclusive: true restricts the queue to the declaring connection,
         *    false lets multiple consumers share it
         * 4. autoDelete: true deletes the queue after its last consumer disconnects
         * 5. Other arguments
         */
        channel.queueDeclare(QueueNameConstant.XIAOXIYINGDA_MODEL, false, false, false, null);

        /*
         * Publish a message:
         * 1. Exchange to publish to ("" is the default exchange)
         * 2. Routing key (here, the queue name)
         * 3. Other message properties
         * 4. Message body
         */
        for (int i = 0; i < 6; i++) {
            String message = "I'm the producer, and I've got good news for you!" + i;
            Thread.sleep(1000);
            channel.basicPublish("", QueueNameConstant.XIAOXIYINGDA_MODEL, null, message.getBytes());
            System.out.println("Message sent.");
        }
    }
}
```
② Consumer
```java
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

/**
 * Test consumer.
 *
 * @author DingYongJun
 * @date 2021/8/1
 */
public class DyConsumerTest_xiaoxiyingda01 {

    public static void main(String[] args) throws Exception {
        // Use the utility class to create the channel
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("This is Consumer A. I'm waiting for a message!");

        // Called for every message delivered to this consumer
        DeliverCallback deliverCallback = (String var1, Delivery var2) -> {
            String message = new String(var2.getBody());
            System.out.println(message);
        };
        // Called when the consumer is cancelled
        CancelCallback cancelCallback = (String var1) -> {
            System.out.println("Message consumption interrupted");
        };

        /*
         * Consume messages:
         * 1. Queue to consume from
         * 2. autoAck: true acknowledges automatically, false requires a manual ack
         * 3. Callback for delivered messages
         * 4. Callback for when consumption is cancelled
         */
        Thread.sleep(1000);
        channel.basicConsume(QueueNameConstant.XIAOXIYINGDA_MODEL, true, deliverCallback, cancelCallback);
    }
}
```
③ Test
Execution result
Conclusion
```java
public String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException {
    return this.basicConsume(queue, autoAck, "", this.consumerFromDeliverCancelCallbacks(deliverCallback, cancelCallback));
}
```
- When autoAck is true, messages are acknowledged automatically.
- When autoAck is false, messages must be acknowledged manually.
- Automatic acknowledgement is not recommended. In real business scenarios, manual acknowledgement is usually used.
Two, manual acknowledgement
Producer
```java
import com.rabbitmq.client.Channel;

import java.util.Scanner;

/**
 * Test producer.
 *
 * @author DingYongJun
 * @date 2021/8/1
 */
public class DyProducerTest_xiaoxiyingda {

    /**
     * For convenience, we test from a main method.
     *
     * @param args unused
     */
    public static void main(String[] args) throws Exception {
        // Use the utility class to create the channel
        Channel channel = RabbitMqUtils.getChannel();

        /*
         * Declare a queue:
         * 1. Queue name
         * 2. durable: whether the queue survives a broker restart
         * 3. exclusive: true restricts the queue to the declaring connection,
         *    false lets multiple consumers share it
         * 4. autoDelete: true deletes the queue after its last consumer disconnects
         * 5. Other arguments
         */
        channel.queueDeclare(QueueNameConstant.XIAOXIYINGDA_MODEL, false, false, false, null);

        /*
         * Publish a message:
         * 1. Exchange to publish to ("" is the default exchange)
         * 2. Routing key (here, the queue name)
         * 3. Other message properties
         * 4. Message body
         */
        Scanner sc = new Scanner(System.in);
        System.out.println("Please enter a message");
        while (sc.hasNext()) {
            String message = sc.nextLine();
            channel.basicPublish("", QueueNameConstant.XIAOXIYINGDA_MODEL, null, message.getBytes());
            System.out.println("The producer sent the message: " + message);
        }
    }
}
```
Consumer A
```java
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

/**
 * Test consumer A (fast).
 *
 * @author DingYongJun
 * @date 2021/8/1
 */
public class DyConsumerTest_xiaoxiyingda01 {

    public static void main(String[] args) throws Exception {
        // Use the utility class to create the channel
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("This is Consumer A. I'm waiting for a message!");

        DeliverCallback deliverCallback = (String var1, Delivery var2) -> {
            String message = new String(var2.getBody());
            try {
                // Short sleep: simulates fast processing
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(message);
            // true acknowledges every unacknowledged message on this channel up to this
            // delivery tag; false acknowledges only this message
            boolean multiple = false;
            channel.basicAck(var2.getEnvelope().getDeliveryTag(), multiple);
        };
        CancelCallback cancelCallback = (String var1) -> {
            System.out.println("Message consumption interrupted");
        };

        /*
         * Consume messages:
         * 1. Queue to consume from
         * 2. autoAck: true acknowledges automatically, false requires a manual ack
         * 3. Callback for delivered messages
         * 4. Callback for when consumption is cancelled
         */
        channel.basicConsume(QueueNameConstant.XIAOXIYINGDA_MODEL, false, deliverCallback, cancelCallback);
    }
}
```
Consumer A uses manual acknowledgement and a short sleep (1 second) to simulate fast processing.
Consumer B
```java
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

/**
 * Test consumer B (slow).
 *
 * @author DingYongJun
 * @date 2021/8/1
 */
public class DyConsumerTest_xiaoxiyingda02 {

    public static void main(String[] args) throws Exception {
        // Use the utility class to create the channel
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("This is Consumer B. I'm waiting for a message!");

        DeliverCallback deliverCallback = (String var1, Delivery var2) -> {
            String message = new String(var2.getBody());
            try {
                // Long sleep: simulates slow processing
                Thread.sleep(30000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(message);
            // true acknowledges every unacknowledged message on this channel up to this
            // delivery tag; false acknowledges only this message
            boolean multiple = false;
            channel.basicAck(var2.getEnvelope().getDeliveryTag(), multiple);
        };
        CancelCallback cancelCallback = (String var1) -> {
            System.out.println("Message consumption interrupted");
        };

        /*
         * Consume messages:
         * 1. Queue to consume from
         * 2. autoAck: true acknowledges automatically, false requires a manual ack
         * 3. Callback for delivered messages
         * 4. Callback for when consumption is cancelled
         */
        channel.basicConsume(QueueNameConstant.XIAOXIYINGDA_MODEL, false, deliverCallback, cancelCallback);
    }
}
```
Consumer B uses manual acknowledgement and a long sleep (30 seconds) to simulate slow processing.
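Both consumers pass `multiple = false` to `basicAck`. To see what the flag changes, here is a small sketch that models a channel's outstanding delivery tags as a sorted set. It does not talk to RabbitMQ; `MultipleAckSketch` and its `ack` method are hypothetical names used only for this illustration.

```java
import java.util.Arrays;
import java.util.SortedSet;
import java.util.TreeSet;

/** Toy model of the delivery tags awaiting an ack on one channel. All names are hypothetical. */
class MultipleAckSketch {

    /** Returns the delivery tags that are still unacknowledged after one ack call. */
    public static SortedSet<Long> ack(SortedSet<Long> unacked, long deliveryTag, boolean multiple) {
        TreeSet<Long> remaining = new TreeSet<>(unacked);
        if (multiple) {
            // Batch ack: clears every outstanding tag up to and including deliveryTag.
            remaining.headSet(deliveryTag, true).clear();
        } else {
            // Single ack: clears only the given tag.
            remaining.remove(deliveryTag);
        }
        return remaining;
    }

    public static void main(String[] args) {
        SortedSet<Long> unacked = new TreeSet<>(Arrays.asList(5L, 6L, 7L, 8L));
        System.out.println("ack(8, multiple=false) leaves " + ack(unacked, 8L, false));
        System.out.println("ack(8, multiple=true)  leaves " + ack(unacked, 8L, true));
    }
}
```

With tags 5 to 8 outstanding, a single ack of tag 8 leaves 5, 6 and 7 pending, while a batch ack of tag 8 clears all four, including any that may not have finished processing yet.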
Execution order
- 1. The producer sends two messages from the console.
- 2. In theory, A and B each receive one message. If B's connection drops before it finishes processing, what happens to its message?
Execution result
Three, summary
- The multiple flag means different things when true and when false
- true means batch acknowledgement, scoped to the channel: acknowledging one message also acknowledges every earlier unacknowledged message on that channel. Efficient, but less safe.
- false means single acknowledgement: each message is acknowledged on its own once it is done. Safer and more reliable, but less efficient.
- Messages are requeued automatically
- If a consumer loses its connection for some reason (its channel is closed, the connection is closed, or the TCP connection is lost) without having sent an ack, RabbitMQ knows the message was not fully processed and requeues it.
- If another consumer is available at that point, RabbitMQ quickly redelivers the message to it. This way, even if a consumer occasionally dies, no messages are lost.
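The requeue behaviour, applied to the A/B scenario from the execution order, can be sketched with a toy in-memory broker. This is a simplified model under stated assumptions, not RabbitMQ itself: `RequeueSketch`, its methods, and the message names `msg-1` and `msg-2` are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory broker, not the RabbitMQ client API. All names are hypothetical. */
class RequeueSketch {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<String, List<String>> unacked = new HashMap<>();

    void publish(String message) {
        ready.addLast(message);
    }

    /** Hands the next ready message to a consumer and tracks it as unacknowledged. */
    String deliver(String consumer) {
        String message = ready.pollFirst();
        if (message != null) {
            unacked.computeIfAbsent(consumer, k -> new ArrayList<>()).add(message);
        }
        return message;
    }

    /** The consumer confirms it finished the message, so the broker forgets it. */
    void ack(String consumer, String message) {
        List<String> pending = unacked.get(consumer);
        if (pending != null) {
            pending.remove(message);
        }
    }

    /** The consumer vanished: everything it had not acked goes back to the front of the queue. */
    void connectionLost(String consumer) {
        List<String> pending = unacked.remove(consumer);
        if (pending == null) {
            return;
        }
        for (int i = pending.size() - 1; i >= 0; i--) {
            ready.addFirst(pending.get(i));
        }
    }

    /** Replays the scenario: A is fast, B dies before acking. Returns what A processed. */
    public static List<String> scenario() {
        RequeueSketch broker = new RequeueSketch();
        broker.publish("msg-1");
        broker.publish("msg-2");

        List<String> processedByA = new ArrayList<>();
        String forA = broker.deliver("A");
        broker.deliver("B");               // B receives msg-2 but is still working on it
        broker.ack("A", forA);
        processedByA.add(forA);

        broker.connectionLost("B");        // B dies before sending its ack
        String redelivered = broker.deliver("A");
        broker.ack("A", redelivered);
        processedByA.add(redelivered);
        return processedByA;
    }

    public static void main(String[] args) {
        System.out.println("Processed by A: " + scenario());
    }
}
```

In this model consumer A ends up processing both messages, because the one B never acknowledged goes back to the queue when B's connection is lost.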
I see no ending, but I will search high and low
If you think this post is well written, please like, follow, and comment to encourage the blogger. Writing is not easy ~haha