2 Work Queue


In the first tutorial, we wrote two programs to send and receive messages from a specified queue. In this tutorial, we will create a work queue that distributes “time-consuming” tasks to multiple workers.

The idea behind work queues (also called task queues) is to avoid running a costly task immediately and having to wait for it to finish. Instead, we schedule the task to be done later. A worker program running in the background receives the task and executes it. When you run multiple workers, the tasks in the work queue are shared among them.

This idea is especially useful in web applications, where a complex task cannot be completed within a short HTTP request window.

Preparation

In the previous tutorial, we sent a message containing the string “Hello World!”. Now we will send strings that stand for tasks of varying complexity. We do not have real-world tasks such as resizing images or rendering PDF files, so let’s fake being busy with Thread.sleep(). The number of dots in the string is the complexity of the task: each dot accounts for one second of “work”. For example, the task described by the string Hello... takes 3 seconds.
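The dot-counting rule can be checked in isolation. The sketch below is only an illustration of that rule; the class name TaskCost and the method secondsOfWork are made up for this example and are not part of the tutorial's code.

```java
// Hypothetical helper (not part of the tutorial's code): maps a message to
// the number of seconds of simulated work it represents.
final class TaskCost {

    // One second of fake work per '.' character in the message.
    static int secondsOfWork(String message) {
        int seconds = 0;
        for (char ch : message.toCharArray()) {
            if (ch == '.') {
                seconds++;
            }
        }
        return seconds;
    }

    public static void main(String[] args) {
        System.out.println(secondsOfWork("Hello..."));     // prints 3
        System.out.println(secondsOfWork("Hello World!")); // prints 0
    }
}
```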

Slightly modify the Send.java code from the previous example so that arbitrary messages can be sent from the command line. This program will schedule tasks to our work queue, so let’s name it NewTask.java:

String message = String.join(" ", argv);

channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

The old Recv.java program also needs some changes: it has to fake one second of work for every dot in the message body. It will receive messages and perform the tasks, so let’s name it Worker.java:

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  String message = new String(delivery.getBody(), "UTF-8");

  System.out.println(" [x] Received '" + message + "'");
  try {
    doWork(message);
  } finally {
    System.out.println(" [x] Done");
  }
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

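The fake task that simulates execution time:

private static void doWork(String task) {
  for (char ch : task.toCharArray()) {
    if (ch != '.') continue;
    try {
      Thread.sleep(1000); // one second of fake work per dot
    } catch (InterruptedException e) {
      // handled here because a DeliverCallback may only throw IOException
      Thread.currentThread().interrupt();
    }
  }
}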

Compile them as in Tutorial 1 (with the required JAR files in the working directory and the environment variable CP set):

javac -cp $CP NewTask.java Worker.java

On Windows, use %CP% instead of $CP, here and below. (Translator’s note)

Round-robin Dispatching

One of the advantages of a task queue is that it makes horizontal scaling easy. If a backlog of tasks builds up, we can simply add more workers.

First, let’s run two worker instances at the same time. Both will get messages from the queue, but how exactly? Let’s find out.

You need three terminals open. Two of them will run the worker program; these are our two consumers, C1 and C2.

# shell 1
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C

# shell 2
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C

In the third terminal we publish new tasks. Once the consumers are running, send a few messages:

# shell 3
java -cp $CP NewTask First message.
# => [x] Sent 'First message.'
java -cp $CP NewTask Second message..
# => [x] Sent 'Second message..'
java -cp $CP NewTask Third message...
# => [x] Sent 'Third message...'
java -cp $CP NewTask Fourth message....
# => [x] Sent 'Fourth message....'
java -cp $CP NewTask Fifth message.....
# => [x] Sent 'Fifth message.....'

Let’s take a look at what the terminal running the worker prints:

java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'

java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'

By default, RabbitMQ sends each message to the next consumer in sequence. On average, every consumer gets the same number of messages. This way of distributing messages is called round-robin.

You can run several more worker instances to try it yourself.
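The split seen in the two worker terminals can be reproduced with a toy model. The sketch below does not talk to RabbitMQ; it only assumes that message i goes to consumer i modulo the number of consumers, which is how round-robin is described above. The class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of round-robin dispatch; it does not connect to RabbitMQ.
final class RoundRobinSketch {

    // Returns one list per consumer; message i goes to consumer i % consumerCount.
    static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> perConsumer = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            perConsumer.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            perConsumer.get(i % consumerCount).add(messages.get(i));
        }
        return perConsumer;
    }

    public static void main(String[] args) {
        List<String> messages = List.of("First message.", "Second message..",
                "Third message...", "Fourth message....", "Fifth message.....");
        List<List<String>> result = dispatch(messages, 2);
        System.out.println("C1: " + result.get(0)); // first, third, fifth
        System.out.println("C2: " + result.get(1)); // second, fourth
    }
}
```

Changing the second argument of dispatch to 3 shows how the same five messages would be spread over three workers.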

Message Acknowledgment

A task can take a while to complete. You may wonder what happens if a worker dies partway through a task. With our current code, RabbitMQ marks a message for deletion as soon as it has delivered it to a consumer. So if you kill a worker, the message it was processing is lost, along with every message that was dispatched to it but not yet handled.

But we do not want to lose any tasks. If a worker dies, we would like its tasks to be delivered to another worker.

To make sure a message is never lost, RabbitMQ supports message acknowledgments. An acknowledgment (ack) is sent back by the consumer to tell RabbitMQ that a particular message has been received and processed, and that RabbitMQ is free to delete it.

If a consumer dies (its channel is closed, its connection is closed, or the TCP connection is lost) without sending an ack, RabbitMQ understands that the message was not fully processed and re-queues it. If other consumers are online at that moment, it quickly redelivers the message to one of them. That way no message is lost, even if workers occasionally die.

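A slow task alone does not cause redelivery: RabbitMQ redelivers a message only when the consumer dies, so it is fine if processing a message takes a long time. (Recent RabbitMQ releases do enforce a delivery acknowledgment timeout, 30 minutes by default, so that a consumer that never acks is eventually detected.)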
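The bookkeeping behind acknowledgments can be pictured with a small in-memory model. This is a sketch under simplifying assumptions, not RabbitMQ's implementation: there is a single consumer, and re-queued messages are put back at the front of the queue. All names (AckSketch, deliver, consumerDied, and so on) are made up for the illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.TreeMap;

// Toy in-memory model of acknowledgments; names are illustrative, not RabbitMQ API.
final class AckSketch {
    private final Deque<String> ready = new ArrayDeque<>();
    private final TreeMap<Long, String> unacked = new TreeMap<>();
    private long nextTag = 1;

    void publish(String message) {
        ready.addLast(message);
    }

    // Hands the next ready message to the consumer and remembers it until acked.
    // Returns the delivery tag, or -1 if nothing is ready.
    long deliver() {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        long tag = nextTag++;
        unacked.put(tag, message);
        return tag;
    }

    String messageFor(long tag) {
        return unacked.get(tag);
    }

    // The consumer finished the work: the message can now be forgotten.
    void ack(long tag) {
        unacked.remove(tag);
    }

    // The consumer died: everything it had not acked goes back to the queue.
    // Iterating tags in descending order and adding to the front keeps the
    // original delivery order.
    void consumerDied() {
        for (String message : unacked.descendingMap().values()) {
            ready.addFirst(message);
        }
        unacked.clear();
    }

    int readyCount() {
        return ready.size();
    }

    int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        AckSketch queue = new AckSketch();
        queue.publish("First message.");
        queue.publish("Second message..");

        long first = queue.deliver();
        queue.deliver();           // second message is now unacked as well
        queue.ack(first);          // first task finished normally
        queue.consumerDied();      // worker dies before acking the second

        // prints "ready=1 unacked=0": the second message is waiting again
        System.out.println("ready=" + queue.readyCount()
                + " unacked=" + queue.unackedCount());
    }
}
```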

Manual message acknowledgments are turned on by default. In the previous examples we explicitly turned them off with autoAck=true. Now we set this flag to false and have the worker send an acknowledgment once it is done with a task.

channel.basicQos(1); // accept only one unack-ed message at a time (see below)

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  String message = new String(delivery.getBody(), "UTF-8");

  System.out.println(" [x] Received '" + message + "'");
  try {
    doWork(message);
  } finally {
    System.out.println(" [x] Done");
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  }
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

With this code, nothing is lost even if you kill a worker with CTRL+C while it is processing a message. Soon after the worker dies, all of its unacknowledged messages are redelivered.

An acknowledgment must be sent on the same channel that received the delivery. Attempting to acknowledge on a different channel results in a channel-level protocol exception. For details, see the reference documentation on consumer acknowledgments.

Forgotten acknowledgment

A common mistake is forgetting to call basicAck. It is an easy error to make, but the consequences are serious. Messages will be redelivered when your client quits (which can look like random redelivery), and RabbitMQ will use more and more memory because it cannot release unacknowledged messages.

To troubleshoot this problem, use the rabbitmqctl tool to print the messages_unacknowledged field:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

On Windows, drop the sudo:

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

Message Persistence

We have learned how to make sure that a task is not lost even if the consumer dies. But our tasks will still be lost if the RabbitMQ server stops.

Unless told otherwise, RabbitMQ forgets its queues and messages when it quits or crashes. To avoid this, we need to mark both the queue and the messages as durable. First, the queue:

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

Although this command is correct by itself, it will not work in our present setup, because we have already declared a non-durable queue called hello. RabbitMQ does not allow an existing queue to be redefined with different parameters. As a workaround, we declare a queue with a different name and durability enabled, for example task_queue:

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

This queueDeclare change, with the durable parameter set to true, must be applied to both the producer and the consumer code.
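Why a new queue name is needed can be pictured with a toy model of the declare rule. This is only a sketch of the behavior described above, not the broker's code or the client API: the class DeclareSketch, its two-argument queueDeclare, and the IllegalStateException are all assumptions made for the illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of "declare is idempotent only if the parameters match".
// Not RabbitMQ code; the real broker reports the mismatch as an error to the client.
final class DeclareSketch {
    private final Map<String, Boolean> durableByName = new HashMap<>();

    void queueDeclare(String name, boolean durable) {
        Boolean existing = durableByName.putIfAbsent(name, durable);
        if (existing != null && existing != durable) {
            throw new IllegalStateException(
                    "queue '" + name + "' already exists with durable=" + existing);
        }
    }

    public static void main(String[] args) {
        DeclareSketch broker = new DeclareSketch();
        broker.queueDeclare("hello", false);      // from the first tutorial
        broker.queueDeclare("hello", false);      // same parameters: fine
        broker.queueDeclare("task_queue", true);  // new name: fine
        try {
            broker.queueDeclare("hello", true);   // different parameters: rejected
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```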

At this point we can be sure that the task_queue queue will not be lost even if RabbitMQ restarts. Next, we mark our messages as persistent by passing MessageProperties.PERSISTENT_TEXT_PLAIN as the message properties.

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

Considerations for message persistence

Marking messages as persistent does not fully guarantee that a message will not be lost. Although it tells RabbitMQ to save the message to disk, there is a short window in which RabbitMQ has accepted the message but has not yet saved it. In addition, RabbitMQ does not call fsync(2) for every message, so a message may sit in a cache without having been written to disk. The persistence guarantees are not strong, but they are more than enough for a simple task queue. If you need a stronger guarantee, use publisher confirms.

Fair Dispatch

Round-robin dispatching is not always what we want. For example, with two workers, if every odd-numbered message is heavy and every even-numbered one is light, one worker will be constantly busy while the other does hardly any work. RabbitMQ knows nothing about that and keeps dispatching messages evenly.

This happens because RabbitMQ simply dispatches a message when it enters the queue. It does not look at the number of unacknowledged messages a consumer has; it just blindly dispatches every n-th message to the n-th consumer.

To solve this, we can call the basicQos method with prefetchCount set to 1. This tells RabbitMQ not to give more than one message to a worker at a time. In other words, it will not dispatch a new message to a worker until that worker has processed and acknowledged the previous one. Instead, RabbitMQ dispatches the message to the next worker that is not busy.

int prefetchCount = 1;
channel.basicQos(prefetchCount);
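The difference between the two dispatch strategies can be estimated with a small simulation. It is a sketch under stated assumptions, not a model of the broker: all messages are already waiting in the queue, costs are given in seconds, and with prefetchCount = 1 the next message simply goes to whichever worker becomes free first. The class and method names are hypothetical.

```java
import java.util.Arrays;

// Toy comparison of round-robin dispatch and prefetchCount = 1; not RabbitMQ code.
final class FairDispatchSketch {

    // Round-robin: message i always goes to worker i % workers, busy or not.
    // Returns the total seconds of work assigned to each worker.
    static int[] roundRobinLoad(int[] costs, int workers) {
        int[] busySeconds = new int[workers];
        for (int i = 0; i < costs.length; i++) {
            busySeconds[i % workers] += costs[i];
        }
        return busySeconds;
    }

    // prefetchCount = 1: the next message goes to the worker that is free first.
    // Returns the time at which each worker finishes its last message.
    static int[] fairLoad(int[] costs, int workers) {
        int[] busyUntil = new int[workers];
        for (int cost : costs) {
            int earliest = 0;
            for (int w = 1; w < workers; w++) {
                if (busyUntil[w] < busyUntil[earliest]) {
                    earliest = w;
                }
            }
            busyUntil[earliest] += cost;
        }
        return busyUntil;
    }

    public static void main(String[] args) {
        // Odd-numbered messages are heavy (9 s), even-numbered ones are light (1 s).
        int[] costs = {9, 1, 9, 1, 9, 1};
        System.out.println(Arrays.toString(roundRobinLoad(costs, 2))); // [27, 3]
        System.out.println(Arrays.toString(fairLoad(costs, 2)));       // [19, 11]
    }
}
```

In this example round-robin leaves one worker busy for 27 seconds while the other is done after 3, whereas the prefetch-style dispatch finishes everything in 19 seconds.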

About queue size

If all the workers are busy, the queue can fill up. Keep an eye on its size, and add more workers or use another strategy (for example, balancing the ratio of producers to consumers).

Putting It All Together

The final NewTask.java looks like this:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

        String message = String.join(" ", argv);

        channel.basicPublish("", TASK_QUEUE_NAME,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");
    }
  }
}

(NewTask.java source file)

Worker.java:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Worker {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    final Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    channel.basicQos(1);

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");

        System.out.println(" [x] Received '" + message + "'");
        try {
            doWork(message);
        } finally {
            System.out.println(" [x] Done");
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    };
    channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
  }

  private static void doWork(String task) {
    for (char ch : task.toCharArray()) {
        if (ch == '.') {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException _ignored) {
                Thread.currentThread().interrupt();
            }
        }
    }
  }
}

(Worker.java source file)

We have built a work queue using message acknowledgments and the prefetchCount setting. Its durability settings let tasks survive even if RabbitMQ is restarted.

For more information about Channel methods and MessageProperties, see the JavaDocs online.

Let’s move on to Tutorial 3 to learn how to send the same message to multiple consumers.

