Background

An asynchronous process in a heavily used user-tagging service was decoupled using RabbitMQ. To improve consumer throughput, the number of consumer threads and the prefetch_count parameter were tuned and tested for the tasks on different nodes until a reasonably good combination was found. This article takes a closer look at the role prefetch_count plays in RabbitMQ.

Meaning of the prefetch_count parameter

The meaning of the prefetch_count parameter is described both in the AMQP 0-9-1 (Advanced Message Queuing Protocol) specification and in the RabbitMQ documentation for its implementation of that protocol (see Resources at the end of this article). AMQP 0-9-1 defines the basic.qos method to limit the maximum number of unacknowledged (unack) messages that may be outstanding for consumers on a Channel or Connection. Two parameters of basic.qos matter here:

  • global: Boolean value.
  • prefetch_count: an integer.

These two parameters have different meanings in AMQP 0-9-1 than they do in RabbitMQ.

global | Meaning of prefetch_count in AMQP 0-9-1 | Meaning of prefetch_count in RabbitMQ
false | Shared by all consumers on the current Channel | Applied separately to each consumer created on the current Channel
true | Shared by all consumers on the current Connection | Shared by all consumers on the current Channel

Or, as a more compact summary:

global | prefetch_count in AMQP 0-9-1 | prefetch_count in RabbitMQ
false | Per-channel limit | Per-consumer limit
true | Per-connection limit | Per-channel limit
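The RabbitMQ column of the table can be expressed as a small admission check. The sketch below is only a toy model written for this article, not code from the RabbitMQ server or client; the class name PrefetchScopeModel and its method are hypothetical, and a limit of 0 is assumed to mean "unlimited".

```java
/** Toy model of how RabbitMQ interprets basic.qos; not client or server code. */
final class PrefetchScopeModel {

    private final int perConsumerLimit; // value set with global = false; 0 means unlimited
    private final int perChannelLimit;  // value set with global = true;  0 means unlimited

    PrefetchScopeModel(int perConsumerLimit, int perChannelLimit) {
        this.perConsumerLimit = perConsumerLimit;
        this.perChannelLimit = perChannelLimit;
    }

    /**
     * Returns true if one more message may be delivered to a consumer that
     * already holds consumerUnacked unacknowledged messages, on a channel
     * whose consumers together hold channelUnacked unacknowledged messages.
     */
    boolean canDeliver(int consumerUnacked, int channelUnacked) {
        boolean consumerOk = perConsumerLimit == 0 || consumerUnacked < perConsumerLimit;
        boolean channelOk = perChannelLimit == 0 || channelUnacked < perChannelLimit;
        return consumerOk && channelOk;
    }
}
```

With new PrefetchScopeModel(2, 0), each consumer on the channel may hold two unacknowledged messages of its own; with new PrefetchScopeModel(0, 2), all consumers on the channel share a single allowance of two.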

To make this concrete, set the protocol definition aside and look only at the RabbitMQ implementation, following how prefetch_count relates the consumers (threads) to the messages waiting to be consumed. Assume that the RabbitMQ client can fetch queued messages from the RabbitMQ server faster than the consumer threads can consume them, and that two consumer threads currently share one Channel instance.

When the global argument is false, each of the two consumers may hold up to prefetch_count unacknowledged messages of its own.

When the global argument is true, the two consumers together may hold at most prefetch_count unacknowledged messages on the shared Channel.

Up to prefetch_count unacknowledged messages are buffered on the client in a queue (a blocking queue, to be precise). The message tasks in the blocking queue are then handed to the registered consumer callback handlers for processing (see the source code analysis in the next section). These messages occupy JVM heap memory, so the footprint of these "prefetched messages" must be taken into account when tuning performance or when setting the initial and maximum heap size of an application that is a RabbitMQ consumer. Note, however, that prefetch_count is a RabbitMQ server-side parameter: neither its value nor a snapshot of it is stored in the RabbitMQ client. Also note the conditions under which prefetch_count takes effect and how it behaves (as observed in demos and in the source code that sets the parameter):

  • prefetch_count only takes effect when the autoAck parameter of basic.consume is set to false, that is, when automatic acknowledgement is not used. The flow of automatically acknowledged messages cannot be limited.
  • If basic.consume is used in manual acknowledgement mode and the consumer forgets to call basic.ack, prefetch_count is the maximum number of unacknowledged messages it will ever receive.
  • prefetch_count is enforced by the RabbitMQ server. In general it keeps the unacknowledged messages evenly distributed across the individual consumer threads; my guess is that consumerTag plays a key role here.
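The behaviour described in these points can be imitated with a few lines of plain Java. This is a simplified, single-consumer sketch of server-side flow control under manual acknowledgement; the class PrefetchFlowModel and its methods are hypothetical and do not correspond to any RabbitMQ API.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy broker-side model of prefetch_count with manual acknowledgements. */
final class PrefetchFlowModel {

    private final Deque<String> ready = new ArrayDeque<>(); // messages still waiting in the queue
    private final int prefetchCount;                        // 0 means no limit
    private int unacked;                                    // delivered but not yet acknowledged
    private int delivered;                                  // total deliveries so far

    PrefetchFlowModel(int prefetchCount) {
        this.prefetchCount = prefetchCount;
    }

    /** Enqueues a message and delivers it immediately if the limit allows. */
    void publish(String body) {
        ready.addLast(body);
        pump();
    }

    /** Models basic.ack for one message: frees one slot and resumes delivery. */
    void ack() {
        if (unacked == 0) {
            throw new IllegalStateException("nothing to acknowledge");
        }
        unacked--;
        pump();
    }

    /** Delivers ready messages until the queue is empty or the limit is reached. */
    private void pump() {
        while (!ready.isEmpty() && (prefetchCount == 0 || unacked < prefetchCount)) {
            ready.removeFirst();
            unacked++;
            delivered++;
        }
    }

    int deliveredCount() { return delivered; }

    int unackedCount() { return unacked; }

    int readyCount() { return ready.size(); }
}
```

Publishing 20 messages into new PrefetchFlowModel(2) without ever calling ack() leaves two messages delivered and 18 waiting, which mirrors the second point above; each ack() releases exactly one more delivery.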

Tracing prefetch_count in the RabbitMQ client source code

The RabbitMQ client version used here is com.rabbitmq:amqp-client:5.9.0.

The most direct way to analyze this is to read the source code of the RabbitMQ Java client. The basic.qos and basic.consume methods correspond to com.rabbitmq.client.impl.ChannelN#basicQos() and com.rabbitmq.client.impl.ChannelN#basicConsume(). First, look at ChannelN#basicQos():

The basicQos() method also has a prefetchSize parameter that limits the size of the content to be delivered; its default value of 0 means unlimited. The value of prefetchCount must be in the range [0, 65535], and 0 likewise means unlimited. The ChannelN#basicQos() implementation wraps the basic.qos parameters in a single RPC call, which means that the setting on the RabbitMQ server changes immediately and the parameter values are not stored anywhere in the client code, confirming the conclusion of the previous section. Next, look at the ChannelN#basicConsume() method:

The key parts are circled in red in the figure above. The whole message consumption flow is asynchronous and involves too many classes and methods to reproduce in full here.

The prefetch_count parameter does not appear anywhere in the client code during message consumption, which again confirms the conclusion from the previous section: the behavior of prefetch_count is controlled entirely by the RabbitMQ server. The final Consumer handler (usually a DefaultConsumer) is called back inside WorkPoolRunnable. The thread that runs this task comes from the thread pool inside ConsumerWorkService. That pool is built with Executors.newFixedThreadPool() using the default thread factory, so the thread name printed inside Consumer#handleDelivery() has the form pool-1-thread-*.

The VariableLinkedBlockingQueue here is the concrete form of the blocking queue mentioned in the previous section.
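The thread naming mentioned above is ordinary JDK behaviour and can be checked without RabbitMQ. The following is a minimal sketch that uses only java.util.concurrent; the class DefaultPoolThreadName is made up for this illustration and is not part of the client.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class DefaultPoolThreadName {

    /** Runs one task on a fixed pool built with the default thread factory and returns the worker's name. */
    static String workerThreadName() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("task did not complete", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Prints a name of the form pool-N-thread-M.
        System.out.println(workerThreadName());
    }
}
```

The pool number N depends on how many pools the default factory has already created in the JVM, which is why a consumer application that creates this pool first typically sees pool-1-thread-*.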

Using prefetch_count

Setting the prefetch_count parameter is easy by calling the Channel#basicQos() method:

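import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;

public class RabbitQos {

    static String QUEUE = "qos.test";

    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // Durable, non-exclusive, non-auto-delete queue without extra arguments.
        channel.queueDeclare(QUEUE, true, false, false, null);
        // prefetch_count = 2; global defaults to false.
        channel.basicQos(2);
        // First consumer, autoAck = false. It never acks and blocks inside the callback.
        channel.basicConsume(QUEUE, false, new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("1 -- -- -- -- -- -" + Thread.currentThread().getName());
                sleep();
            }
        });
        // Second consumer on the same channel, also with autoAck = false.
        channel.basicConsume(QUEUE, false, new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("2 -- -- -- -- -- -" + Thread.currentThread().getName());
                sleep();
            }
        });
        // Publish 20 messages to the queue through the default exchange.
        for (int i = 0; i < 20; i++) {
            channel.basicPublish("", QUEUE, MessageProperties.TEXT_PLAIN, String.valueOf(i).getBytes());
        }
        sleep();
    }

    // Blocks the calling thread indefinitely.
    private static void sleep() {
        try {
            Thread.sleep(Long.MAX_VALUE);
        } catch (Exception ignore) {

        }
    }
}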

With spring-boot-starter-amqp, the spring.rabbitmq.listener.direct.prefetch property in the configuration file sets prefetch_count for all consumer threads. If only some consumer threads should get a different value, the RabbitListenerContainerFactory may need to be customized.

Best practices for the prefetch_count parameter

Finding bottlenecks with RabbitMQ 3.3 is an official article that discusses prefetch_count. It examines the whole message flow-control process and gives some measurements for the prefetch_count parameter:

As shown there, once prefetch_count exceeds 30, network bandwidth becomes the dominant constraint and further increases in prefetch_count have little effect. In other words, the officially suggested value of prefetch_count is 30. Now look at the default value of this parameter in spring-boot-starter-amqp, which is DEFAULT_PREFETCH_COUNT in AbstractMessageListenerContainer:

If it is not overridden through spring.rabbitmq.listener.direct.prefetch, the prefetch_count of consumer threads defined with the spring-boot-starter-amqp annotations is 250.

In my opinion, the prefetch_count setting should take into account bandwidth, the payload size of each message, the processing speed of the consumer threads, and so on. My summary is as follows (personal experience, for reference only):

  • When the consumer threads process messages very slowly and the number of messages in the queue is small, consider setting prefetch_count to 1.
  • When each message in the queue is very large, work out the memory limit for the total volume of unacknowledged messages the client can hold, and derive a reasonable prefetch_count from it.
  • When the consumer threads process messages far faster than the RabbitMQ server can deliver them, and network bandwidth is sufficient, prefetch_count can be set to 0 so that no message flow control is applied.
  • In ordinary scenarios, use the value of 30 suggested in the official RabbitMQ article or the spring-boot-starter-amqp default of 250.
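For the large-message case above, the calculation can be sketched as a simple division. The helper below is hypothetical: the heap budget and the average message size are estimates the reader has to supply, the budget is assumed to be per consumer, and the result is clamped to the [1, 65535] range so that it never becomes 0 (which would disable flow control).

```java
final class PrefetchSizing {

    static final int MAX_PREFETCH = 65_535; // upper bound of prefetchCount noted in the source-code section

    /**
     * Largest prefetch_count whose unacknowledged messages fit into the given
     * heap budget, clamped to [1, 65535]. Both inputs are rough estimates.
     */
    static int prefetchForHeapBudget(long heapBudgetBytes, long avgMessageBytes) {
        if (heapBudgetBytes <= 0 || avgMessageBytes <= 0) {
            throw new IllegalArgumentException("budget and message size must be positive");
        }
        long fit = heapBudgetBytes / avgMessageBytes;
        return (int) Math.max(1, Math.min(MAX_PREFETCH, fit));
    }
}
```

For example, a budget of 64 MiB with messages of about 1 MiB each gives a prefetch_count of 64. In practice the in-memory size of a message is larger than its payload, so some headroom is advisable.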

Summary

To summarize:

  • prefetch_count is a RabbitMQ server-side parameter and takes effect immediately after it is set.
  • The definition of prefetch_count in AMQP 0-9-1 and its implementation in RabbitMQ are not exactly the same.
  • For prefetch_count, either use the default value provided by the framework or determine a reasonable value through grouped experiments that take message sizes into account.

Easter egg

After the author shared this article on the official account and on Moments, the author's teacher commented and pointed out one shortcoming:

In fact, prefetch_count is essentially flow control for consumers. The official article also mentions the importance of the network and bandwidth, so the RTT (round-trip time) should be considered as well. The concept of RTT here comes from Principles of Computer Networking:

The RTT includes packet-propagation delays, packet-queuing delays and packet-processing delays.

In other words, RTT = packet propagation delay (round trip) + packet queuing delay (in routers and switches) + packet processing delay (application processing time, which in this article is the consumer's processing time). If the RTT is taken to cover only the network delay and to exclude the processing delay, then the round-trip cost of consuming one message is 2RTT. The larger the RTT, the higher the cost of data transmission, and the more unacknowledged messages the client should be allowed to prefetch so that the consumer thread is never left waiting. This gives the value of prefetch_count at which a single consumer thread is just saturated: prefetch_count = 2RTT / time the consumer thread takes to process a single message. Two examples illustrate the idea:

  • When the RTT is 30 ms and the consumer thread takes 10 ms to process a single message, consumption is the faster side, so consider setting prefetch_count to 6 or a larger value (subject to heap memory limits).
  • When the RTT is 30 ms and the consumer thread takes 200 ms to process a single message, the RTT is the smaller cost and consumption lags behind, so setting prefetch_count to 1 is enough.

Food for thought: why does spring-boot-starter-amqp default prefetch_count to 250, and why do so few developers change it without running into obvious problems?
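The two cases above follow directly from the formula prefetch_count = 2RTT / per-message processing time. Here is a small sketch of that arithmetic; the class and method names are invented for this example, and rounding up with a minimum of 1 is an assumption about how a fractional result should be handled.

```java
final class RttPrefetch {

    /** prefetch_count = ceil(2 * RTT / per-message processing time), clamped to [1, 65535]. */
    static int suggestedPrefetch(long rttMillis, long processingMillis) {
        if (rttMillis < 0 || processingMillis <= 0) {
            throw new IllegalArgumentException("RTT must be >= 0 and processing time > 0");
        }
        long twoRtt = 2 * rttMillis;
        // Integer ceiling division: round up so the consumer never runs dry.
        long ceil = (twoRtt + processingMillis - 1) / processingMillis;
        return (int) Math.max(1, Math.min(65_535, ceil));
    }
}
```

suggestedPrefetch(30, 10) evaluates to 6 and suggestedPrefetch(30, 200) evaluates to 1, matching the two cases listed above.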

(C-4-D E-A-20201017)

Personal blog

  • Throwable’s Blog