“This is the 19th day of my participation in the First Challenge 2022. For details: First Challenge 2022”

1. Publishing messages in the Work message model

Work Queues, also known as Task Queues, implement a task-distribution model. When message processing is time-consuming, messages may be produced faster than they are consumed. Over time, messages pile up and cannot be processed in a timely manner. This is where the Work model helps: multiple consumers are bound to one queue and consume its messages together. Once a message in the queue has been consumed, it is removed, so no task is executed twice.
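The idea that several consumers share one queue and each message is handled exactly once can be sketched without RabbitMQ at all. The snippet below is only an in-memory illustration using `java.util.concurrent`; the class name `CompetingConsumersSketch` and the method `drain` are hypothetical and are not part of the RabbitMQ client.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-memory stand-in for a work queue; not the RabbitMQ API.
final class CompetingConsumersSketch {

    /** Lets `workers` threads drain one shared queue and returns the total number of deliveries. */
    static int drain(int messageCount, int workers) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < messageCount; i++) {
            queue.add("work rabbitmq" + i);
        }
        Set<String> seen = ConcurrentHashMap.newKeySet();
        AtomicInteger deliveries = new AtomicInteger();

        Thread[] threads = new Thread[workers];
        for (int w = 0; w < workers; w++) {
            threads[w] = new Thread(() -> {
                String msg;
                // poll() removes the message, so no other worker can receive it
                while ((msg = queue.poll()) != null) {
                    seen.add(msg);
                    deliveries.incrementAndGet();
                }
            });
            threads[w].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        // If any message had been delivered twice, the two numbers would differ
        if (seen.size() != deliveries.get()) {
            throw new IllegalStateException("a message was delivered more than once");
        }
        return deliveries.get();
    }

    public static void main(String[] args) {
        System.out.println(drain(20, 2));
    }
}
```

How the 20 messages split between the two workers varies from run to run here; only the total is fixed.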

Work message model structure diagram

  • P: the producer, the program that sends messages
  • C1: Consumer 1, which receives messages and consumes them; assume it finishes quickly
  • C2: Consumer 2, which receives messages and consumes them; assume it finishes slowly
  • Queue: the message queue, shown in red. Like a mailbox, it can buffer messages; producers post messages to it, and consumers retrieve messages from it.

Developing the message producer

ConnectUtils is the utility class I wrote in the previous article.

public class Provider {
    public static void main(String[] args) throws IOException, TimeoutException {
        // Create a connection to RabbitMQ using the connection utility class
        Connection connection = ConnectUtils.getConnection("121.199.53.150", 5672, "/ems", "ems", "ems");
        // Create a channel
        Channel channel = connection.createChannel();
        // Declare the durable queue "work" (it is created if it does not exist yet)
        channel.queueDeclare("work", true, false, false, null);

        for (int i = 0; i < 20; i++) {
            // Publish a persistent message to the "work" queue through the default exchange
            channel.basicPublish("", "work", MessageProperties.PERSISTENT_TEXT_PLAIN, ("work rabbitmq" + i).getBytes());
        }
        channel.close();
        connection.close();
    }
}

After running the producer, open the RabbitMQ management page:

The Queues tab shows a queue called work that holds 20 messages, so the Work message model published the messages successfully.

2. Consuming messages in the Work message model

Developing the message consumers

Work message model Consumer A (consumes messages quickly):

package com.cheng.work;

import com.cheng.utils.ConnectUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class ConsumerA {
    public static void main(String[] args) throws IOException {
        Connection connection = ConnectUtils.getConnection("121.199.53.150", 5672, "/ems", "ems", "ems");
        // Create a channel
        Channel channel = connection.createChannel();
        // Declare the queue to consume from
        channel.queueDeclare("work", true, false, false, null);
        // Second argument true: automatic acknowledgment
        channel.basicConsume("work", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // No delay, so this consumer processes each message immediately
                System.out.println("consumerA:" + new String(body));
            }
        });
    }
}

Work message model Consumer B (consumes messages slowly):

package com.cheng.work;

import com.cheng.utils.ConnectUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class ConsumerB {
    public static void main(String[] args) throws IOException {
        Connection connection = ConnectUtils.getConnection("121.199.53.150", 5672, "/ems", "ems", "ems");
        // Create a channel
        Channel channel = connection.createChannel();
        // Declare the queue to consume from
        channel.queueDeclare("work", true, false, false, null);
        // Second argument true: automatic acknowledgment
        channel.basicConsume("work", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    // Simulate slow processing: one second per message
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("consumerB:" + new String(body));
            }
        });
    }
}

Start both consumers so that they listen on the queue, then run the producer to send the messages and check the console output:

Consumer A: consumes its 10 messages quickly

Consumer B: consumes one message per second, so it runs slowly

As the two figures above show, both consumers consume the same number of messages.

By default, RabbitMQ sends each message to the next consumer in turn. On average, every consumer receives the same number of messages. This way of distributing messages is called round-robin.

The problem with this approach is that fast and slow consumers receive the same number of messages. After the fast consumer has finished its share, the slow consumer is still working through its own, which slows down the whole service. We would rather follow the principle that the more capable do more work: faster consumers process more messages and slower ones process fewer.
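The cost of round-robin dispatch can be made concrete with a little arithmetic. The sketch below is a plain-Java model, not RabbitMQ code; the class `RoundRobinSketch` and its methods are hypothetical, and the 10 ms per message for the fast consumer is an assumed figure (the 1000 ms for the slow one comes from the `Thread.sleep(1000)` in Consumer B).

```java
import java.util.Arrays;

// Hypothetical model of round-robin dispatch; not the RabbitMQ API.
final class RoundRobinSketch {

    /** Assigns message i to consumer i % consumers and returns how many each consumer got. */
    static int[] counts(int messages, int consumers) {
        int[] counts = new int[consumers];
        for (int i = 0; i < messages; i++) {
            counts[i % consumers]++;
        }
        return counts;
    }

    /** Time until the last consumer is done, given each consumer's per-message cost in milliseconds. */
    static long makespanMillis(int messages, long[] costMillis) {
        int[] counts = counts(messages, costMillis.length);
        long worst = 0;
        for (int c = 0; c < costMillis.length; c++) {
            worst = Math.max(worst, counts[c] * costMillis[c]);
        }
        return worst;
    }

    public static void main(String[] args) {
        // 20 messages, two consumers
        System.out.println(Arrays.toString(counts(20, 2)));              // [10, 10]
        // Assumed costs: fast consumer 10 ms, slow consumer 1000 ms per message
        System.out.println(makespanMillis(20, new long[] {10, 1000}));   // 10000
    }
}
```

Under these assumed costs the fast consumer is idle after about 100 ms, while the batch as a whole takes 10 seconds because the slow consumer still holds half of the messages.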

3. Automatic message acknowledgment mechanism

The second argument of the basicConsume() method is boolean autoAck, which indicates whether automatic message acknowledgment is turned on.

If autoAck=true, RabbitMQ treats a message as acknowledged as soon as it has been delivered to the consumer, whether or not the consumer has finished processing it, and marks the message for deletion. But what happens if a consumer starts a long-running task and dies after completing only part of it? We lose the message it was processing. We also lose all messages that were already dispatched to that consumer but not yet processed.

autoAck=false disables automatic acknowledgment. If the consumer dies (its channel is closed, its connection is closed, or the TCP connection is lost) without sending an acknowledgment, RabbitMQ understands that the message was not fully processed and puts it back in the queue. If another consumer is online at that moment, RabbitMQ quickly redelivers the message to it. This way, even if a worker occasionally dies, no messages are lost.
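The requeue behaviour described above can be illustrated with a small in-memory model. This is only a sketch of the bookkeeping idea, assuming a single queue and a single consumer; `ManualAckSketch` and all of its methods are hypothetical names, and the real broker's internals are more involved.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of one queue and one consumer; not the RabbitMQ API.
final class ManualAckSketch {
    private final Deque<String> ready = new ArrayDeque<>();          // messages waiting in the queue
    private final Map<Long, String> unacked = new LinkedHashMap<>(); // delivered but not yet acknowledged
    private long nextTag = 1;

    void publish(String body) {
        ready.addLast(body);
    }

    /** Hands the head message to the consumer and returns its delivery tag, or -1 if the queue is empty. */
    long deliver() {
        String body = ready.pollFirst();
        if (body == null) {
            return -1;
        }
        long tag = nextTag++;
        unacked.put(tag, body);
        return tag;
    }

    /** The consumer confirms it finished the message, so the model can forget it. */
    void ack(long tag) {
        unacked.remove(tag);
    }

    /** The consumer died: every unacknowledged message goes back to the front of the queue. */
    void consumerDied() {
        List<String> pending = new ArrayList<>(unacked.values());
        for (int i = pending.size() - 1; i >= 0; i--) {
            ready.addFirst(pending.get(i));
        }
        unacked.clear();
    }

    int readyCount() {
        return ready.size();
    }

    String peekReady() {
        return ready.peekFirst();
    }
}
```

For example, after publishing "a", "b" and "c", delivering and acknowledging "a", then delivering "b" and calling consumerDied(), the model again holds "b" and "c" as ready, so "b" is not lost.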

In consumers A and B, disable automatic acknowledgment, enable manual acknowledgment, and limit the channel to delivering one unacknowledged message at a time. Each consumer then receives its next message only after it has processed the previous one and sent the acknowledgment back to RabbitMQ. This achieves the goal that the more capable do more work:

public class ConsumerA {
    public static void main(String[] args) throws IOException {
        Connection connection = ConnectUtils.getConnection("121.199.53.150", 5672, "/ems", "ems", "ems");
        // Create a channel
        final Channel channel = connection.createChannel();
        // Deliver at most one unacknowledged message to this consumer at a time
        channel.basicQos(1);
        // Declare the queue to consume from
        channel.queueDeclare("work", true, false, false, null);
        // Second argument false: automatic acknowledgment is turned off
        channel.basicConsume("work", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerA:" + new String(body));
                // Parameter 1: delivery tag of the message to acknowledge. Parameter 2: false acknowledges only this message
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}

public class ConsumerB {
    public static void main(String[] args) throws IOException {
        Connection connection = ConnectUtils.getConnection("121.199.53.150", 5672, "/ems", "ems", "ems");
        // Create a channel
        final Channel channel = connection.createChannel();
        // Deliver at most one unacknowledged message to this consumer at a time
        channel.basicQos(1);
        // Declare the queue to consume from
        channel.queueDeclare("work", true, false, false, null);
        // Second argument false: automatic acknowledgment is turned off
        channel.basicConsume("work", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    // Simulate slow processing: two seconds per message
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("consumerB:" + new String(body));
                // Parameter 1: delivery tag of the message to acknowledge. Parameter 2: false acknowledges only this message
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}

Start both consumers so that they listen on the queue, then run the producer to send the messages and check the console output:

Consumer A: because it processes messages quickly, it handles most of them

Consumer B: because it processes messages slowly, it handles only one
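Why a prefetch limit of one shifts the load toward the fast consumer can be shown with a rough simulation. The model below is plain Java, not RabbitMQ code: `FairDispatchSketch` is a hypothetical class, it assumes the next message always goes to whichever consumer becomes free first, and the 10 ms cost for Consumer A is an assumed figure (the 2000 ms for Consumer B comes from its `Thread.sleep(2000)`).

```java
import java.util.Arrays;

// Hypothetical model of dispatch with one unacknowledged message per consumer; not the RabbitMQ API.
final class FairDispatchSketch {

    /** Gives each message to the consumer that becomes free earliest; returns messages handled per consumer. */
    static int[] counts(int messages, long[] costMillis) {
        int n = costMillis.length;
        int[] counts = new int[n];
        long[] freeAt = new long[n]; // simulated time at which each consumer acknowledges its current message
        for (int m = 0; m < messages; m++) {
            int next = 0;
            for (int c = 1; c < n; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c; // ties go to the lower index
                }
            }
            freeAt[next] += costMillis[next];
            counts[next]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Assumed costs: Consumer A 10 ms, Consumer B 2000 ms per message
        System.out.println(Arrays.toString(counts(20, new long[] {10, 2000}))); // [19, 1]
    }
}
```

With these assumed costs the fast consumer finishes 19 of the 20 messages while the slow consumer is still busy with its first one, which is consistent with the console output described above.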