Scenario

Recently we used Pulsar for peak shaving. Messages arrived in bursts of 3-40, and each message took about 10 seconds on average to process. Throughput turned out to be very low. The logs showed a very regular pattern: the consumer processed 10 messages within the same second, then nothing happened for a minute, then the next 10 messages were processed, and so on. Each batch of 10 came one minute after the previous one.

Troubleshooting process

1. Check which thread processes each message

Looking at the thread names in the message-processing log lines, we found a dedicated thread pool whose name suggested it was the project's custom thread pool, so we looked up its configuration.

Both the core and the maximum number of threads are 10, which matches the batches of 10 seen above, and the thread pool uses a rejection policy that rejects tasks outright:

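    ThreadPoolExecutor pulsarConsumerExecutor = new ThreadPoolExecutor(
            10, 10,                       // core and maximum pool size
            0L, TimeUnit.MILLISECONDS,
            new SynchronousQueue<>(),     // no buffering: a task needs a free thread
            new ThreadFactoryBuilder().setNameFormat("pulsar-consumer-thread-%d").build(),
            // rejects with RejectedExecutionException once all 10 threads are busy
            new ThreadPoolExecutor.AbortPolicy());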

2. Find where the thread pool is used to process messages

The next step is to find out where the thread pool is used. Intuitively, the Pulsar consumer must hand each message to the thread pool as soon as it receives it.

    consumerBuilder
            .consumerName(name + "-consumer")
            .subscriptionName(subscriptionName)
            .topic(pulsarConsumerProperties.getSupportTopicEnv() ? topic.concat(topicSuffix) : topic)
            .ackTimeout(pulsarConsumerProperties.getAckTimeOut(), TimeUnit.SECONDS)
            // subscription type to use when subscribing to the topic
            .subscriptionType(getSubscriptionType(pulsarConsumerProperties.getSubscriptionType()))
            // maximum total receiver queue size across partitions
            .maxTotalReceiverQueueSizeAcrossPartitions(
                    pulsarConsumerProperties.getMaxTotalReceiverQueueSizeAcrossPartitions())
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            // size of the local consumer's receiver queue
            .receiverQueueSize(pulsarConsumerProperties.getReceiverQueueSize())
            .intercept(consumerInterceptor)
            .messageListener((consumer, msg) -> {
                // TODO (pseudocode): submit consumer and msg to the thread pool for processing
            })
            .subscribe();

In this project only one consumer is created, and on its own one consumer processes only one message at a time. To increase that consumer's capacity, the messageListener submits each msg to the thread pool. The consumer thread therefore only receives messages and keeps handing them to the thread pool, which runs the processing logic.

This can improve throughput, but when the thread pool is full the rejection policy kicks in: the message is not processed and an exception is thrown directly.
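To make the saturation concrete, here is a minimal sketch that reproduces it without Pulsar. It assumes the pool shape described above (10 threads, a SynchronousQueue) and an abort-style rejection policy; the class name SaturatedPoolDemo and the latch standing in for the 10 seconds of work are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SaturatedPoolDemo {

    /** Submits a burst of slow tasks and returns {accepted, rejected}. */
    static int[] submitBurst(int messages) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                10, 10, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(),               // no buffering: a task needs a free thread
                new ThreadPoolExecutor.AbortPolicy());  // assumed policy: throw when saturated
        // Workers block on this latch, standing in for ~10 s of message processing.
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        int rejected = 0;
        for (int i = 0; i < messages; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            } catch (RejectedExecutionException e) {
                rejected++; // in a real listener, this message would never be acknowledged
            }
        }
        release.countDown(); // let the workers finish
        pool.shutdown();
        return new int[] {accepted, rejected};
    }

    public static void main(String[] args) {
        int[] result = submitBurst(30);
        System.out.println("accepted=" + result[0] + ", rejected=" + result[1]);
    }
}
```

For a burst of 30 the sketch reports 10 accepted and 20 rejected: the first 10 tasks each get a thread, and every later task finds no idle thread and nowhere to wait.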

Message queue middleware has a redelivery mechanism: if a message has been sent to the consumer but the consumer never replies with an ACK, the message is sent again after a timeout so that it is not lost.

So how long is the timeout before the message is redelivered?

The code above sets the ackTimeout property, which in our project is 60 seconds. That explains the one-minute gaps.

Conclusion

The consumer keeps receiving messages and handing them to the thread pool, but the pool can only process 10 at a time, and each one takes about 10 seconds.

Messages arrive much faster than that, several within about one second, and they are handed to the thread pool as well. The full pool can only reject them, so those messages are dropped in our client without ever being acknowledged. After 60 seconds without an ACK, Pulsar redelivers that batch. By then the pool's threads are idle again, so another 10 messages get processed. This is the root cause of the pattern described at the beginning.

Solution

  1. Start multiple consumers. Each consumer processes messages itself, without a thread pool: it handles one message, then receives the next. Running several consumers is what raises throughput, and how many to start depends on your own situation.

  2. Keep using a thread pool, but change its rejection policy to the caller-runs policy (CallerRunsPolicy). If the thread pool cannot take a message, the message is processed in the current thread, that is, in the consumer's own thread, so the consumer does work too. The downside is that while the consumer thread is busy processing, it cannot receive messages. After about 10 seconds it resumes receiving, by which time the thread pool has idle threads again, so it submits to the pool, and so on.
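The second option can be checked with a small sketch. It uses a pool of 2 threads rather than 10 to keep it short, and the class and method names are made up for illustration; it only shows which thread ends up running a task that arrives while every worker is busy.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsDemo {

    /** Returns the name of the thread that ran a task submitted while every worker was busy. */
    static String overflowThreadName() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            // Occupy both workers.
            for (int i = 0; i < 2; i++) {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // The pool is saturated, so execute() runs this task on the calling thread
            // and returns only after the task has finished.
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("overflow task ran on: " + overflowThreadName());
    }
}
```

The overflow task runs on the submitting thread instead of being rejected. In the listener that thread is the consumer's own, which is why it stops receiving until the task is done.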

There are other options as well, for example:

If you know the maximum number of messages in a burst and the processing capacity of the service, size the thread pool's thread count and waiting queue so that the whole burst fits.
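As a rough sketch of that sizing, take the numbers from the scenario as assumptions: bursts of at most 40 messages, 10 threads, about 10 seconds per message. A bounded queue of 30 then holds everything the 10 threads cannot start immediately, and the last message finishes after roughly 4 rounds of 10 seconds, about 40 seconds, which stays under the 60-second ackTimeout. The class and method names below are made up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SizedPoolDemo {

    /** Submits a burst of slow tasks and returns how many of them the pool rejected. */
    static int rejectedInBurst(int threads, int queueCapacity, int burst) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity), // waiting room for the rest of the burst
                new ThreadPoolExecutor.AbortPolicy());
        // Workers block on this latch, standing in for slow message processing.
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < burst; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        release.countDown(); // let running and queued tasks complete
        pool.shutdown();
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected with queue of 30: " + rejectedInBurst(10, 30, 40));
    }
}
```

With 10 threads and a queue of 30, a burst of 40 is accepted in full, while a burst of 45 would still lose 5 messages to rejection. Keep in mind that time spent waiting in the queue counts toward the ack timeout, so a larger queue only helps while the total wait plus processing time stays below it.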

Or have the consumer thread coordinate with the thread pool each time it receives a message: check whether the pool has an idle thread, and let the pool notify the consumer, through inter-thread coordination, when it can continue submitting. Choose whichever approach suits your own situation.
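One possible way to do that coordination, as a sketch only and not what the project actually uses, is a Semaphore with one permit per worker: the receiving thread takes a permit before submitting and each worker gives it back when it finishes. The class name, method name, and the 20 ms sleep standing in for real processing are all made up for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BackpressureDemo {

    /** Feeds `messages` tasks to `workers` threads and returns {processed, peakInFlight}. */
    static int[] run(int workers, int messages) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        Semaphore idleWorkers = new Semaphore(workers); // one permit per idle worker
        AtomicInteger processed = new AtomicInteger();
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            for (int i = 0; i < messages; i++) {
                idleWorkers.acquire(); // the receiving thread waits here while all workers are busy
                pool.execute(() -> {
                    try {
                        peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                        Thread.sleep(20); // stand-in for message processing
                        processed.incrementAndGet();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inFlight.decrementAndGet();
                        idleWorkers.release(); // signal the receiver that a worker is free again
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow(); // no-op if the pool has already terminated
        }
        return new int[] {processed.get(), peak.get()};
    }

    public static void main(String[] args) {
        int[] result = run(3, 25);
        System.out.println("processed=" + result[0] + ", peak in flight=" + result[1]);
    }
}
```

Nothing is rejected, because the receiver never submits more work than there are workers. The sketch uses a fixed pool with its default unbounded queue on purpose: a worker releases its permit just before it is ready to take the next task, so a SynchronousQueue with an abort policy could still reject in that short window.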