Follow the official account:
The Java class representative, the article is updated daily for more information.

2. Work Queue

In the first tutorial, we wrote two programs that send and receive messages from a specified queue. In this tutorial, we will create a work queue that can be used to distribute “time-consuming” tasks to multiple workers.

The idea behind work queuing, or task queuing, is to avoid immediate processing of tasks that are resource-intensive and have to wait for them to finish running. Instead, schedule the task to take place later (asynchronously, to put it simply). A worker running in the background receives the task and executes it. When you run more than one worker, the tasks in the work queue will be shared among them.

This idea is useful in Web applications, where complex tasks cannot be handled through a short HTTP request window.

Preparation

In the previous tutorial, we sent a string message: “”Hello World!” . Next we send some strings that represent the complexity of the task. We don’t have real world tasks like zooming in and out of images or rendering PDF files, so let’s use Thread.sleep() to pretend to be busy. Use the number of dots in the string as the complexity of the task: each dot represents one second of “work”. For example, the string Hello… Represents a task that will take 3 seconds.

Change the code for send.java in the previous example slightly to allow arbitrary messages to be entered from the terminal. The application will assign tasks to our work queue, so name it: NewTask.java

String message = String.join(" ", argv);

channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

The old recv.java application also had to make some changes: it needed to fake a second of work for each dot in the message. It will be responsible for receiving messages and processing tasks, so name it Worker.java

DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); }}; boolean autoAck = true; // acknowledgment is covered below channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

A dummy task used to simulate execution time:

private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == '.') Thread.sleep(1000); }}

Compile as you did in Tutorial 1 (make sure all the required JARs are in the working directory and the environment variable: CP is set) :

javac -cp $CP NewTask.java Worker.java

$CP = $CP = $CP = $CP = $CP = $CP = $CP — Note of course representative

Round- Robin Dispatching

One of the advantages of using task queues is the ease of scaling horizontally. Assuming a backlog of tasks, we can add more worker programs and easily expand them.

First, let’s run two Worker instances simultaneously. They’re both going to get messages from the queue, but how exactly is that going to work? Let’s explore it together.

You need to open three terminals. Two are used to run worker programs. These two are going to be consumers — C1 and C2

# shell 1
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C

The third terminal is used to publish new tasks. When the consumer starts, it can send several messages:

# shell 3 java -cp $CP NewTask First message. # => [x] Sent 'First message.' java -cp $CP NewTask Second message.. # => [x] Sent 'Second message.. ' java -cp $CP NewTask Third message... # => [x] Sent 'Third message... ' java -cp $CP NewTask Fourth message.... # => [x] Sent 'Fourth message.... ' java -cp $CP NewTask Fifth message..... # => [x] Sent 'Fifth message..... '

Let’s take a look at what the terminal running the worker printed:

java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'First message.' # => [x] Received 'Third message... ' # => [x] Received 'Fifth message..... '
java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Second message.. ' # => [x] Received 'Fourth message.... '

By default, RabbitMQ sends each message in order to the next consumer. Each consumer is equally assigned the same number of messages. This message distribution mechanism is called polling.

You can run several more Worker instances to try it yourself.

Message Acknowledgement

The mission may take some time. Have you ever wondered what happens if the app dies before the task is finished? In our current code, once RabbitMQ distributes a message to a consumer, it immediately marks the message as deleted. In this case, once the Worker program is terminated, it will lose the messages it is processing, as well as the messages it has received but has not yet started processing.

But we don’t want to lose the mission. If a worker app fails, we want the tasks it was working on to be handed over to another worker.

To ensure that messages are not lost, RabbitMQ provides message acknowledgement. Message acknowledgement is sent back by the consumer to tell RabbitMQ that a specific message has been received, processed, and that RabbitMQ can delete the message.

If a consumer dies without an acknowledgement (an ACK) (a channel is closed, a link is closed, or a TCP connection is lost), RabbitMQ will assume that the message was not processed correctly and will re-queue it. If there are other consumers online at this point, RabbitMQ will quickly send the message to them. This ensures that messages will not be lost even if the worker suddenly hangs.

Messages do not timeout: RabbitMQ will resend a message when a consumer dies. It doesn’t matter if it takes a long time to process a message.

Manual message acknowledgement is enabled by default. In the previous example we turned it off by setting autoAck=true. Now we set the flag bit to false and ask the worker to send a confirmation when the work is done.

channel.basicQos(1); // accept only one unack-ed message at a time (see below) DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }}; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

The above code ensures that even if you stop a worker processing messages using Ctrl +C, you will not lose any messages. Messages that are not confirmed after the worker hangs will be reposted quickly.

The confirmation message must be sent with the same channel as when it was received. Attempts to return confirmation using a different Channel will report a Channel protocol exception. See the validation mechanism reference documentation for details

Forget to confirm

A common mistake is forgetting to call Basicack. This simple mistake will have serious consequences. When your program has finished processing the message but forgot to send the acknowledgement, the message will be redelivered, and RabbitMQ will use more and more memory because it cannot delete the unacknowledged message.

To facilitate troubleshooting such problems, you can use the rabbitmqctl tool to print the Messages_Unacknowledged field:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

Sudo:

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

Message persistence (Message persistence)

We’ve learned how to make sure the task doesn’t get lost if the customer dies. However, if the RabbitMQ service stops, the task will still be lost.

If not configured, RabbitMQ will lose messages that were already in the queue when it stops or crashes. To avoid this, both queues and messages should be Durable:

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

Although the above command is correct, it does not currently work correctly. This is because we have declared a non-persistent queue named “hello” in RabbitMQ. RabbitMQ cannot modify the parameters of an existing queue. Instead, we can name a new queue that enables persistence, such as task_queue:

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

The QueueCare method with a true persistence parameter needs to be added to both the producer and consumer code.

At this point, we can be sure that the task_queue will not be lost even if RabbitMQ is restarted. Next we set the message to persist by setting the value of MessageProperties to Persistent_Text_Plain.

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

Message persistence considerations

Marking a message as persistent is not a complete guarantee that it will not be lost. Although RabbitMQ was told to save the message to disk, there was a small window where RabbitMQ received the message but did not save it. In addition, RabbitMQ does not perform fsync(2) on every message — it may have just been written to the cache and not actually written to disk. The persistence mechanism is not robust, but queues are sufficient for Tasks. For more reliable persistence, you need to use Publisher Confirms.

A Fair dispatch is required.

Polling distribution sometimes doesn’t meet our needs. For example, in a scenario with only two workers, an odd number message involves a lot of computation, while an even number message is simple. RabbitMQ does not know how difficult the message is and will only distribute it evenly to two workers.

This happens because RabbitMQ is only responsible for distributing messages received in the queue and does not care about the number of unacknowledged messages from the consumer. It’s just blindly sending the NTH message to the NTH consumer.

To solve this problem, we can call the basicQos method with its prefetchCount parameter set to 1. This will tell RabbitMQ not to give more than one message to the worker at the same time. In other words, do not distribute new messages to the worker until he has returned confirmation. This way, RabbitMQ will send messages to other workers who are not busy.

int prefetchCount = 1;
channel.basicQos(prefetchCount);

About queue size

If all workers are busy, the queue is likely to fill up. You need to monitor its size in real time, either increase the number of workers, or use other strategies (e.g., control the ratio of producers to consumers).

Code Integration: Putting It All Together

The final NewTask.java code looks like this:

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = String.join(" ", argv); channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); }}}

(newTask.java source file)”

Worker.java:

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }}; channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } private static void doWork(String task) { for (char ch : task.toCharArray()) { if (ch == '.') { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } }

(Worker.java source file)”)

The work queue established using message acknowledgement and setting the PREFETCHCOUNT parameter. Its persistence Settings allow messages to survive a RabbitMQ restart.

For more information about Channels and MessageProperties, visit: Javadocs Online.

Let’s move on to Tutorial 3 to learn how to send the same message to multiple consumers.


RabbitMQ tutorial 1. “Hello World”

Freemarker Tutorial (1)- Template Development Manual

The downloaded attachment name is always garbled? It’s time you read the RFC documentation!

Use Spring Validation to validate parameters elegantly

MySQL priority queue (order by limit problem)


Code word is not easy, welcome thumb up share.

Search: 【The Java class representative】, pay attention to the public number, timely access to more Java dry goods.