The premise
An asynchronous process was heavily used in a user label service and was decoupled using RabbitMQ. In order to improve the processing efficiency of consumers, the number of consumer threads and preFETch_count parameters of different node tasks are adjusted and tested, and a relatively reasonable combination is obtained. Here’s a closer look at the preFETch_count parameter’s role in RabbitMQ.
Prefetch_count Parameter meaning
The Advanced Message Queuing Protocol (AMQP) and RabbitMQ implementations of the Advanced Message Queuing Protocol (AMQP) prefetch_count parameter are important for RabbitMQ. Refer to the corresponding documentation (see Resources at the end of this article). AMQP 0-9-1 defines basic.qos methods to limit the maximum number of unack messages that a consumer can send on a Channel or Connection. The basic.qos method supports two parameters:
global
: Boolean value.prefetch_count
: an integer.
These two parameters have different meanings in AMQP 0-9-1 than they do in RabbitMQ.
global The parameter value |
AMQP 0-9-1 In theprefetch_count Meanings of Parameters |
RabbitMQ In theprefetch_count Meanings of Parameters |
---|---|---|
false |
prefetch_count Values in the currentChannel Shared by all consumers |
prefetch_count Based on the currentChannel The created consumer takes effect |
true |
prefetch_count Values in the currentConnection Shared by all consumers |
prefetch_count Values in the currentChannel Shared by all consumers |
Or use a simple English table to understand:
global |
prefetch_count in AMQP 0-9-1 |
prefetch_count in RabbitMQ |
---|---|---|
false |
Per channel limit |
Per customer limit |
true |
Per connection limit |
Per channel limit |
To understand this, let me draw a picture:
Just to distinguish between the protocol itself and the implementation in RabbitMQ, follow up with preFETch_count for the consumer (thread) and the message to be consumed. Assume that the RabbitMQ client can fetch queued messages from the RabbitMQ server faster than the consumer thread can consume them, and that there are currently two consumer threads sharing a Channel instance. When the global argument is false, the effect is as follows:
When the global argument is true, the effect is as follows:
Prefetch_count Unack messages are temporarily stored in a queue (blocking queue, to be precise, The message tasks in the blocking queue are then streamed to a list of callback consumer handles to be processed by the consumer (see source code analysis in the next section). These messages can take up the HEAP memory of the JVM, so the footprint of these “prefetch messages” must be considered when tuning performance or setting the initialization and maximum heap memory of an application that happens to be a consumer of RabbitMQ. However, it is important to note that prefetch_count is a RabbitMQ server parameter and neither its Settings nor snapshots are stored in the RabbitMQ client. Also note the conditions and features under which preFETch_count takes effect (as seen in some demos and source code for parameter Settings) :
prefetch_count
Parameter only inbasic.consume
theautoAck
Parameter set tofalse
That is, automatic acknowledgment cannot be used. The flow of automatically acknowledged messages cannot be limited.basic.consume
If you forget to call manually in non-automatic confirmation modebasic.ack
, thenprefetch_count
It is notack
The maximum number of messages.prefetch_count
Is made up ofRabbitMQ
Server control, in general, can ensure that the individual consumer threads are notack
Message distribution is balanced, I guessconsumerTag
Played a key role.
Prefetch_count source code trace in the RabbitMQ client
The RabbitMQ client is com. RabbitMQ :amqp-client:5.9.0
The most basic way to analyze RabbitMQ is to read the source code of the Java client for RabbitMQ. The basic. Qos and Basic. Is the corresponding com. The rabbitmq. Client. Impl. ChannelN# basicQos () and com. The rabbitmq. Client. The impl. ChannelN# basicConsume () two methods. Look at the ChannelN# basicQos () :
The basicQos() method has a prefetchSize parameter that limits the size of the content to be distributed. The default value is 0 for unrestricted content, and the value of prefetchCount is in the range of [0,65535], which is also unrestricted. The ChannelN#basicQos() implementation encapsulates the basic.qos method parameters in a single RPC call, meaning that the configuration of the RabbitMQ server is changed immediately, and the parameter values are not stored in the client code at all, confirming the conclusion of the previous section. Then look at the ChannelN#basicConsume() method:
The key parts have been circled in red in the figure above, because the entire message consumption process is asynchronous and involves too many classes and methods to be fully posted here.
The prefetch_count parameter does not appear in the client code throughout the message consumption process, again confirming the conclusion from the previous section that the behavior and actions of the prefetch_count parameter are completely controlled by the RabbitMQ server. The final Customer or the usual DefaultCustomer handle is called back in the WorkPoolRunnable. The thread executing this task comes from the thread pool inside the ConsumerWorkService. And the thread pool using the Executors. NewFixedThreadPool () to build, use the default thread factory class, so in Customer# handleDelivery () method to print the names of the thread inside is the pool – 1 – thread – *.
VariableLinkedBlockingQueue here is a message in the previous section the prototype of the queue
Prefetch_count is used
Setting the prefetch_count parameter is easy by calling the Channel#basicQos() method:
public class RabbitQos {
static String QUEUE = "qos.test";
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE, true.false.false.null);
channel.basicQos(2);
channel.basicConsume("qos.test".false.new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("1 -- -- -- -- -- -"+ Thread.currentThread().getName()); sleep(); }}); channel.basicConsume("qos.test".false.new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("2 -- -- -- -- -- -"+ Thread.currentThread().getName()); sleep(); }});for (int i = 0; i < 20; i++) {
channel.basicPublish("", QUEUE, MessageProperties.TEXT_PLAIN, String.valueOf(i).getBytes());
}
sleep();
}
private static void sleep(a) {
try {
Thread.sleep(Long.MAX_VALUE);
} catch (Exception ignore) {
}
}
}
Copy the code
Spring-boot-starter-amqp: spring-boot-starter-AMQP: spring-boot-starter-AMQP: spring-boot-starter-AMQP By the spring in the configuration file. The rabbitmq. Listener. Direct. The prefetch attribute specifies all consumers prefetch_count thread, if you want to some consumers thread for this property is set, May need to be done for RabbitListenerContainerFactory transformation.
Prefetch_count parameter best practice
Finding Time functions with RabbitMQ 3.3 is an article on prefetch_count. This article examines the entire process of message flow control and mentions some of the metrics for the preFETch_count parameter:
As indicated here, if the value of prefetch_count exceeds 30, then network bandwidth constraints begin to dominate and further increases in prefetch_count become ineffective. In other words, the official recommendation is to set preFETch_count to 30. Here to see the spring – the boot – starter – closer in the default value of this parameter definition, the specific is the DEFAULT_PREFETCH_COUNT AbstractMessageListenerContainer:
If not through the spring. The rabbitmq. Listener. Direct. The prefetch are overwritten, then use the spring – the boot – starter – annotations defined consumer threads in closer prefetch_count is set in 250.
In my opinion, preFETch_count Settings should be considered in terms of bandwidth, datagram size per message, consumer thread processing speed, and so on. The summary is as follows (personal experience is for reference only) :
- Consider scenarios where the processing speed of the consumer thread is very slow and the number of messages on the queue is very small
prefetch_count
Set to1
. - When the datagram of each message in the queue is very large, calculate how many bytes the client can hold
ack
Total message volume memory limit, thus designing a reasonableprefetch_count
Value. - When the consumer thread processing speed is very fast, far greater than
RabbitMQ
Server message distribution, under the premise of sufficient network bandwidth, can be setprefetch_count
Value is set to0
Do not do any message flow control. - This parameter is recommended in common scenarios
RabbitMQ
The official recommended value30
orspring-boot-starter-amqp
Default values in250
.
summary
To summarize:
prefetch_count
isRabbitMQ
The server parameters take effect immediately after being set.prefetch_count
forAMQP-0-9-1
And the definition ofRabbitMQ
The implementation in is not exactly the same.prefetch_count
It is recommended to use the default values provided by the framework or to calculate and evaluate a reasonable value through grouping experiments combined with datagram sizes.
eggs
After the author published the article on the official account and moments, the author’s teacher made comments and pointed out one of the shortcomings:
In fact, prefetch_count is essentially a flow control for consumers. The official article also mentioned the importance of network and bandwidth, so consider RTT (round-trip Time). Here the concept of RTT comes from Principles of Computer Networking:
The RTT includes packet-propagation delays, packet-queuing delays and packet -processing delay.
In other words, RTT = packet propagation delay (round trip) + packet queuing delay (router and switch) + data processing delay (application processing time, the scenario used in this paper is consumer processing time). Assuming that RTT only calculates the network delay and excludes the delay of data processing, then the round-trip cost of a packet is 2RTT, that is, the round-trip cost of a packet consuming message processing. The larger THE RTT, the higher the cost of data transmission, and the client should be allowed to “prefetch” more unack messages to avoid waiting for the consumer thread. This allows you to calculate the number of “prefetch_count” messages when the single consumer thread is at its most saturated: PreFETch_count = 2RTT/the time it takes the consumer thread to process a single message. To illustrate this concept:
- when
RTT
for30ms
, while the consumer thread takes time to process a single message10ms
In this case, the consumption rate is dominant and can be consideredprefetch_count
Set to6
Or a larger value (considering heap memory limits). - when
RTT
for30ms
, while the consumer thread takes time to process a single message200ms
.RTT
Preponderance, consumption rate lag, consider at this timeprefetch_count
Set to1
Can.
Consider: why does spring-boot-starter-AMqp default prefetch_count to 250 and few developers change it with no apparent problems?
(C-4-D E-A-20201017)
Personal blog
- Throwable’s Blog